Files

664 lines
23 KiB
Python

#!/usr/bin/env python3
import argparse
import hashlib
import json
import subprocess
from datetime import datetime, timezone
from pathlib import Path
# Directory containing this script. The sibling helper scripts invoked below
# (gate_status.py, generate_gate_prompt.py, agent_exec.py, progress_dashboard.py)
# are resolved relative to it.
ROOT = Path(__file__).resolve().parent
# Project-relative docs that main() hashes before each coding-agent run; outside
# dry-run, an IN_PROGRESS/PASS run is rejected when none of them changed.
DOC_UPDATE_FILES = [
"docs/tasks.md",
"docs/change-log.md",
"docs/traceability.md",
"docs/test-results.md",
"docs/progress.md",
"docs/agent-handoff.md",
]
# Project-relative onboarding docs; main() requires at least one of them to
# change during brownfield G1/G2 runs (outside dry-run).
BROWNFIELD_ONBOARD_FILES = [
"docs/as-is-architecture.md",
"docs/system-inventory.md",
"docs/dependency-map.md",
"docs/legacy-risk-register.md",
"docs/compatibility-matrix.md",
"docs/migration-plan.md",
"docs/characterization-tests.md",
]
# Lowercase phrases that check_assumption_markers() looks for in
# docs/agent-handoff.md; any hit fails validation.
ASSUMPTION_MARKERS = [
"i assumed",
"we assumed",
"assumed that",
"probably",
"likely",
"guessed",
"defaulted to",
"for convenience",
]
def run(cmd, cwd=None):
    """Run *cmd*, echo its captured output, and abort the script on failure.

    Captured stdout is always printed (stripped). When the child exits with a
    non-zero code, captured stderr is printed as well and ``SystemExit`` is
    raised with that same code.
    """
    proc = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    if proc.stdout:
        print(proc.stdout.strip())
    if proc.returncode == 0:
        return
    if proc.stderr:
        print(proc.stderr.strip())
    raise SystemExit(proc.returncode)
def run_capture(cmd, cwd=None):
    """Run *cmd* without raising and return ``(returncode, output)``.

    ``output`` is stdout followed, when stderr is non-empty, by a newline and
    stderr; the combined text is stripped of surrounding whitespace.
    """
    proc = subprocess.run(cmd, cwd=cwd, text=True, capture_output=True)
    combined = proc.stdout or ""
    if proc.stderr:
        combined += "\n" + proc.stderr
    return proc.returncode, combined.strip()
def file_hash(path: Path) -> str:
    """Return the SHA-256 hex digest of *path*, or ``"MISSING"`` if it does not exist."""
    if path.exists():
        return hashlib.sha256(path.read_bytes()).hexdigest()
    return "MISSING"
def snapshot_files(root: Path, files: list[str]):
    """Map each project-relative path in *files* to its current ``file_hash`` under *root*."""
    return {rel: file_hash(root / rel) for rel in files}
def changed_files(root: Path, files: list[str], before: dict):
    """Return the entries of *files* whose current hash differs from the one in *before*.

    A path absent from *before* compares against ``None`` and is therefore
    always reported as changed. Input order is preserved.
    """
    return [rel for rel in files if file_hash(root / rel) != before.get(rel)]
def load_context(root: Path):
    """Load ``<root>/.orchestrator/context.json``.

    Returns ``(path, parsed_json)``. Raises ``SystemExit`` with a hint to run
    init_project_docs.py when the file does not exist.
    """
    ctx_file = root / ".orchestrator" / "context.json"
    if ctx_file.exists():
        raw = ctx_file.read_text(encoding="utf-8")
        return ctx_file, json.loads(raw)
    raise SystemExit(f"Missing context file: {ctx_file}. Run init_project_docs.py first.")
def save_context(path: Path, ctx):
    """Write *ctx* to *path* as 2-space-indented JSON, creating parent directories as needed."""
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(ctx, indent=2)
    path.write_text(serialized, encoding="utf-8")
def append_validation_log(root: Path, gate: str, lines: list[str]):
    """Append a ``## <gate> Validation`` section to ``docs/validation-log.md``.

    Each entry of *lines* becomes one ``- `` bullet under the heading. The
    ``docs`` directory is created if it does not exist.
    """
    log_file = root / "docs" / "validation-log.md"
    log_file.parent.mkdir(parents=True, exist_ok=True)
    section = f"\n## {gate} Validation\n" + "".join(f"- {entry}\n" for entry in lines)
    with log_file.open("a", encoding="utf-8") as handle:
        handle.write(section)
def set_gate_state(root: Path, gate: str, state: str, note: str):
    """Invoke the sibling gate_status.py ``set`` sub-command for *gate*.

    Delegates to ``run``, so a non-zero exit from gate_status.py aborts the
    script with ``SystemExit``.
    """
    gate_status_script = str(ROOT / "gate_status.py")
    command = ["python3", gate_status_script, "set"]
    command += ["--root", str(root)]
    command += ["--gate", gate]
    command += ["--state", state]
    command += ["--note", note]
    run(command)
def check_g4_task_plan(root: Path):
    """Abort unless ``docs/g4-task-plan.md`` exists and contains a checklist box.

    Raises ``SystemExit`` when the file is missing or when it contains none of
    ``[ ]``, ``[x]`` or ``[X]``.
    """
    plan = root / "docs" / "g4-task-plan.md"
    if not plan.exists():
        raise SystemExit("G4 requires docs/g4-task-plan.md with task breakdown.")
    content = plan.read_text(encoding="utf-8", errors="ignore")
    if not any(box in content for box in ("[ ]", "[x]", "[X]")):
        raise SystemExit("G4 task plan must use checklist format ([ ] and [x]).")
def check_g4_ready_for_pass(root: Path):
    """Abort if ``docs/g4-task-plan.md`` still has unchecked (``[ ]``) tasks.

    Raises:
        SystemExit: when the plan file is missing, or when it contains at
            least one ``[ ]`` box.
    """
    path = root / "docs" / "g4-task-plan.md"
    # Fail with the same actionable message as check_g4_task_plan instead of
    # leaking a raw FileNotFoundError when this check is used on its own.
    if not path.exists():
        raise SystemExit("G4 requires docs/g4-task-plan.md with task breakdown.")
    text = path.read_text(encoding="utf-8", errors="ignore")
    if "[ ]" in text:
        raise SystemExit("Cannot mark G4 PASS while unchecked tasks remain in docs/g4-task-plan.md")
def build_fix_prompt(
    proj_root: Path,
    gate: str,
    task: str,
    spec_ref: str,
    failing_cmd: str,
    failure_output: str,
    retry_num: int,
    max_retries: int,
):
    """Write the fix prompt for one retry attempt and return its path.

    The prompt is written to ``docs/prompt-<gate>-fix-<retry_num>.txt`` under
    *proj_root*. *failure_output* is truncated to its first 1200 characters
    (``"(no output)"`` when empty), and an empty *spec_ref* is replaced by a
    generic ``requirements.md#relevant-section`` placeholder.
    """
    prompt_path = proj_root / "docs" / f"prompt-{gate}-fix-{retry_num}.txt"
    safe_out = (failure_output or "(no output)")[:1200]
    spec_line = spec_ref if spec_ref else "requirements.md#relevant-section"
    prompt = f"""You are fixing Gate {gate} after validation failure.
## SPEC-DRIVEN RULES (NON-NEGOTIABLE)
1. Implement ONLY what is specified.
2. Do NOT add unrequested features.
3. Do NOT guess at requirements.
4. If ambiguity remains, document open questions and stop.
## TASK CONTEXT
- Task: {task}
- Spec reference: {spec_line}
- Retry attempt: {retry_num}/{max_retries}
## FAILURE TO FIX
- Command: {failing_cmd}
- Output:
{safe_out}
## REQUIRED ACTIONS
1. Fix the concrete failure above.
2. Re-run the relevant local checks you can run.
3. Update these docs yourself:
- docs/tasks.md
- docs/progress.md
- docs/change-log.md
- docs/traceability.md
- docs/test-results.md
- docs/agent-handoff.md
4. In docs/agent-handoff.md include:
- What you changed
- Why it failed
- Exact CLI checks for OpenClaw agent to run
- Exact browser checks for OpenClaw agent to run (or N/A)
When fully done, run:
openclaw gateway wake --text "Done: {gate} fix attempt {retry_num} complete | verify: docs/agent-handoff.md" --mode now
"""
    # The other writers in this file create their target directory first; do
    # the same here so a missing docs/ folder does not crash the retry loop.
    prompt_path.parent.mkdir(parents=True, exist_ok=True)
    prompt_path.write_text(prompt, encoding="utf-8")
    return prompt_path
def execute_agent(agent_cmd_base: list[str], prompt_file: str):
    """Run the agent command with ``--prompt-file <prompt_file>`` appended.

    *agent_cmd_base* is not mutated. Delegates to ``run``, so a failing agent
    command aborts the script.
    """
    run([*agent_cmd_base, "--prompt-file", str(prompt_file)])
def run_validation_commands(validate_cmds: list[str], proj_root: Path):
    """Run each validation command via ``bash -lc`` in *proj_root*, stopping at the first failure.

    Returns ``(ok, report_lines, failing_cmd, failure_snippet)``. On success
    the last two items are empty strings. Output recorded in the report is
    capped at 500 characters followed by ``...``.
    """
    report = []
    workdir = str(proj_root)
    for command in validate_cmds:
        exit_code, output = run_capture(["bash", "-lc", command], cwd=workdir)
        excerpt = output if len(output) <= 500 else output[:500] + "..."
        passed = exit_code == 0
        report.append(f"PASS `{command}`" if passed else f"FAIL `{command}` exit={exit_code}")
        if excerpt:
            report.append(f"Output: {excerpt}")
        if not passed:
            return False, report, command, excerpt
    return True, report, "", ""
def check_assumption_markers(proj_root: Path):
    """Scan ``docs/agent-handoff.md`` for assumption language.

    Returns ``(ok, hits)``: ``ok`` is False when any phrase from
    ``ASSUMPTION_MARKERS`` occurs in the lowercased handoff text, and ``hits``
    lists the matching phrases in marker order. A missing handoff file counts
    as clean.
    """
    import re

    handoff = proj_root / "docs" / "agent-handoff.md"
    if not handoff.exists():
        return True, []
    text = handoff.read_text(encoding="utf-8", errors="ignore").lower()
    # Match whole words/phrases only: plain substring search would flag
    # "unlikely" as a use of "likely".
    hits = [
        marker
        for marker in ASSUMPTION_MARKERS
        if re.search(rf"\b{re.escape(marker)}\b", text)
    ]
    return not hits, hits
def parse_spec_acceptance_criteria(spec_file: Path):
    """Return the sorted, de-duplicated acceptance-criteria IDs found in *spec_file*.

    An ID is the text before the first ``:`` on any line that, once stripped,
    starts with ``AC-`` and contains a colon. A missing file yields ``[]``.
    """
    if not spec_file.exists():
        return []
    found = set()
    for raw in spec_file.read_text(encoding="utf-8", errors="ignore").splitlines():
        entry = raw.strip()
        if entry.startswith("AC-") and ":" in entry:
            found.add(entry.partition(":")[0].strip())
    return sorted(found)
def collect_g4_task_spec_refs(root: Path):
    """Collect the ``(Spec: ...)`` references from bullet lines of ``docs/g4-task-plan.md``.

    Only lines that start with ``-`` after stripping are considered. For each,
    the text between the first ``(Spec: `` and the next ``)`` is taken,
    stripped, and kept if non-empty. A missing plan file yields ``[]``.
    """
    plan = root / "docs" / "g4-task-plan.md"
    if not plan.exists():
        return []
    marker = "(Spec: "
    refs = []
    for raw in plan.read_text(encoding="utf-8", errors="ignore").splitlines():
        item = raw.strip()
        if not item.startswith("-"):
            continue
        _, found, tail = item.partition(marker)
        if not found:
            continue
        ref, closer, _ = tail.partition(")")
        if not closer:
            continue
        ref = ref.strip()
        if ref:
            refs.append(ref)
    return refs
def validate_spec_coverage_for_g4(root: Path, spec_ref: str):
    """Check that the G4 task plan carries spec references, including *spec_ref*.

    Returns ``(ok, message)``. Fails when the plan has no ``(Spec: ...)``
    references at all, or when a non-empty *spec_ref* is not among them.
    """
    known_refs = collect_g4_task_spec_refs(root)
    if known_refs:
        if not spec_ref or spec_ref in known_refs:
            return True, ""
        return False, f"G4 spec coverage check failed: {spec_ref} not present in docs/g4-task-plan.md task refs."
    return False, "G4 spec coverage check failed: docs/g4-task-plan.md has no '(Spec: ...)' references."
def validate_ac_mapping(proj_root: Path, spec_ref: str):
    """Check that every acceptance-criteria ID of the spec is mentioned in the agent handoff.

    The spec file is ``docs/<spec_ref without #fragment>`` under *proj_root*.
    Returns ``(ok, messages)``. The check passes trivially when *spec_ref* is
    empty or the spec declares no ``AC-`` IDs; it fails when
    ``docs/agent-handoff.md`` is missing or omits any ID.
    """
    import re

    if not spec_ref:
        return True, []
    spec_rel = spec_ref.split("#", 1)[0]
    spec_file = proj_root / "docs" / spec_rel
    ac_ids = parse_spec_acceptance_criteria(spec_file)
    if not ac_ids:
        return True, []
    handoff = proj_root / "docs" / "agent-handoff.md"
    if not handoff.exists():
        return False, ["AC mapping check failed: docs/agent-handoff.md missing."]
    text = handoff.read_text(encoding="utf-8", errors="ignore")

    def is_mentioned(ac_id: str) -> bool:
        # Require the ID as a whole token: a plain substring test would accept
        # "AC-10" as evidence that "AC-1" is covered.
        pattern = rf"(?<![A-Za-z0-9]){re.escape(ac_id)}(?![A-Za-z0-9])"
        return re.search(pattern, text) is not None

    missing = [ac for ac in ac_ids if not is_mentioned(ac)]
    if missing:
        return False, ["AC mapping missing in agent handoff: " + ", ".join(missing)]
    return True, []
def get_git_changed_files(proj_root: Path):
    """Return the paths git reports as changed or untracked in *proj_root*.

    Uses ``git status --porcelain -z`` and parses stdout only. For renamed or
    copied entries both the new path and the original path are returned.
    Best-effort: returns ``[]`` when git cannot be run or exits non-zero
    (for example when *proj_root* is not a repository).
    """
    try:
        # Read stdout directly and unstripped: stripping the combined output
        # removes the leading space of a first entry such as " M path", which
        # shifts the fixed-width status columns and truncates the path.
        proc = subprocess.run(
            ["git", "status", "--porcelain", "-z"],
            cwd=str(proj_root),
            text=True,
            capture_output=True,
        )
    except OSError:
        return []
    if proc.returncode != 0:
        return []
    changed = []
    # With -z, entries are NUL-separated "XY path" records with paths unquoted.
    entries = iter(proc.stdout.split("\0"))
    for entry in entries:
        if len(entry) <= 3:
            continue
        status, path = entry[:2], entry[3:]
        changed.append(path)
        if "R" in status or "C" in status:
            # Renames/copies are followed by a bare record with the original path.
            original = next(entries, "")
            if original:
                changed.append(original)
    return changed
def parse_allowed_scope_from_spec(spec_file: Path):
    """Return the bullet entries listed under the spec's "Allowed Scope Files" heading.

    The section starts at a line beginning with ``##`` whose text contains
    "allowed scope files" (case-insensitive) and ends at the next line
    beginning with ``##`` that does not. Bullets have their leading dashes,
    surrounding whitespace and backticks removed; empty results are skipped.
    A missing file yields ``[]``.
    """
    if not spec_file.exists():
        return []
    allowed = []
    in_section = False
    for raw in spec_file.read_text(encoding="utf-8", errors="ignore").splitlines():
        entry = raw.strip()
        if entry.startswith("##"):
            if "allowed scope files" in entry.lower():
                in_section = True
                continue
            if in_section:
                # Any other heading closes the section.
                break
            continue
        if in_section and entry.startswith("-"):
            candidate = entry.lstrip("-").strip().strip("`")
            if candidate:
                allowed.append(candidate)
    return allowed
def validate_drift_against_spec(proj_root: Path, spec_ref: str):
    """Flag git-changed files that fall outside the spec's allowed scope.

    Returns ``(ok, messages)``. The check passes trivially when *spec_ref* is
    empty, the spec lists no allowed scope, or git reports no changes. Paths
    under ``docs/`` and ``.orchestrator/`` are always exempt. A changed path is
    in scope when it equals an allowed entry or lies beneath it.
    """
    if not spec_ref:
        return True, []
    spec_file = proj_root / "docs" / spec_ref.split("#", 1)[0]
    allowed = parse_allowed_scope_from_spec(spec_file)
    if not allowed:
        return True, []
    changed = get_git_changed_files(proj_root)
    if not changed:
        return True, []
    exempt_prefixes = ("docs/", ".orchestrator/")
    allowed_dirs = tuple(entry.rstrip("/") + "/" for entry in allowed)
    out_of_scope = {
        path
        for path in changed
        if not path.startswith(exempt_prefixes)
        and path not in allowed
        and not path.startswith(allowed_dirs)
    }
    if out_of_scope:
        return False, ["Spec drift detected (changed outside allowed scope): " + ", ".join(sorted(out_of_scope))]
    return True, []
def write_validation_artifact(
    proj_root: Path,
    gate: str,
    task: str,
    spec_ref: str,
    validate_cmds: list[str],
    validation_lines: list[str],
    ui_review_note: str,
    status: str,
):
    """Write a JSON record of this validation run and return its path.

    The file is ``docs/validation-artifacts/<gate>-<UTC timestamp>.json`` under
    *proj_root*, with the timestamp formatted as ``%Y%m%dT%H%M%SZ``. The
    directory is created if needed.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    artifact_dir = proj_root / "docs" / "validation-artifacts"
    artifact_dir.mkdir(parents=True, exist_ok=True)
    artifact = artifact_dir / f"{gate}-{stamp}.json"
    record = dict(
        timestamp=stamp,
        gate=gate,
        task=task,
        specRef=spec_ref,
        status=status,
        validateCmds=validate_cmds,
        uiReviewNote=ui_review_note,
        results=validation_lines,
    )
    artifact.write_text(json.dumps(record, indent=2), encoding="utf-8")
    return artifact
# Entry point: run one task for one gate.
# Flow: parse CLI args -> persist agent/mode choices in .orchestrator/context.json
# -> enforce evidence/spec preconditions -> generate the gate prompt -> run the
# coding agent -> verify doc updates and run validations (re-prompting the agent
# with a fix prompt on failure, in autonomous mode only) -> write the validation
# log and JSON artifact -> set the gate state and refresh the dashboard.
# Failed preconditions and exhausted validations abort via SystemExit.
# NOTE(review): indentation has been lost in this copy of the file, so block
# nesting below has to be inferred — confirm against the canonical source.
def main():
parser = argparse.ArgumentParser(description="Single-task gate runner (agent-executed, evidence-driven)")
parser.add_argument("--root", default=".", help="Project root")
parser.add_argument("--gate", required=True, choices=["G1", "G2", "G3", "G4", "G5", "G6", "G7"])
parser.add_argument(
"--agent",
required=True,
choices=["codex", "claude", "opencode", "pi"],
help="Coding agent to execute implementation work",
)
parser.add_argument(
"--fallback-agent",
choices=["codex", "claude", "opencode", "pi"],
help="Fallback coding agent (required first time if context has no fallback)",
)
parser.add_argument("--project-mode", choices=["greenfield", "brownfield"], default="greenfield")
parser.add_argument("--execution-mode", choices=["autonomous", "gated"], default="gated")
# Kept as the strings "true"/"false" because the raw value is forwarded verbatim
# to generate_gate_prompt.py; it is converted to a bool only for the context file.
parser.add_argument("--research-mode", choices=["true", "false"], default="false")
parser.add_argument("--task", required=True, help="Single task summary (one task per run)")
parser.add_argument("--evidence", default="", help="Evidence summary")
parser.add_argument(
"--status",
default="IN_PROGRESS",
choices=["IN_PROGRESS", "PASS", "FAIL", "BLOCKED"],
help="Gate status to set for this run",
)
parser.add_argument("--prompt-out", help="Optional prompt output path")
parser.add_argument("--full-auto", action="store_true", help="Pass full-auto to codex in agent_exec")
parser.add_argument("--agent-dry-run", action="store_true", help="Print coding-agent command without executing it")
parser.add_argument(
"--validate-cmd",
action="append",
default=[],
help="Validation command to run after agent execution (repeatable). Required when status=PASS.",
)
parser.add_argument(
"--ui-review-note",
default="",
help="Manual browser/UI verification notes produced by OpenClaw agent after checks.",
)
parser.add_argument(
"--requires-browser-check",
action="store_true",
help="Require explicit browser/manual review note for this task.",
)
parser.add_argument(
"--spec-ref",
default="",
help="Spec reference for this task (required for G3/G4). Format: specs/feature.md#section or requirements.md#feature",
)
parser.add_argument(
"--auto-fix-retries",
type=int,
default=2,
help="Autonomous mode: retries with fix prompts after failed validations (default: 2).",
)
parser.add_argument(
"--auto-block-on-retry-exhaust",
action="store_true",
help="When retries are exhausted, auto-set gate state to BLOCKED with failure reason before exit.",
)
args = parser.parse_args()
proj_root = Path(args.root).resolve()
# Default prompt location is docs/prompt-<gate>.txt inside the project.
prompt_out = args.prompt_out or str(proj_root / "docs" / f"prompt-{args.gate}.txt")
# Context + agent persistence
ctx_path, ctx = load_context(proj_root)
ctx.setdefault("primaryAgent", "")
ctx.setdefault("fallbackAgent", "")
# --agent always wins; a warning is printed when it replaces a stored primary agent.
if not ctx["primaryAgent"]:
ctx["primaryAgent"] = args.agent
elif ctx["primaryAgent"] != args.agent:
print(f"[warn] overriding primaryAgent from {ctx['primaryAgent']} to {args.agent}")
ctx["primaryAgent"] = args.agent
# A fallback agent must be known: either already stored in the context or
# supplied via --fallback-agent on this run.
if not ctx["fallbackAgent"]:
if not args.fallback_agent:
raise SystemExit("Missing fallback agent. Provide --fallback-agent for first run.")
ctx["fallbackAgent"] = args.fallback_agent
elif args.fallback_agent and args.fallback_agent != ctx["fallbackAgent"]:
print(f"[warn] overriding fallbackAgent from {ctx['fallbackAgent']} to {args.fallback_agent}")
ctx["fallbackAgent"] = args.fallback_agent
ctx["projectMode"] = args.project_mode
ctx["executionMode"] = args.execution_mode
ctx["researchMode"] = True if args.research_mode == "true" else False
# The context is saved before the precondition checks below run.
save_context(ctx_path, ctx)
# Evidence and manual-check policy
if args.status == "PASS" and not args.validate_cmd:
raise SystemExit("status=PASS requires at least one --validate-cmd to prove working behavior.")
if args.status == "PASS" and args.agent_dry_run:
raise SystemExit("status=PASS is not allowed with --agent-dry-run. Execute the coding agent for real.")
# Every task should include validation activity (CLI and/or browser)
if args.status in ("IN_PROGRESS", "PASS") and not args.validate_cmd and not args.ui_review_note:
raise SystemExit(
"Each task must include post-task validation evidence. Provide --validate-cmd and/or --ui-review-note."
)
if args.requires_browser_check and not args.ui_review_note:
raise SystemExit("This task requires browser checks. Provide --ui-review-note after manual browser validation.")
# Spec-driven enforcement: G3/G4 require spec reference
if args.gate in ("G3", "G4") and not args.spec_ref:
raise SystemExit(
f"{args.gate} requires --spec-ref. No implementation without a spec.\n"
"Format: --spec-ref specs/feature.md#section or --spec-ref requirements.md#feature"
)
# Verify spec file exists
# The part of --spec-ref before "#" is resolved relative to <project>/docs.
if args.spec_ref:
spec_path = args.spec_ref.split("#")[0]
full_spec_path = proj_root / "docs" / spec_path
if not full_spec_path.exists():
raise SystemExit(f"Spec file not found: {full_spec_path}. Create the spec before implementation.")
# G4 needs a checklist-style task plan; marking it PASS additionally requires
# that no unchecked "[ ]" items remain.
if args.gate == "G4":
check_g4_task_plan(proj_root)
if args.status == "PASS":
check_g4_ready_for_pass(proj_root)
# 1) Generate initial gate prompt
run(
[
"python3",
str(ROOT / "generate_gate_prompt.py"),
"--gate",
args.gate,
"--agent",
args.agent,
"--project-mode",
args.project_mode,
"--execution-mode",
args.execution_mode,
"--research-mode",
args.research_mode,
"--task",
args.task,
"--spec-ref",
args.spec_ref,
"--output",
prompt_out,
]
)
# Build agent command base (prompt-file added per attempt)
agent_cmd_base = [
"python3",
str(ROOT / "agent_exec.py"),
"--root",
str(proj_root),
"--agent",
args.agent,
"--spec-ref",
args.spec_ref,
]
if args.gate in ("G3", "G4"):
agent_cmd_base.append("--enforce-spec-ref")
if args.full_auto:
agent_cmd_base.append("--full-auto")
if args.agent_dry_run:
agent_cmd_base.append("--dry-run")
# Fix retries apply only in autonomous mode; with 0 retries the first failed
# validation ends the run.
max_fix_retries = args.auto_fix_retries if args.execution_mode == "autonomous" else 0
attempt = 0
current_prompt = prompt_out
all_validation_lines = []
# Each iteration is one agent run followed by one validation pass. The loop
# ends with `break` on success or SystemExit on failure.
while True:
# Hash the tracked docs before the agent runs so its edits can be detected afterwards.
before_docs = snapshot_files(proj_root, DOC_UPDATE_FILES)
before_brownfield = snapshot_files(proj_root, BROWNFIELD_ONBOARD_FILES)
# 2) Execute coding agent
execute_agent(agent_cmd_base, current_prompt)
# 3) Verify docs were updated by coding agent (every task, every run)
changed = changed_files(proj_root, DOC_UPDATE_FILES, before_docs)
if not args.agent_dry_run and args.status in ("IN_PROGRESS", "PASS") and not changed:
raise SystemExit(
"Coding agent did not update required docs (tasks/change-log/traceability/test-results/progress/agent-handoff). "
"Docs updates must be done by the coding agent after each task."
)
# Brownfield onboarding (G1/G2) must also be authored by coding agent
changed_brownfield = []
if args.project_mode == "brownfield" and args.gate in ("G1", "G2") and not args.agent_dry_run:
changed_brownfield = changed_files(proj_root, BROWNFIELD_ONBOARD_FILES, before_brownfield)
if not changed_brownfield:
raise SystemExit(
"Brownfield onboarding docs were not updated by the coding agent during this run. "
"Agent must update onboarding artifacts directly."
)
# 4) CLI/manual validations by orchestrator
# Every failing check below overwrites failing_cmd/failure_snippet, so the fix
# prompt describes only the last failure; validation_lines records all of them.
ok, validation_lines, failing_cmd, failure_snippet = run_validation_commands(args.validate_cmd, proj_root)
assumptions_ok, assumption_hits = check_assumption_markers(proj_root)
if not assumptions_ok:
ok = False
failing_cmd = "assumption-detector:docs/agent-handoff.md"
failure_snippet = "Assumption language detected: " + ", ".join(assumption_hits)
validation_lines.append("FAIL assumption detector: " + ", ".join(assumption_hits))
if args.gate == "G4":
coverage_ok, coverage_msg = validate_spec_coverage_for_g4(proj_root, args.spec_ref)
if not coverage_ok:
ok = False
failing_cmd = "g4-spec-coverage:docs/g4-task-plan.md"
failure_snippet = coverage_msg
validation_lines.append("FAIL " + coverage_msg)
if args.gate in ("G3", "G4") and args.spec_ref:
ac_ok, ac_msgs = validate_ac_mapping(proj_root, args.spec_ref)
if not ac_ok:
ok = False
failing_cmd = "ac-mapping:docs/agent-handoff.md"
failure_snippet = " | ".join(ac_msgs)
validation_lines.extend(["FAIL " + msg for msg in ac_msgs])
# NOTE(review): with indentation lost it is unclear whether the drift check
# below is nested under the G3/G4 condition above or runs for every gate — confirm.
drift_ok, drift_msgs = validate_drift_against_spec(proj_root, args.spec_ref)
if not drift_ok:
ok = False
failing_cmd = "spec-drift:git-status"
failure_snippet = " | ".join(drift_msgs)
validation_lines.extend(["FAIL " + msg for msg in drift_msgs])
if args.ui_review_note:
validation_lines.append(f"UI review: {args.ui_review_note}")
if changed:
validation_lines.append("Docs updated by agent: " + ", ".join(changed))
if changed_brownfield:
validation_lines.append("Brownfield onboarding docs updated by agent: " + ", ".join(changed_brownfield))
all_validation_lines.extend(validation_lines)
if ok:
break
# Validation failed: autonomous fix-retry loop
# Dry-run never retries: log what was collected and stop.
if args.agent_dry_run:
append_validation_log(proj_root, args.gate, all_validation_lines)
raise SystemExit(f"Validation failed in dry-run mode: `{failing_cmd}`")
# Retry budget exhausted: log, optionally mark the gate BLOCKED, then abort.
if attempt >= max_fix_retries:
append_validation_log(proj_root, args.gate, all_validation_lines)
failure_note = (
f"Retry exhausted after {attempt} attempts. Last failed command: {failing_cmd}. "
f"Failure: {failure_snippet[:240]}"
)
if args.auto_block_on_retry_exhaust:
set_gate_state(proj_root, args.gate, "BLOCKED", failure_note)
append_validation_log(proj_root, args.gate, ["Auto-classified as BLOCKED due to retry exhaustion."])
raise SystemExit(
f"Validation failed after {attempt} fix retries. Last failed command: `{failing_cmd}`"
)
# Retry: write a fix prompt describing the last failure; the next loop
# iteration runs the agent with that prompt.
attempt += 1
all_validation_lines.append(
f"Auto-fix retry {attempt}/{max_fix_retries}: re-invoking coding agent with failure details."
)
fix_prompt_path = build_fix_prompt(
proj_root=proj_root,
gate=args.gate,
task=args.task,
spec_ref=args.spec_ref,
failing_cmd=failing_cmd,
failure_output=failure_snippet,
retry_num=attempt,
max_retries=max_fix_retries,
)
current_prompt = str(fix_prompt_path)
# 5) Persist validation evidence log + machine artifact
if all_validation_lines:
append_validation_log(proj_root, args.gate, all_validation_lines)
artifact_path = write_validation_artifact(
proj_root=proj_root,
gate=args.gate,
task=args.task,
spec_ref=args.spec_ref,
validate_cmds=args.validate_cmd,
validation_lines=all_validation_lines,
ui_review_note=args.ui_review_note,
status=args.status,
)
append_validation_log(proj_root, args.gate, [f"Validation artifact: {artifact_path.relative_to(proj_root)}"])
# 6) Gate status + dashboard
# The task summary is passed as the gate note.
set_gate_state(proj_root, args.gate, args.status, args.task)
run(["python3", str(ROOT / "progress_dashboard.py"), "--root", str(proj_root)])
# Run main() only when executed as a script, not when imported.
if __name__ == "__main__":
main()