"""Stage 5 — Pilot Test Runner. Run a sample of N prompts (default 10) through 3 LLM models in parallel. Reject prompts that: - Get "I don't know" / "depends on preferences" responses - Produce zero overlap across models (incoherent) - Get misinterpreted (LLM answers about wrong topic) """ from __future__ import annotations import argparse import asyncio import json import os import random from pathlib import Path import httpx from anthropic import AsyncAnthropic from openai import AsyncOpenAI from config import CONFIG async def query_chatgpt(client: AsyncOpenAI, prompt: str) -> dict: """Query GPT-4o with web search.""" try: response = await client.chat.completions.create( model="gpt-4o-search-preview", messages=[{"role": "user", "content": prompt}], max_tokens=600, ) text = response.choices[0].message.content return {"model": "chatgpt", "response": text, "ok": True} except Exception as exc: return {"model": "chatgpt", "response": None, "ok": False, "error": str(exc)} async def query_perplexity(prompt: str) -> dict: """Query Perplexity Sonar Pro.""" try: async with httpx.AsyncClient(timeout=60) as client: response = await client.post( "https://api.perplexity.ai/chat/completions", headers={ "Authorization": f"Bearer {os.environ['PERPLEXITY_API_KEY']}", "Content-Type": "application/json", }, json={ "model": "sonar-pro", "messages": [{"role": "user", "content": prompt}], "max_tokens": 600, }, ) data = response.json() text = data["choices"][0]["message"]["content"] return {"model": "perplexity", "response": text, "ok": True} except Exception as exc: return {"model": "perplexity", "response": None, "ok": False, "error": str(exc)} async def query_gemini(prompt: str) -> dict: """Query Gemini Pro.""" try: # Using direct REST call (google-generativeai SDK is verbose) async with httpx.AsyncClient(timeout=60) as client: response = await client.post( f"https://generativelanguage.googleapis.com/v1beta/models/gemini-pro:generateContent?key={os.environ['GOOGLE_API_KEY']}", json={ "contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": 600}, }, ) data = response.json() text = data["candidates"][0]["content"]["parts"][0]["text"] return {"model": "gemini", "response": text, "ok": True} except Exception as exc: return {"model": "gemini", "response": None, "ok": False, "error": str(exc)} async def query_all_models(prompt_obj: dict, openai_client: AsyncOpenAI) -> dict: """Run prompt against all 3 models in parallel.""" prompt_text = prompt_obj["prompt"] results = await asyncio.gather( query_chatgpt(openai_client, prompt_text), query_perplexity(prompt_text), query_gemini(prompt_text), ) return { "prompt": prompt_obj["prompt"], "type": prompt_obj["type"], "persona_id": prompt_obj.get("persona_id"), "model_responses": {r["model"]: r for r in results}, } def evaluate_response_quality(test_result: dict, brand_catalog: dict) -> dict: """Detect bad signals in responses.""" bad_signals = [] brand_names = set() for brand in brand_catalog["brands"]: brand_names.add(brand["name"].lower()) for alias in brand.get("aliases", []): brand_names.add(alias.lower()) responses = test_result["model_responses"] successful = [r for r in responses.values() if r.get("ok")] if len(successful) < 2: bad_signals.append("less_than_2_models_responded") for model_name, r in responses.items(): if not r.get("ok"): continue text = (r.get("response") or "").lower() # "I don't know" patterns idk_patterns = ["nie wiem", "trudno powiedzieć", "to zależy od", "preferencje"] if any(pattern in text for pattern in idk_patterns) and len(text) < 200: bad_signals.append(f"{model_name}_uncertain_short") # No brand mentions brands_mentioned = [b for b in brand_names if b in text] r["brands_detected"] = brands_mentioned[:10] if not brands_mentioned: bad_signals.append(f"{model_name}_no_brands_mentioned") # Cross-model overlap all_brands_per_model = { m: set(r.get("brands_detected", [])) for m, r in responses.items() if r.get("ok") } if len(all_brands_per_model) >= 2: overlap = set.intersection(*all_brands_per_model.values()) if all_brands_per_model.values() else set() if not overlap and any(all_brands_per_model.values()): bad_signals.append("zero_brand_overlap_across_models") return { "bad_signals": bad_signals, "verdict": "reject" if len(bad_signals) >= 2 else ("flag" if bad_signals else "pass"), } async def run_pilot_test(category_slug: str) -> dict: data_dir = Path(__file__).parent.parent.parent / "data" / category_slug critic_file = data_dir / "critic_review.json" if not critic_file.exists(): raise FileNotFoundError(f"Run 4_validation_agents.py first. Missing: {critic_file}") with open(critic_file, "r", encoding="utf-8") as f: critic_data = json.load(f) catalog_file = data_dir / "brand_catalog.json" with open(catalog_file, "r", encoding="utf-8") as f: brand_catalog = json.load(f) candidates = critic_data["kept_prompts"] sample_size = min(CONFIG.pilot_sample_size, len(candidates)) # Stratified sample across types by_type: dict[str, list[dict]] = {} for p in candidates: by_type.setdefault(p["type"], []).append(p) sampled = [] for ptype, prompts in by_type.items(): n = max(1, int(round(sample_size * CONFIG.type_distribution.get(ptype, 0.2)))) sampled.extend(random.sample(prompts, min(n, len(prompts)))) # Trim to sample_size sampled = sampled[:sample_size] print(f"[Stage 5] Pilot testing {len(sampled)} prompts × 3 models = {len(sampled) * 3} API calls") openai_client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"]) pilot_results = [] for i, prompt_obj in enumerate(sampled): print(f"[Stage 5] Testing prompt {i+1}/{len(sampled)}: {prompt_obj['prompt'][:50]}...") test_result = await query_all_models(prompt_obj, openai_client) evaluation = evaluate_response_quality(test_result, brand_catalog) test_result["evaluation"] = evaluation pilot_results.append(test_result) pass_count = sum(1 for r in pilot_results if r["evaluation"]["verdict"] == "pass") flag_count = sum(1 for r in pilot_results if r["evaluation"]["verdict"] == "flag") reject_count = sum(1 for r in pilot_results if r["evaluation"]["verdict"] == "reject") return { "category": category_slug, "sample_size": len(sampled), "summary": {"pass": pass_count, "flag": flag_count, "reject": reject_count}, "pilot_results": pilot_results, } def save_pilot_results(category_slug: str, data: dict) -> Path: output_file = ( Path(__file__).parent.parent.parent / "data" / category_slug / "pilot_test_results.json" ) with open(output_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) return output_file def main(): parser = argparse.ArgumentParser(description="Pilot test sample prompts on 3 models.") parser.add_argument("--category", required=True) args = parser.parse_args() print(f"[Stage 5] Pilot testing {args.category}...") data = asyncio.run(run_pilot_test(args.category)) output_path = save_pilot_results(args.category, data) print(f"[Stage 5] ✅ Saved {output_path}") print(f"[Stage 5] Summary: {data['summary']}") if __name__ == "__main__": main()