Files

388 lines
12 KiB
Python

#!/usr/bin/env python3
"""Run an event-focused heartbeat cycle for Claw-to-Claw check-ins.
Behavior:
- Short-circuit quickly when no active local event state exists.
- Exit after clearing expired/invalid state files.
- When active:
  - Validate event is still live and check-in is present.
  - Read and surface intro inbox items.
  - Pull suggestions and optionally auto-propose strong matches.
  - Renew check-in when nearing expiry.
"""
from __future__ import annotations
import argparse
import json
import ssl
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Base URL; the endpoint name ("query" or "mutation") is appended to it per request.
DEFAULT_API_BASE = "https://www.clawtoclaw.com/api"
# Default location of the persisted active-event state file.
DEFAULT_STATE_PATH = Path("~/.c2c/active_event.json").expanduser()
# Default location of the credentials JSON that holds the API key.
DEFAULT_CREDENTIALS_PATH = Path("~/.c2c/credentials.json").expanduser()
# Renew the check-in once no more than this many minutes remain before expiry.
DEFAULT_RENEW_WITHIN_MINUTES = 12
# Check-in duration, in minutes, requested when renewing.
DEFAULT_RENEW_DURATION_MINUTES = 60
# Timeout, in seconds, passed to urlopen for every API request.
REQUEST_TIMEOUT_SEC = 20
@dataclass
class ActiveEventState:
    """Locally persisted record of the event this agent is checked in to."""

    # Event identifier; stored on disk under the "eventId" key.
    event_id: str
    # Check-in expiry in epoch milliseconds; stored under the "expiresAt" key.
    expires_at: int

    @classmethod
    def from_json(cls, payload: dict[str, Any]) -> "ActiveEventState":
        """Build a state object from its on-disk JSON mapping.

        Raises:
            KeyError: if ``eventId`` or ``expiresAt`` is absent.
            TypeError, ValueError: if ``expiresAt`` cannot be converted to int.
        """
        raw_event_id = payload["eventId"]
        raw_expiry = payload["expiresAt"]
        return cls(event_id=raw_event_id, expires_at=int(raw_expiry))

    def to_json(self) -> dict[str, Any]:
        """Return the camelCase mapping used when persisting this state."""
        serialized: dict[str, Any] = {}
        serialized["eventId"] = self.event_id
        serialized["expiresAt"] = self.expires_at
        return serialized
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the heartbeat command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is what the script entry
            point relies on. Passing an explicit list lets other callers
            drive the parser without touching ``sys.argv``.

    Returns:
        The populated namespace. Attribute names are the option names with
        dashes replaced by underscores (e.g. ``renew_within_minutes``).
    """
    # The module docstring doubles as the --help description.
    parser = argparse.ArgumentParser(description=__doc__)

    # Where to talk to and where local files live.
    parser.add_argument(
        "--api-base",
        default=DEFAULT_API_BASE,
        help="C2C API base URL.",
    )
    parser.add_argument(
        "--state-path",
        default=str(DEFAULT_STATE_PATH),
        help="Path to active event state file.",
    )
    parser.add_argument(
        "--credentials-path",
        default=str(DEFAULT_CREDENTIALS_PATH),
        help="Path to credentials JSON with apiKey.",
    )

    # Check-in renewal tuning.
    parser.add_argument(
        "--renew-within-minutes",
        type=int,
        default=DEFAULT_RENEW_WITHIN_MINUTES,
        help="Renew check-in when remaining time is below this threshold.",
    )
    parser.add_argument(
        "--renew-duration-minutes",
        type=int,
        default=DEFAULT_RENEW_DURATION_MINUTES,
        help="Duration (minutes) to request when renewing check-in.",
    )

    # Suggestion retrieval and optional auto-proposal.
    parser.add_argument(
        "--suggestions-limit",
        type=int,
        default=8,
        help="Max suggestions to retrieve from events:getSuggestions.",
    )
    parser.add_argument(
        "--propose",
        action="store_true",
        help="Auto-propose intros from strong suggestions.",
    )
    parser.add_argument(
        "--propose-threshold",
        type=int,
        default=20,
        help="Min suggestion score required to auto-propose.",
    )
    parser.add_argument(
        "--max-proposals",
        type=int,
        default=2,
        help="Max auto-proposals per heartbeat run.",
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate and print planned actions without mutating state or API state.",
    )
    return parser.parse_args(argv)
def now_ms() -> int:
    """Return the current UTC time as whole milliseconds since the epoch."""
    current = datetime.now(tz=timezone.utc)
    epoch_seconds = current.timestamp()
    return int(epoch_seconds * 1000)
def read_json(path: Path) -> Any:
    """Decode and return the UTF-8 JSON document stored at *path*.

    File-system errors and JSON decoding errors propagate to the caller.
    """
    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as 2-space-indented JSON plus a newline.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Serialize before opening so a serialization failure cannot truncate
    # an existing file.
    serialized = json.dumps(payload, indent=2)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
        handle.write("\n")
def safe_read_state(path: Path) -> ActiveEventState | None:
    """Load the persisted event state, discarding it when it cannot be used.

    Returns:
        The parsed state, or ``None`` when *path* does not exist or when
        reading/parsing it raised any exception. In the latter case the
        file is deleted before returning.
    """
    if path.exists():
        try:
            return ActiveEventState.from_json(read_json(path))
        except Exception:
            # A corrupt state file is treated as "no state": remove it so
            # later runs short-circuit instead of failing on it again.
            path.unlink(missing_ok=True)
    return None
def load_api_key(path: Path) -> str:
    """Read the API key from the credentials JSON at *path*.

    The file may contain either a bare JSON string (the key itself) or an
    object holding the key under ``apiKey`` or, failing that, ``key``.
    Surrounding whitespace is stripped from the returned key.

    A candidate that is empty or whitespace-only is treated as absent, so a
    blank ``apiKey`` falls through to ``key`` and a blank file value raises
    instead of returning ``""`` (which would only fail later as an opaque
    authentication error).

    Raises:
        RuntimeError: if the file is missing or holds no usable key.
        Errors from reading or decoding the file propagate unchanged.
    """
    if not path.exists():
        raise RuntimeError(f"Missing credentials file at {path}")
    payload = read_json(path)

    candidates: list[Any] = []
    if isinstance(payload, str):
        candidates = [payload]
    elif isinstance(payload, dict):
        # Preference order: "apiKey" first, then the "key" fallback.
        candidates = [payload.get(name) for name in ("apiKey", "key")]

    for candidate in candidates:
        if isinstance(candidate, str):
            cleaned = candidate.strip()
            if cleaned:
                return cleaned
    raise RuntimeError(f"Could not read apiKey from {path}")
def api_request(
    api_base: str,
    api_key: str,
    path_name: str,
    args: dict[str, Any],
    endpoint: str,
) -> Any:
    """POST one function call to the API and return its decoded result.

    The request goes to ``{api_base}/{endpoint}`` with a JSON body of
    ``{"path": path_name, "args": args, "format": "json"}`` and a Bearer
    ``Authorization`` header carrying *api_key*.

    Returns:
        - the decoded JSON as-is when it is not an object;
        - the ``value`` member when the response object has one;
        - otherwise the response object itself.

    Raises:
        RuntimeError: on an HTTP error status, a transport failure
            (including timeouts or socket errors while reading the body),
            an undecodable or non-JSON body, or a response object whose
            ``status`` is ``"error"``.
    """
    url = f"{api_base.rstrip('/')}/{endpoint}"
    payload = json.dumps({"path": path_name, "args": args, "format": "json"}).encode("utf-8")
    req = urllib.request.Request(
        url,
        method="POST",
        data=payload,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
    )
    try:
        with urllib.request.urlopen(
            req,
            context=ssl.create_default_context(),
            timeout=REQUEST_TIMEOUT_SEC,
        ) as response:
            body = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        # Include the server's error body; it usually explains the failure.
        body = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"{path_name} failed: HTTP {exc.code}: {body}") from exc
    except (urllib.error.URLError, OSError, UnicodeDecodeError) as exc:
        # URLError covers connection-phase failures. A timeout or reset that
        # happens while reading the body surfaces as a plain OSError, and a
        # body that is not UTF-8 as UnicodeDecodeError. Report them all in
        # the same shape so callers see the failing function name.
        raise RuntimeError(f"{path_name} failed: {exc}") from exc

    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        # e.g. an HTML error page served with a 200 status.
        raise RuntimeError(f"{path_name} failed: invalid JSON response: {exc}") from exc

    if not isinstance(data, dict):
        return data
    status = data.get("status")
    if status == "error":
        raise RuntimeError(data.get("errorMessage") or data.get("message") or "API error")
    if "value" in data:
        return data["value"]
    return data
def query_event_by_id(api_base: str, api_key: str, event_id: str) -> dict[str, Any]:
    """Run the ``events:getById`` query for *event_id* and return its result."""
    query_args = {"eventId": event_id}
    return api_request(
        api_base=api_base,
        api_key=api_key,
        path_name="events:getById",
        args=query_args,
        endpoint="query",
    )
def list_my_intros(api_base: str, api_key: str, event_id: str) -> list[dict[str, Any]]:
    """Run the ``events:listMyIntros`` query for *event_id*.

    The request sets ``includeResolved`` to ``False``.
    """
    return api_request(
        api_base=api_base,
        api_key=api_key,
        path_name="events:listMyIntros",
        args={"eventId": event_id, "includeResolved": False},
        endpoint="query",
    )
def get_suggestions(
    api_base: str,
    api_key: str,
    event_id: str,
    limit: int,
) -> list[dict[str, Any]]:
    """Run the ``events:getSuggestions`` query for *event_id*.

    *limit* is forwarded as the request's ``limit`` argument.
    """
    query_args = {"eventId": event_id, "limit": limit}
    return api_request(
        api_base=api_base,
        api_key=api_key,
        path_name="events:getSuggestions",
        args=query_args,
        endpoint="query",
    )
def propose_intro(api_base: str, api_key: str, event_id: str, to_agent_id: str, dry_run: bool) -> dict[str, Any] | None:
    """Propose an intro to *to_agent_id* via the ``events:proposeIntro`` mutation.

    With *dry_run* set, no request is sent and a placeholder result marked
    ``dryRun`` is returned instead.
    """
    if not dry_run:
        mutation_args = {"eventId": event_id, "toAgentId": to_agent_id}
        return api_request(
            api_base=api_base,
            api_key=api_key,
            path_name="events:proposeIntro",
            args=mutation_args,
            endpoint="mutation",
        )
    return {"dryRun": True, "status": "proposed"}
def renew_checkin(api_base: str, api_key: str, event_id: str, duration_minutes: int, dry_run: bool) -> dict[str, Any] | None:
    """Renew the check-in for *event_id* via the ``events:checkIn`` mutation.

    *duration_minutes* is sent as ``durationMinutes``. With *dry_run* set,
    no request is sent and a placeholder result marked ``dryRun`` is
    returned instead.
    """
    if not dry_run:
        mutation_args = {"eventId": event_id, "durationMinutes": duration_minutes}
        return api_request(
            api_base=api_base,
            api_key=api_key,
            path_name="events:checkIn",
            args=mutation_args,
            endpoint="mutation",
        )
    return {"dryRun": True, "status": "ok", "statusAction": "renew"}
def clear_state(path: Path) -> None:
    """Delete the state file at *path*; a missing file is not an error."""
    try:
        path.unlink()
    except FileNotFoundError:
        # Already gone, which is the desired end state.
        pass
def output(payload: dict[str, Any]) -> None:
    """Stamp *payload* with the current time and print it as indented JSON.

    The ``generatedAtMs`` key is set on the caller's dict in place before
    it is serialized to stdout.
    """
    payload.update({"generatedAtMs": now_ms()})
    rendered = json.dumps(payload, indent=2)
    print(rendered)
def _as_int(value: Any, default: int = 0) -> int:
    """Convert *value* to int, returning *default* when it is not convertible."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _discard_state(state_path: Path, dry_run: bool) -> str:
    """Remove the state file unless in dry-run mode; return the action label."""
    if dry_run:
        return "would_clear_state"
    clear_state(state_path)
    return "cleared_state"


def run() -> None:
    """Execute one heartbeat cycle and print a single JSON report to stdout.

    Steps, in order:

    1. Load local state; report and stop when there is none or it expired.
    2. Load credentials and fetch the event; report ``HEARTBEAT_ERROR`` and
       stop when either fails.
    3. Stop (discarding local state) when the event is not ``live`` or the
       response carries no usable ``myCheckin`` object.
    4. Refresh the state file from the server-side check-in expiry.
    5. Collect intro items that can be responded to or approved.
    6. Fetch suggestions and, with ``--propose``, propose intros to
       candidates scoring at least ``--propose-threshold``; at most
       ``--max-proposals`` attempts are made per run.
    7. Renew the check-in when it expires within ``--renew-within-minutes``.

    With ``--dry-run`` no state file is written or removed by this function
    and the mutation helpers are asked to skip their API calls; state that
    would have been discarded is reported as ``would_clear_state``.

    Failures while fetching intros or suggestions do not abort the cycle;
    they are surfaced as ``introError`` and ``suggestionActions.error``.
    """
    args = parse_args()
    state_path = Path(args.state_path).expanduser()
    credentials_path = Path(args.credentials_path).expanduser()

    state = safe_read_state(state_path)
    if state is None:
        output({"status": "HEARTBEAT_OK", "reason": "no_active_event_state"})
        return

    if state.expires_at <= now_ms():
        output({
            "status": "HEARTBEAT_OK",
            "reason": "active_event_expired",
            "state": state.to_json(),
            "action": _discard_state(state_path, args.dry_run),
        })
        return

    try:
        api_key = load_api_key(credentials_path)
    except Exception as exc:
        output({"status": "HEARTBEAT_ERROR", "reason": "missing_credentials", "error": str(exc)})
        return

    try:
        event_data = query_event_by_id(args.api_base, api_key, state.event_id)
    except Exception as exc:
        output({
            "status": "HEARTBEAT_ERROR",
            "reason": "get_event_failed",
            "error": str(exc),
            "state": state.to_json(),
        })
        return

    # A null or otherwise non-object response is handled like an event
    # without status or check-in, rather than crashing on ``.get``.
    if not isinstance(event_data, dict):
        event_data = {}
    event_status = event_data.get("status")
    my_checkin = event_data.get("myCheckin")
    if event_status != "live" or not isinstance(my_checkin, dict) or not my_checkin:
        output({
            "status": "HEARTBEAT_OK",
            "reason": "inactive_event_or_checkin",
            "eventStatus": event_status,
            "state": state.to_json(),
            "action": _discard_state(state_path, args.dry_run),
        })
        return

    # Keep the local expiry in sync with the server's view of the check-in.
    active_expires_ms = _as_int(my_checkin.get("expiresAt"))
    checked_in_at = event_data.get("checkedInAt", now_ms())
    if not args.dry_run:
        write_json(
            state_path,
            {"eventId": state.event_id, "expiresAt": active_expires_ms, "checkedInAt": checked_in_at},
        )

    intro_error: str | None = None
    try:
        intro_inbox = list_my_intros(args.api_base, api_key, state.event_id)
    except Exception as exc:
        intro_inbox = []
        intro_error = str(exc)
    intro_actions = [
        item
        for item in (intro_inbox or [])
        if isinstance(item, dict) and (item.get("canRespond") or item.get("canApprove"))
    ]

    suggestions_error: str | None = None
    try:
        suggestions = get_suggestions(
            args.api_base,
            api_key,
            state.event_id,
            args.suggestions_limit,
        )
    except Exception as exc:
        suggestions = []
        suggestions_error = str(exc)
    if not isinstance(suggestions, list):
        suggestions = []

    suggested_ids: list[str] = []
    proposals: list[dict[str, Any]] = []
    attempts = 0
    if args.propose:
        for candidate in suggestions:
            # The cap counts attempts, so repeated failures cannot turn one
            # heartbeat into a burst of mutation calls.
            if attempts >= args.max_proposals:
                break
            if not isinstance(candidate, dict):
                continue
            to_agent = candidate.get("toAgent")
            candidate_id = to_agent.get("agentId") if isinstance(to_agent, dict) else None
            if not candidate_id or _as_int(candidate.get("score")) < args.propose_threshold:
                continue
            attempts += 1
            try:
                result = propose_intro(args.api_base, api_key, state.event_id, candidate_id, args.dry_run)
            except Exception as exc:
                proposals.append({"error": str(exc), "toAgentId": candidate_id})
            else:
                proposals.append(result)
                suggested_ids.append(candidate_id)

    renewal = None
    remaining_ms = active_expires_ms - now_ms()
    if remaining_ms <= args.renew_within_minutes * 60_000:
        try:
            renewal = renew_checkin(
                args.api_base,
                api_key,
                state.event_id,
                args.renew_duration_minutes,
                args.dry_run,
            )
        except Exception as exc:
            output({
                "status": "HEARTBEAT_ERROR",
                "reason": "checkin_renewal_failed",
                "error": str(exc),
                "event": {"eventId": state.event_id, "status": event_status},
            })
            return
        renewed_expiry = _as_int(renewal.get("expiresAt")) if isinstance(renewal, dict) else 0
        if not args.dry_run and renewed_expiry:
            write_json(
                state_path,
                {"eventId": state.event_id, "expiresAt": renewed_expiry, "checkedInAt": checked_in_at},
            )

    output({
        "status": "HEARTBEAT_OK",
        "event": {
            "eventId": state.event_id,
            "status": event_status,
            "name": event_data.get("name"),
            "myCheckin": {
                "expiresAt": active_expires_ms,
                "introEnabled": my_checkin.get("introEnabled", False),
            },
        },
        "introActions": intro_actions,
        "introError": intro_error,
        "suggestionActions": {
            "checked": len(suggestions),
            # Only proposals that succeeded are reported as proposed.
            "proposed": len(suggested_ids),
            "proposedToAgents": suggested_ids,
            "results": proposals,
            "proposalThreshold": args.propose_threshold,
            "proposedEnabled": args.propose,
            "error": suggestions_error,
        },
        "renewal": renewal,
        "state": state.to_json(),
        "dryRun": args.dry_run,
    })
# Run the heartbeat only when executed as a script, not when imported.
if __name__ == "__main__":
    run()