"""Citee Index — Prompt Curation Pipeline orchestrator. Runs all 6 stages sequentially. Stage 7 (finalize) requires manual human review between Stage 6 and Stage 7, so this pipeline stops after Stage 6 and prints instructions for the human reviewer. Usage: python pipeline.py --category swiece-sojowe-pl --display-name "Świece sojowe PL" Or run individual stages by importing: from pipeline import run_stage run_stage(1, category, display_name) """ from __future__ import annotations import argparse import importlib.util import os import sys from pathlib import Path # Load each stage as a module STAGE_FILES = { 1: "1_persona_generator.py", 2: "2_prompt_brainstormer.py", 3: "3_reality_checker.py", 4: "4_validation_agents.py", 5: "5_pilot_test_runner.py", 6: "6_human_review_export.py", 7: "7_finalize.py", } def load_stage_module(stage_num: int): """Dynamically load a stage script (filenames start with digits, not Python-importable normally).""" stage_file = Path(__file__).parent / STAGE_FILES[stage_num] spec = importlib.util.spec_from_file_location(f"stage_{stage_num}", stage_file) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module def check_prerequisites(): """Verify required env vars and brand catalog exist.""" required_env = ["ANTHROPIC_API_KEY"] missing = [v for v in required_env if not os.environ.get(v)] if missing: print(f"❌ Missing required environment variables: {missing}") print(f" Copy .env.example to .env and fill in API keys.") sys.exit(1) optional_env = { "OPENAI_API_KEY": "Stage 5 (pilot test runner)", "GOOGLE_API_KEY": "Stage 5 (pilot test runner)", "PERPLEXITY_API_KEY": "Stage 5 (pilot test runner)", "REDDIT_CLIENT_ID": "Stage 3 (reality checker — Reddit signal)", } for env, stage in optional_env.items(): if not os.environ.get(env): print(f"⚠ Optional env var missing: {env} (used by {stage}). Stage will skip Reddit/multi-model checks.") def run_pipeline(category: str, display_name: str, skip_pilot: bool = False, skip_reality: bool = False): """Run stages 1-6 sequentially. Stage 7 requires human action between 6 and 7.""" check_prerequisites() # Verify brand catalog exists catalog_file = Path(__file__).parent.parent.parent / "data" / category / "brand_catalog.json" if not catalog_file.exists(): print(f"❌ Brand catalog not found: {catalog_file}") print(f" Create one before running pipeline. See data/README.md for schema.") sys.exit(1) print("=" * 60) print(f"Citee Index — Prompt Curation Pipeline") print(f"Category: {category}") print(f"Display name: {display_name}") print("=" * 60) print() # Stage 1 print(">>> STAGE 1: Persona Generator") stage_1 = load_stage_module(1) personas = stage_1.generate_personas(category, display_name) stage_1.save_personas(category, personas) print(f"✅ Generated {len(personas['personas'])} personas") print() # Stage 2 print(">>> STAGE 2: Prompt Brainstormer") stage_2 = load_stage_module(2) raw_data = stage_2.brainstorm_all(category, display_name) stage_2.save_raw_prompts(category, raw_data) print(f"✅ Brainstormed {raw_data['total_raw_prompts']} raw prompts") print() # Stage 3 (skippable in some scenarios) if not skip_reality: print(">>> STAGE 3: Reality Checker (Google Trends + Reddit)") stage_3 = load_stage_module(3) validated = stage_3.check_all_prompts(category) stage_3.save_validated(category, validated) print(f"✅ Reality check: {validated['summary']}") print() else: print(">>> STAGE 3: SKIPPED (--skip-reality flag)") # Copy raw to validated as fallback import shutil data_dir = Path(__file__).parent.parent.parent / "data" / category shutil.copy(data_dir / "raw_prompts.json", data_dir / "validated_prompts.json") print() # Stage 4 print(">>> STAGE 4: Multi-agent Validation") stage_4 = load_stage_module(4) # stage_4 has async main, run via subprocess pattern import asyncio import json data_dir = Path(__file__).parent.parent.parent / "data" / category with open(data_dir / "validated_prompts.json", "r", encoding="utf-8") as f: validated_data = json.load(f) candidates = [p for p in validated_data["validated_prompts"] if p.get("reality_signal", "pass") != "fail"] critic_results = asyncio.run(stage_4.run_three_critics(candidates, display_name)) aggregation = stage_4.aggregate_flags(critic_results, len(candidates)) output = { "category": category, "input_count": len(candidates), "critic_results": critic_results, "aggregation": aggregation, "kept_prompts": [ p for i, p in enumerate(candidates) if i not in aggregation["flagged_for_removal"] ], } with open(data_dir / "critic_review.json", "w", encoding="utf-8") as f: json.dump(output, f, ensure_ascii=False, indent=2) print(f"✅ Critics review: removed {aggregation['total_removed']}, kept {aggregation['total_kept']}") print() # Stage 5 (optional — pilot test costs API) if not skip_pilot: print(">>> STAGE 5: Pilot Test Runner (sample 10 prompts × 3 models)") stage_5 = load_stage_module(5) pilot_data = asyncio.run(stage_5.run_pilot_test(category)) stage_5.save_pilot_results(category, pilot_data) print(f"✅ Pilot test: {pilot_data['summary']}") print() else: print(">>> STAGE 5: SKIPPED (--skip-pilot flag)") print() # Stage 6 print(">>> STAGE 6: Human Review Export") stage_6 = load_stage_module(6) prompts_for_review = stage_6.load_kept_prompts(category) csv_path = stage_6.export_to_csv(category, prompts_for_review) summary_path = stage_6.export_summary_md(category, prompts_for_review) print(f"✅ Exported {len(prompts_for_review)} prompts for human review") print(f" CSV: {csv_path}") print(f" Summary: {summary_path}") print() print("=" * 60) print("PIPELINE COMPLETE — Stages 1-6 done.") print("=" * 60) print() print("NEXT STEPS (manual):") print(f"1. Open {csv_path} in spreadsheet") print(f"2. Fill `decision` column for each row: APPROVE / REJECT / EDIT") print(f"3. If EDIT, fill `edited_prompt` column") print(f"4. Save as `for_human_review_decided.csv`") print(f"5. Run: python 7_finalize.py --category {category}") print() print("This produces final closed pool: prompts/{category}/v1.json (gitignored)") def main(): parser = argparse.ArgumentParser(description="Citee prompt curation pipeline orchestrator.") parser.add_argument("--category", required=True, help="Category slug (e.g., 'swiece-sojowe-pl')") parser.add_argument("--display-name", required=True, help="Human-readable category name") parser.add_argument("--skip-pilot", action="store_true", help="Skip Stage 5 (saves API cost)") parser.add_argument("--skip-reality", action="store_true", help="Skip Stage 3 (no Google Trends/Reddit check)") args = parser.parse_args() run_pipeline(args.category, args.display_name, args.skip_pilot, args.skip_reality) if __name__ == "__main__": main()