"""Test the complete LangGraph workflow orchestration."""
import asyncio
import json
from backend.src.orchestration import TechStackOrchestrator
from backend.src.core.logging import setup_logging
async def test_workflow() -> None:
    """Run the orchestrator against sample queries and print a console report.

    The first sample query is executed and reported in detail: the parsed
    context followed by a short summary for each agent section found under
    ``recommendations``. The remaining queries are run as quick checks that
    print one status line each.

    Nothing is asserted; outcomes are only printed. A result whose ``status``
    is not ``"success"`` is reported using its ``error`` field. Exceptions
    raised by ``process_query`` itself are not caught here.
    """
    # NOTE(review): the icons in the messages below were restored from
    # mis-encoded text. The check marks, arrows, 🎯 and 🤖 follow from the
    # surviving bytes; the other section icons are best guesses — confirm
    # against the original file if the exact glyphs matter.
    setup_logging()
    print("🚀 Testing Tech Stack Advisor - LangGraph Workflow\n")
    print("=" * 70)

    # Initialize orchestrator
    print("\n🔧 Initializing TechStackOrchestrator...")
    orchestrator = TechStackOrchestrator()
    print(" ✓ Orchestrator initialized")
    print(" - Agents loaded: 4 (Database, Infrastructure, Cost, Security)")
    print(" - Workflow nodes: 6 (parse, 4 agents, synthesize)")

    # Test queries
    test_queries = [
        {
            "name": "Real-time Chat App",
            "query": "I'm building a real-time chat application expecting 100K daily active users. What tech stack should I use?",
        },
        {
            "name": "E-commerce Platform",
            "query": "Need a database and infrastructure for an e-commerce site with 50k DAU, payment processing, and GDPR compliance",
        },
        {
            "name": "Healthcare App",
            "query": "Building a healthcare application with HIPAA compliance requirements for 10,000 daily users",
        },
    ]

    # Run first query in detail
    print("\n" + "=" * 70)
    print(f"\n📝 Test Query 1: {test_queries[0]['name']}")
    print(f" Query: \"{test_queries[0]['query']}\"\n")
    result = await orchestrator.process_query(test_queries[0]["query"])

    if result.get("status") == "success":
        print("✅ Workflow completed successfully!\n")

        # Show parsed context
        print("📊 Parsed Context:")
        context = result.get("parsed_context", {})
        print(f" - DAU: {context.get('dau', 0):,}")
        print(f" - QPS: {context.get('qps', 0)}")
        print(f" - Data Type: {context.get('data_type', 'N/A')}")
        print(f" - Workload: {context.get('workload_type', 'N/A')}")
        print(f" - Sensitivity: {context.get('data_sensitivity', 'N/A')}")

        # Show agent results summary; a section is skipped when its entry is
        # missing or empty.
        print("\n🤖 Agent Results:")
        recommendations = result.get("recommendations", {})

        # Database
        db_result = recommendations.get("database", {})
        if db_result:
            scale_info = db_result.get("scale_info", {})
            print("\n 1. Database Agent:")
            print(f" - Scale Tier: {scale_info.get('tier', 'N/A')}")
            print(f" - Cache Recommended: {scale_info.get('cache_recommended', False)}")
            # Only the first 60 characters are shown to keep the report compact.
            print(f" - Recommendation: {scale_info.get('recommendation', 'N/A')[:60]}...")

        # Infrastructure
        infra_result = recommendations.get("infrastructure", {})
        if infra_result:
            scale_info = infra_result.get("scale_info", {})
            print("\n 2. Infrastructure Agent:")
            print(f" - Scale Tier: {scale_info.get('tier', 'N/A')}")
            print(f" - Architecture: {scale_info.get('suggested_architecture', 'N/A')}")
            print(f" - Deployment: {scale_info.get('deployment_strategy', 'N/A')[:60]}...")

        # Cost
        cost_result = recommendations.get("cost", {})
        if cost_result:
            comparisons = cost_result.get("cost_comparisons", [])
            if comparisons:
                # presumably the list is ordered cheapest-first; verify
                # against the cost agent before relying on this label.
                cheapest = comparisons[0]
                print("\n 3. Cost Agent:")
                print(f" - Recommended: {cheapest.get('provider', 'N/A').upper()}")
                print(f" - Monthly Cost: ${cheapest.get('monthly_cost', 0):.2f}")
                print(f" - Annual Cost: ${cheapest.get('monthly_cost', 0) * 12:.2f}")

        # Security
        security_result = recommendations.get("security", {})
        if security_result:
            threat = security_result.get("threat_assessment", {})
            print("\n 4. Security Agent:")
            print(f" - Risk Priority: {threat.get('priority', 'N/A')}")
            print(f" - Risk Multiplier: {threat.get('risk_multiplier', 0)}x")
            print(f" - Architecture: {threat.get('architecture', 'N/A')}")

        print("\n" + "=" * 70)
    else:
        print(f"❌ Workflow failed: {result.get('error', 'Unknown error')}")

    # Quick test remaining queries; numbering continues from the detailed
    # first query, hence start=2.
    print("\n\n🔄 Quick Tests for Other Queries:\n")
    for i, test_query in enumerate(test_queries[1:], start=2):
        print(f" {i}. {test_query['name']}...", end=" ")
        result = await orchestrator.process_query(test_query["query"])
        if result.get("status") == "success":
            context = result.get("parsed_context", {})
            print(f"✓ (DAU: {context.get('dau', 0):,}, Compliance: {len(context.get('compliance', []))} rules)")
        else:
            print(f"❌ Error: {result.get('error', 'Unknown')}")

    print("\n" + "=" * 70)
    print("\n✅ LangGraph Workflow Testing Complete!")
    print("\n🎯 Workflow Features Demonstrated:")
    print(" ✓ Query parsing and context extraction")
    print(" ✓ Sequential agent coordination (Database → Infra → Cost → Security)")
    print(" ✓ State management across agents")
    print(" ✓ Error handling and logging")
    print(" ✓ Result synthesis and formatting")
    print("\n📦 Next Steps:")
    print(" - Create FastAPI endpoints to expose this workflow")
    print(" - Add RAG integration with Qdrant")
    print(" - Build Streamlit frontend")
    print()
if __name__ == "__main__":
    # Script entry point: build the workflow coroutine and drive it to
    # completion on a fresh event loop.
    workflow_run = test_workflow()
    asyncio.run(workflow_run)