#!/usr/bin/env python3 """Manage C2C active event state for heartbeat workflows.""" from __future__ import annotations import argparse import json from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any DEFAULT_STATE_PATH = Path("~/.c2c/active_event.json").expanduser() @dataclass class EventState: event_id: str expires_at: int checked_in_at: str @classmethod def from_dict(cls, data: dict[str, Any]) -> "EventState": return cls( event_id=data["eventId"], expires_at=int(data["expiresAt"]), checked_in_at=data.get("checkedInAt") or datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z"), ) def to_dict(self) -> dict[str, Any]: return { "eventId": self.event_id, "expiresAt": self.expires_at, "checkedInAt": self.checked_in_at, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--path", default=str(DEFAULT_STATE_PATH), help="State file path") subparsers = parser.add_subparsers(dest="command", required=True) set_cmd = subparsers.add_parser("set", help="Persist active event state") set_cmd.add_argument("--event-id", required=True, help="Event ID") set_cmd.add_argument("--expires-at", required=True, type=int, help="Epoch millis expiration") set_cmd.add_argument("--checked-in-at", help="ISO timestamp, defaults to now") status_cmd = subparsers.add_parser("status", help="Read active event state") status_cmd.add_argument("--now-ms", type=int, help="Override current epoch millis") status_cmd.add_argument("--clear-expired", action="store_true", help="Delete file when expired") subparsers.add_parser("clear", help="Delete active event state") return parser.parse_args() def get_now_ms(now_ms: int | None = None) -> int: if now_ms is not None: return now_ms return int(datetime.now(timezone.utc).timestamp() * 1000) def read_state(path: Path) -> EventState: data = json.loads(path.read_text(encoding="utf-8")) return EventState.from_dict(data) def write_state(path: Path, state: EventState) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(state.to_dict(), indent=2) + "\n", encoding="utf-8") def print_json(data: dict[str, Any]) -> None: print(json.dumps(data, indent=2)) def run_set(args: argparse.Namespace, path: Path) -> None: checked_in_at = args.checked_in_at or datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") state = EventState(event_id=args.event_id, expires_at=args.expires_at, checked_in_at=checked_in_at) write_state(path, state) print_json({"status": "ok", "path": str(path), "state": state.to_dict()}) def run_status(args: argparse.Namespace, path: Path) -> None: if not path.exists(): print_json({"active": False, "reason": "missing_state_file", "path": str(path)}) return state = read_state(path) now_ms = get_now_ms(args.now_ms) expired = state.expires_at <= now_ms if expired and args.clear_expired: path.unlink(missing_ok=True) print_json( { "active": not expired, "expired": expired, "path": str(path), "nowMs": now_ms, "state": state.to_dict(), "action": "cleared" if expired and args.clear_expired else "none", } ) def run_clear(path: Path) -> None: existed = path.exists() path.unlink(missing_ok=True) print_json({"status": "ok", "path": str(path), "deleted": existed}) def main() -> None: args = parse_args() path = Path(args.path).expanduser() if args.command == "set": run_set(args, path) elif args.command == "status": run_status(args, path) elif args.command == "clear": run_clear(path) else: raise ValueError(f"Unsupported command: {args.command}") if __name__ == "__main__": main()