"""Streamlit UI for Tech Stack Advisor."""
import streamlit as st
import httpx
import json
from typing import Any
import os
# Lightweight liveness probe for Railway: a request carrying a `health`
# query parameter (or `_stcore=health`) gets a bare "OK" and the script
# stops before any of the real UI is built.
query_params = st.query_params
_is_health_probe = (
    "health" in query_params
    or query_params.get("_stcore") == "health"
)
if _is_health_probe:
    st.write("OK")
    st.stop()
# Page config: browser-tab title and icon, wide layout, and a sidebar that
# starts expanded. The health-probe branch earlier in this file calls
# st.stop() before reaching this point, so this only runs for real page loads.
st.set_page_config(
    page_title="Tech Stack Advisor",
    page_icon="đ",
    layout="wide",
    initial_sidebar_state="expanded",
)
# API configuration
# Resolution order: the API_BASE_URL environment variable (set by Railway),
# then the Streamlit secret of the same name, then the local default.
_DEFAULT_API_BASE_URL = "http://localhost:8000"


def _resolve_api_base_url() -> str:
    """Return the backend base URL with any trailing slash removed.

    Returns:
        The value of the ``API_BASE_URL`` environment variable if it is set
        and non-empty, otherwise the ``API_BASE_URL`` Streamlit secret,
        otherwise ``http://localhost:8000``.
    """
    base_url = os.getenv("API_BASE_URL")
    if not base_url:
        try:
            base_url = st.secrets.get("API_BASE_URL", _DEFAULT_API_BASE_URL)
        except FileNotFoundError:
            # Accessing st.secrets without any secrets.toml raises instead of
            # behaving like an empty mapping, which would otherwise make the
            # localhost default unreachable on a fresh local checkout.
            base_url = _DEFAULT_API_BASE_URL
    # Endpoints are appended as "/path", so a trailing slash on the base
    # would produce "//path".
    return str(base_url).rstrip("/")


API_BASE_URL = _resolve_api_base_url()
def call_api(endpoint: str, method: str = "GET", data: dict | None = None) -> dict[str, Any]:
    """Call the backend API and return the decoded JSON body.

    Transport failures, non-2xx responses and undecodable bodies are reported
    in the Streamlit UI and returned as ``{"error": "<message>"}`` instead of
    being raised, so callers only need to look for an ``"error"`` key.

    Args:
        endpoint: Path appended verbatim to ``API_BASE_URL`` (e.g. ``"/health"``).
        method: HTTP method; only ``"GET"`` and ``"POST"`` are supported.
        data: Payload sent as the JSON body of a POST request.

    Returns:
        The decoded JSON response, or ``{"error": ...}`` on failure.

    Raises:
        ValueError: If ``method`` is neither ``"GET"`` nor ``"POST"``.
    """
    # Validate up front so a programming error is never confused with a
    # network or decoding failure below.
    if method not in ("GET", "POST"):
        raise ValueError(f"Unsupported method: {method}")

    url = f"{API_BASE_URL}{endpoint}"
    try:
        with httpx.Client(timeout=60.0) as client:
            if method == "GET":
                response = client.get(url)
            else:
                response = client.post(url, json=data)
            response.raise_for_status()
    except httpx.HTTPError as e:
        st.error(f"API Error: {str(e)}")
        # Only status errors carry a response; transport errors do not.
        failed_response = getattr(e, "response", None)
        if failed_response is not None:
            try:
                st.json(failed_response.json())
            except ValueError:
                # Error body was not JSON; show it as plain text instead.
                st.text(failed_response.text)
        return {"error": str(e)}

    try:
        return response.json()
    except ValueError as e:
        # A 2xx response whose body is not valid JSON.
        message = f"Invalid JSON in API response: {e}"
        st.error(f"API Error: {message}")
        return {"error": message}
def display_header() -> None:
    """Render the page title, the four-agent overview text and a divider."""
    overview_md = """
        Get intelligent tech stack recommendations from **4 specialized AI agents**:
        - đī¸ **Database Agent** - Database selection & scaling
        - âī¸ **Infrastructure Agent** - Cloud architecture & deployment
        - đ° **Cost Agent** - Multi-provider cost analysis
        - đ **Security Agent** - Threat modeling & compliance
        """
    st.title("đ Tech Stack Advisor")
    st.markdown(overview_md)
    st.divider()
def display_sidebar() -> None:
    """Render the sidebar: API health, usage metrics and an "About" panel.

    Issues ``GET /health`` and ``GET /metrics`` through ``call_api`` on every
    script run. The metrics section is shown only when the response contains
    ``total_requests``.
    """
    with st.sidebar:
        st.header("đ System Status")
        # Health check
        health = call_api("/health")
        if health.get("status") == "healthy":
            # This literal was split across two lines in the source (a
            # mis-decoded check-mark emoji); restored to one valid string.
            st.success("✅ API Healthy")
            st.metric("Agents Loaded", health.get("agents_loaded", 0))
            st.metric("Uptime", f"{health.get('uptime_seconds', 0):.0f}s")
        else:
            st.error("â API Offline")
            st.info(f"Make sure the API is running at:\n`{API_BASE_URL}`")
        st.divider()

        # Metrics
        st.header("đ Usage Metrics")
        metrics = call_api("/metrics")
        if "total_requests" in metrics:
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Total Requests", metrics.get("total_requests", 0))
                st.metric("Daily Queries", metrics.get("daily_queries", 0))
            with col2:
                st.metric("Daily Cost", f"${metrics.get('daily_cost_usd', 0):.4f}")
                st.metric("Budget Left", f"${metrics.get('budget_remaining_usd', 0):.2f}")
        st.divider()

        # Info
        st.header("âšī¸ About")
        # The Docs link follows the configured API base URL rather than a
        # hard-coded localhost address, so it also works when deployed.
        st.markdown(
            f"""
            **Tech Stack Advisor** uses:
            - LangGraph orchestration
            - Claude AI (Anthropic)
            - Qdrant vector search
            - FastAPI backend
            [GitHub](https://github.com/ranjanarajendran/tech-stack-advisor) |
            [Docs]({API_BASE_URL}/docs)
            """
        )
def display_query_form() -> tuple[str, int | None, float | None]:
    """Render the project-description form and return what the user entered.

    Returns:
        A ``(query, dau, budget)`` tuple: the free-text description, then the
        values of the two optional number inputs, which are ``None`` while
        left empty.
    """
    st.header("đŦ Describe Your Project")
    description = st.text_area(
        "What are you building?",
        placeholder="Example: I'm building a real-time chat application expecting 100K daily active users with payment processing and GDPR compliance requirements...",
        height=120,
        help="Describe your project requirements, scale, compliance needs, etc.",
    )

    advanced_options = st.expander("âī¸ Advanced Options (Optional)")
    with advanced_options:
        dau_column, budget_column = st.columns(2)
        # Widgets are created directly on the column objects, which places
        # them exactly as a `with column:` block would.
        dau_value = dau_column.number_input(
            "Daily Active Users (DAU)",
            min_value=0,
            max_value=10_000_000,
            value=None,
            help="Override auto-detected DAU",
        )
        budget_value = budget_column.number_input(
            "Monthly Budget ($)",
            min_value=0.0,
            max_value=100_000.0,
            value=None,
            help="Target monthly budget",
        )

    return description, dau_value, budget_value
def display_parsed_context(context: dict[str, Any]) -> None:
    """Show the parsed requirements as a four-column metric summary.

    Args:
        context: Parsed context dictionary; the keys read are ``dau``,
            ``qps``, ``data_type``, ``workload_type``, ``data_sensitivity``
            and ``compliance``.
    """
    st.subheader("đ Parsed Requirements")
    scale_col, data_col, risk_col, detail_col = st.columns(4)

    # Scale figures.
    scale_col.metric("Daily Active Users", f"{context.get('dau', 0):,}")
    scale_col.metric("Queries/Second", context.get('qps', 0))

    # Data profile.
    data_col.metric("Data Type", context.get('data_type', 'N/A'))
    data_col.metric("Workload", context.get('workload_type', 'N/A'))

    # Sensitivity and the number of compliance regimes.
    risk_col.metric("Sensitivity", context.get('data_sensitivity', 'N/A'))
    compliance = context.get('compliance', [])
    risk_col.metric("Compliance", len(compliance))

    # The last column lists the regimes by name and stays empty without any.
    if compliance:
        detail_col.write("**Required:**")
        for regime in compliance:
            detail_col.write(f"- {regime.upper()}")
def _yes_no(flag: Any) -> str:
    """Return the yes/no badge text used by the boolean metrics."""
    # The "Yes" literal was split across two lines in the source (a
    # mis-decoded check-mark emoji); restored to one valid string.
    return "✅ Yes" if flag else "â No"


def _render_pros_cons(info: dict[str, Any]) -> None:
    """Render the ``pros`` and ``cons`` lists of *info* as two bullet columns."""
    pros_col, cons_col = st.columns(2)
    with pros_col:
        st.markdown("**✅ Pros:**")
        for pro in info.get('pros', []):
            st.write(f"âĸ {pro}")
    with cons_col:
        st.markdown("**â Cons:**")
        for con in info.get('cons', []):
            st.write(f"âĸ {con}")


def _render_llm_analysis(label: str, agent_result: dict[str, Any]) -> None:
    """Render an agent's ``recommendations`` field inside an open expander.

    String values are shown as markdown; anything else is shown as JSON.
    """
    with st.expander(label, expanded=True):
        recommendations_text = agent_result.get("recommendations", "No detailed analysis available")
        if isinstance(recommendations_text, str):
            st.markdown(recommendations_text)
        else:
            st.json(recommendations_text)


def _render_database_tab(db_result: dict[str, Any]) -> None:
    """Render the database agent's metrics, option analysis and LLM text."""
    if not db_result:
        st.warning("No database recommendations available")
        return

    scale_info = db_result.get("scale_info", {})
    # Key Metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Scale Tier", scale_info.get("tier", "N/A").title())
    with col2:
        st.metric("Cache Recommended", _yes_no(scale_info.get("cache_recommended", False)))
    with col3:
        st.metric("Estimated Connections", scale_info.get("estimated_connections", 0))
    st.markdown("**Quick Recommendation:**")
    st.info(scale_info.get("recommendation", "N/A"))
    st.divider()

    # Database Knowledge (Pros/Cons)
    raw_knowledge = db_result.get("raw_knowledge", {})
    if raw_knowledge and "results" in raw_knowledge:
        st.markdown("**đ Database Options Analysis:**")
        db_options = raw_knowledge["results"]
        for db_name, db_info in list(db_options.items())[:4]:  # Show top 4
            with st.expander(f"**{db_name.upper()}** - {db_info.get('description', '')}"):
                st.markdown(f"**Best For:** {db_info.get('best_for', 'N/A')}")
                st.markdown(f"**Scale:** {db_info.get('scale', 'N/A')}")
                _render_pros_cons(db_info)
        st.divider()

    # Comprehensive LLM Analysis
    _render_llm_analysis("đ **COMPREHENSIVE DETAILED ANALYSIS** (Click to expand)", db_result)


def _render_infrastructure_tab(infra_result: dict[str, Any]) -> None:
    """Render the infrastructure agent's metrics, patterns, providers and LLM text."""
    if not infra_result:
        st.warning("No infrastructure recommendations available")
        return

    scale_info = infra_result.get("scale_info", {})
    # Key Metrics
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Scale Tier", scale_info.get("tier", "N/A").title())
        st.metric("Architecture", scale_info.get("suggested_architecture", "N/A").title())
    with col2:
        st.metric("Load Balancer", _yes_no(scale_info.get("load_balancer_needed")))
        st.metric("CDN", _yes_no(scale_info.get("cdn_recommended")))
    st.markdown("**Deployment Strategy:**")
    st.info(scale_info.get("deployment_strategy", "N/A"))
    st.markdown("**Compute Recommendation:**")
    st.code(scale_info.get("compute_recommendation", "N/A"))
    st.divider()

    # Architecture Patterns and Cloud Providers
    raw_knowledge = infra_result.get("raw_knowledge", {})
    if raw_knowledge:
        # Architecture Patterns
        patterns = raw_knowledge.get("patterns", {})
        if patterns:
            st.markdown("**đī¸ Architecture Patterns Analysis:**")
            for pattern_name, pattern_info in list(patterns.items())[:3]:
                with st.expander(f"**{pattern_name.upper()}** - {pattern_info.get('description', '')}"):
                    st.markdown(f"**Best For:** {pattern_info.get('best_for', 'N/A')}")
                    st.markdown(f"**Complexity:** {pattern_info.get('complexity', 'N/A').title()}")
                    st.markdown(f"**Components:** {', '.join(pattern_info.get('components', []))}")
                    _render_pros_cons(pattern_info)
        # Cloud Provider Comparison
        providers = raw_knowledge.get("cloud_providers", {})
        if providers:
            st.markdown("**âī¸ Cloud Provider Comparison:**")
            for provider_name, provider_info in providers.items():
                with st.expander(f"**{provider_name.upper()}** - {provider_info.get('best_for', '')}"):
                    st.markdown(f"**Strengths:** {', '.join(provider_info.get('strengths', []))}")
                    st.markdown(f"**Weaknesses:** {', '.join(provider_info.get('weaknesses', []))}")
        st.divider()

    # Comprehensive LLM Analysis
    _render_llm_analysis("đ **COMPREHENSIVE DETAILED ANALYSIS** (Click to expand)", infra_result)


def _render_cost_tab(cost_result: dict[str, Any]) -> None:
    """Render the cost agent's provider table, bar chart, configuration and LLM text."""
    if not cost_result:
        st.warning("No cost estimates available")
        return

    comparisons = cost_result.get("cost_comparisons", [])
    if comparisons:
        st.markdown("**đĩ Cost Comparison by Provider:**")
        # Create comparison table (pandas is imported lazily, only when a
        # table is actually drawn).
        import pandas as pd
        rows = [
            {
                "Provider": comp["provider"].upper(),
                "Monthly": f"${comp['monthly_cost']:.2f}",
                "Annual": f"${comp['monthly_cost'] * 12:.2f}",
                "Compute": f"${comp['cloud_costs']['breakdown']['compute']:.2f}",
                "Storage": f"${comp['cloud_costs']['breakdown']['storage']:.2f}",
                "Database": f"${comp['cloud_costs']['breakdown']['database']:.2f}",
            }
            for comp in comparisons
        ]
        st.dataframe(pd.DataFrame(rows), use_container_width=True)

        # Cheapest option: chosen by monthly cost rather than by position,
        # so the result does not depend on the order the API returned.
        cheapest = min(comparisons, key=lambda comp: comp["monthly_cost"])
        st.success(f"đĄ **Recommended:** {cheapest['provider'].upper()} - ${cheapest['monthly_cost']:.2f}/month")

        # Bar chart
        st.markdown("**Monthly Cost Comparison:**")
        import plotly.graph_objects as go
        fig = go.Figure(data=[
            go.Bar(
                x=[c["provider"].upper() for c in comparisons],
                y=[c["monthly_cost"] for c in comparisons],
                text=[f"${c['monthly_cost']:.2f}" for c in comparisons],
                textposition='auto',
            )
        ])
        fig.update_layout(
            title="Monthly Cost by Provider",
            xaxis_title="Provider",
            yaxis_title="Monthly Cost (USD)",
            height=400,
        )
        st.plotly_chart(fig, use_container_width=True)
        st.divider()

    # Configuration Details
    config = cost_result.get("configuration", {})
    if config:
        st.markdown("**âī¸ Infrastructure Configuration:**")
        col1, col2, col3, col4 = st.columns(4)
        with col1:
            st.metric("Instances", config.get("instances", 0))
        with col2:
            st.metric("Instance Type", config.get("instance_type", "N/A").title())
        with col3:
            st.metric("Storage", f"{config.get('storage_gb', 0)} GB")
        with col4:
            st.metric("Bandwidth", f"{config.get('bandwidth_gb', 0)} GB/mo")
        st.divider()

    # Comprehensive LLM Analysis
    _render_llm_analysis(
        "đ **COMPREHENSIVE COST ANALYSIS & OPTIMIZATION STRATEGIES** (Click to expand)",
        cost_result,
    )


def _render_security_tab(security_result: dict[str, Any]) -> None:
    """Render the security agent's risk metrics, threats, checklist and LLM text."""
    if not security_result:
        st.warning("No security assessment available")
        return

    threat = security_result.get("threat_assessment", {})
    # Risk Assessment Metrics
    col1, col2, col3 = st.columns(3)
    with col1:
        priority = threat.get("priority", "N/A")
        color = {"low": "đĸ", "medium": "đĄ", "high": "đ ", "critical": "đ´"}.get(priority.lower(), "âĒ")
        st.metric("Risk Priority", f"{color} {priority.title()}")
    with col2:
        st.metric("Risk Multiplier", f"{threat.get('risk_multiplier', 0)}x")
    with col3:
        st.metric("Architecture", threat.get("architecture", "N/A").title())
    st.divider()

    # Identified Threats
    threats_data = threat.get("threats", {})
    if threats_data:
        st.markdown("**â ī¸ Identified Threats by Severity:**")
        for severity, threat_list in threats_data.items():
            severity_icon = {"high_risk": "đ´", "medium_risk": "đĄ", "low_risk": "đĸ"}.get(severity, "âĒ")
            with st.expander(f"{severity_icon} {severity.replace('_', ' ').title()} ({len(threat_list)} threats)"):
                for t in threat_list:
                    st.write(f"âĸ {t}")
        st.divider()

    # Security Checklist
    checklist = security_result.get("security_checklist", {})
    if checklist and "checklist" in checklist:
        st.markdown("**đĄī¸ Security Best Practices Checklist:**")
        checklist_data = checklist["checklist"]
        for category, items in list(checklist_data.items())[:6]:  # Show top 6 categories
            critical_items = items.get("critical", [])
            recommended_items = items.get("recommended", [])
            with st.expander(f"**{category.replace('_', ' ').title()}** ({len(critical_items)} critical items)"):
                if critical_items:
                    st.markdown("**đ´ Critical:**")
                    for item in critical_items[:5]:  # Top 5
                        st.write(f"âĸ {item}")
                if recommended_items:
                    st.markdown("**đĄ Recommended:**")
                    for item in recommended_items[:3]:  # Top 3
                        st.write(f"âĸ {item}")
        st.divider()

    # Compliance Requirements
    if checklist and "compliance_frameworks" in checklist:
        st.markdown("**đ Compliance Frameworks:**")
        compliance_frameworks = checklist["compliance_frameworks"]
        for framework, requirements in compliance_frameworks.items():
            with st.expander(f"**{framework.upper()}** ({len(requirements)} key requirements)"):
                for req in requirements:
                    st.write(f"âĸ {req}")
        st.divider()

    # Comprehensive LLM Analysis
    _render_llm_analysis(
        "đ **COMPREHENSIVE SECURITY ASSESSMENT & IMPLEMENTATION GUIDE** (Click to expand)",
        security_result,
    )


def display_agent_results(recommendations: dict[str, Any]) -> None:
    """Display each agent's recommendations in its own tab.

    Args:
        recommendations: Dictionary of agent recommendations; the keys read
            are ``database``, ``infrastructure``, ``cost`` and ``security``.
            A missing or empty entry shows a warning in that tab.
    """
    st.subheader("đ¤ Agent Recommendations")
    # Create tabs for each agent
    db_tab, infra_tab, cost_tab, security_tab = st.tabs(
        ["đī¸ Database", "âī¸ Infrastructure", "đ° Cost", "đ Security"]
    )
    with db_tab:
        _render_database_tab(recommendations.get("database", {}))
    with infra_tab:
        _render_infrastructure_tab(recommendations.get("infrastructure", {}))
    with cost_tab:
        _render_cost_tab(recommendations.get("cost", {}))
    with security_tab:
        _render_security_tab(recommendations.get("security", {}))
def main() -> None:
    """Run the page: header, sidebar, query form, then results on submit.

    When the submit button is pressed with a description of at least 10
    non-whitespace-padded characters, the request is POSTed to
    ``/recommend`` and the response is rendered, followed by a JSON
    download button. Errors from the API are shown and end the run early.
    """
    display_header()
    display_sidebar()

    # Query form
    query, dau_override, budget_override = display_query_form()

    # Submit button
    submit_col, clear_col, _ = st.columns([1, 1, 4])
    with submit_col:
        submit = st.button("đ Get Recommendations", type="primary", use_container_width=True)
    with clear_col:
        if st.button("đ Clear", use_container_width=True):
            st.rerun()

    # Process query
    if not submit:
        return

    # Validate the stripped text so whitespace padding cannot satisfy the
    # minimum length.
    query = (query or "").strip()
    if len(query) < 10:
        st.error("Please provide a more detailed description (at least 10 characters)")
        return

    with st.spinner("đ¤ Analyzing your requirements with 4 AI agents..."):
        # Prepare request
        request_data: dict[str, Any] = {"query": query}
        # `is not None` rather than truthiness: an explicitly entered 0 is a
        # value the user chose and must not be silently dropped.
        if dau_override is not None:
            request_data["dau"] = dau_override
        if budget_override is not None:
            request_data["budget_target"] = budget_override
        # Call API
        result = call_api("/recommend", method="POST", data=request_data)

    if result.get("error") or result.get("status") == "error":
        st.error("â Failed to get recommendations")
        if result.get("error"):
            st.error(result["error"])
        return

    # Display results. This literal was split across two lines in the source
    # (a mis-decoded check-mark emoji); restored to one valid string.
    st.success("✅ Analysis Complete!")
    # Correlation ID
    st.caption(f"Request ID: `{result.get('correlation_id', 'N/A')}`")
    st.divider()

    # Parsed context
    if "parsed_context" in result:
        display_parsed_context(result["parsed_context"])
        st.divider()

    # Agent results
    if "recommendations" in result:
        display_agent_results(result["recommendations"])
        st.divider()

    # Download results
    download_col, _ = st.columns([1, 5])
    with download_col:
        st.download_button(
            "đĨ Download JSON",
            data=json.dumps(result, indent=2),
            file_name="tech_stack_recommendations.json",
            mime="application/json",
        )
# Build the page only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()