Tech Stack Advisor - Code Viewer

← Back to File Tree

streamlit_app.py

Language: python | Path: frontend/streamlit_app.py | Lines: 562
"""Streamlit UI for Tech Stack Advisor."""
import streamlit as st
import httpx
import json
from typing import Any
import os

# Simple health check for Railway
# Check query params to see if this is a health check request
query_params = st.query_params
if "health" in query_params or st.query_params.get("_stcore") == "health":
    st.write("OK")
    st.stop()

# Page config
st.set_page_config(
    page_title="Tech Stack Advisor",
    page_icon="🚀",
    layout="wide",
    initial_sidebar_state="expanded",
)

# API configuration
# Railway sets API_BASE_URL as environment variable
API_BASE_URL = os.getenv("API_BASE_URL") or st.secrets.get("API_BASE_URL", "http://localhost:8000")


def call_api(endpoint: str, method: str = "GET", data: dict | None = None) -> dict[str, Any]:
    """Call the API.

    Args:
        endpoint: API endpoint
        method: HTTP method
        data: Request data for POST

    Returns:
        API response as dictionary
    """
    url = f"{API_BASE_URL}{endpoint}"

    try:
        with httpx.Client(timeout=60.0) as client:
            if method == "GET":
                response = client.get(url)
            elif method == "POST":
                response = client.post(url, json=data)
            else:
                raise ValueError(f"Unsupported method: {method}")

            response.raise_for_status()
            return response.json()

    except httpx.HTTPError as e:
        st.error(f"API Error: {str(e)}")
        if hasattr(e, 'response') and e.response is not None:
            try:
                error_detail = e.response.json()
                st.json(error_detail)
            except:
                st.text(e.response.text)
        return {"error": str(e)}


def display_header() -> None:
    """Display the application header."""
    st.title("🚀 Tech Stack Advisor")
    st.markdown(
        """
        Get intelligent tech stack recommendations from **4 specialized AI agents**:
        - đŸ—„ī¸ **Database Agent** - Database selection & scaling
        - â˜ī¸ **Infrastructure Agent** - Cloud architecture & deployment
        - 💰 **Cost Agent** - Multi-provider cost analysis
        - 🔒 **Security Agent** - Threat modeling & compliance
        """
    )
    st.divider()


def display_sidebar() -> None:
    """Display the sidebar with metrics and info."""
    with st.sidebar:
        st.header("📊 System Status")

        # Health check
        health = call_api("/health")
        if "status" in health and health["status"] == "healthy":
            st.success("✅ API Healthy")
            st.metric("Agents Loaded", health.get("agents_loaded", 0))
            st.metric("Uptime", f"{health.get('uptime_seconds', 0):.0f}s")
        else:
            st.error("❌ API Offline")
            st.info(f"Make sure the API is running at:\n`{API_BASE_URL}`")

        st.divider()

        # Metrics
        st.header("📈 Usage Metrics")
        metrics = call_api("/metrics")
        if "total_requests" in metrics:
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Total Requests", metrics.get("total_requests", 0))
                st.metric("Daily Queries", metrics.get("daily_queries", 0))
            with col2:
                st.metric("Daily Cost", f"${metrics.get('daily_cost_usd', 0):.4f}")
                st.metric("Budget Left", f"${metrics.get('budget_remaining_usd', 0):.2f}")

        st.divider()

        # Info
        st.header("â„šī¸ About")
        st.markdown(
            """
            **Tech Stack Advisor** uses:
            - LangGraph orchestration
            - Claude AI (Anthropic)
            - Qdrant vector search
            - FastAPI backend

            [GitHub](https://github.com/ranjanarajendran/tech-stack-advisor) |
            [Docs](http://localhost:8000/docs)
            """
        )


def display_query_form() -> tuple[str, int | None, float | None]:
    """Display the query input form.

    Returns:
        Tuple of (query, dau, budget)
    """
    st.header("đŸ’Ŧ Describe Your Project")

    query = st.text_area(
        "What are you building?",
        placeholder="Example: I'm building a real-time chat application expecting 100K daily active users with payment processing and GDPR compliance requirements...",
        height=120,
        help="Describe your project requirements, scale, compliance needs, etc.",
    )

    with st.expander("âš™ī¸ Advanced Options (Optional)"):
        col1, col2 = st.columns(2)

        with col1:
            dau_override = st.number_input(
                "Daily Active Users (DAU)",
                min_value=0,
                max_value=10_000_000,
                value=None,
                help="Override auto-detected DAU",
            )

        with col2:
            budget_override = st.number_input(
                "Monthly Budget ($)",
                min_value=0.0,
                max_value=100_000.0,
                value=None,
                help="Target monthly budget",
            )

    return query, dau_override, budget_override


def display_parsed_context(context: dict[str, Any]) -> None:
    """Display parsed context in a nice format.

    Args:
        context: Parsed context dictionary
    """
    st.subheader("📋 Parsed Requirements")

    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric("Daily Active Users", f"{context.get('dau', 0):,}")
        st.metric("Queries/Second", context.get('qps', 0))

    with col2:
        st.metric("Data Type", context.get('data_type', 'N/A'))
        st.metric("Workload", context.get('workload_type', 'N/A'))

    with col3:
        st.metric("Sensitivity", context.get('data_sensitivity', 'N/A'))
        compliance = context.get('compliance', [])
        st.metric("Compliance", len(compliance))

    with col4:
        if compliance:
            st.write("**Required:**")
            for comp in compliance:
                st.write(f"- {comp.upper()}")


def display_agent_results(recommendations: dict[str, Any]) -> None:
    """Display agent results in tabs.

    Args:
        recommendations: Dictionary of agent recommendations
    """
    st.subheader("🤖 Agent Recommendations")

    # Create tabs for each agent
    tabs = st.tabs(["đŸ—„ī¸ Database", "â˜ī¸ Infrastructure", "💰 Cost", "🔒 Security"])

    # Database Agent
    with tabs[0]:
        db_result = recommendations.get("database", {})
        if db_result:
            scale_info = db_result.get("scale_info", {})

            # Key Metrics
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Scale Tier", scale_info.get("tier", "N/A").title())
            with col2:
                cache = scale_info.get("cache_recommended", False)
                st.metric("Cache Recommended", "✅ Yes" if cache else "❌ No")
            with col3:
                st.metric("Estimated Connections", scale_info.get("estimated_connections", 0))

            st.markdown("**Quick Recommendation:**")
            st.info(scale_info.get("recommendation", "N/A"))

            st.divider()

            # Database Knowledge (Pros/Cons)
            raw_knowledge = db_result.get("raw_knowledge", {})
            if raw_knowledge and "results" in raw_knowledge:
                st.markdown("**📚 Database Options Analysis:**")
                db_options = raw_knowledge["results"]

                for db_name, db_info in list(db_options.items())[:4]:  # Show top 4
                    with st.expander(f"**{db_name.upper()}** - {db_info.get('description', '')}"):
                        st.markdown(f"**Best For:** {db_info.get('best_for', 'N/A')}")
                        st.markdown(f"**Scale:** {db_info.get('scale', 'N/A')}")

                        col1, col2 = st.columns(2)
                        with col1:
                            st.markdown("**✅ Pros:**")
                            for pro in db_info.get('pros', []):
                                st.write(f"â€ĸ {pro}")
                        with col2:
                            st.markdown("**❌ Cons:**")
                            for con in db_info.get('cons', []):
                                st.write(f"â€ĸ {con}")

            st.divider()

            # Comprehensive LLM Analysis
            with st.expander("📄 **COMPREHENSIVE DETAILED ANALYSIS** (Click to expand)", expanded=True):
                recommendations_text = db_result.get("recommendations", "No detailed analysis available")

                if isinstance(recommendations_text, str):
                    # Display as formatted markdown
                    st.markdown(recommendations_text)
                else:
                    st.json(recommendations_text)
        else:
            st.warning("No database recommendations available")

    # Infrastructure Agent
    with tabs[1]:
        infra_result = recommendations.get("infrastructure", {})
        if infra_result:
            scale_info = infra_result.get("scale_info", {})

            # Key Metrics
            col1, col2 = st.columns(2)
            with col1:
                st.metric("Scale Tier", scale_info.get("tier", "N/A").title())
                st.metric("Architecture", scale_info.get("suggested_architecture", "N/A").title())

            with col2:
                st.metric("Load Balancer", "✅ Yes" if scale_info.get("load_balancer_needed") else "❌ No")
                st.metric("CDN", "✅ Yes" if scale_info.get("cdn_recommended") else "❌ No")

            st.markdown("**Deployment Strategy:**")
            st.info(scale_info.get("deployment_strategy", "N/A"))

            st.markdown("**Compute Recommendation:**")
            st.code(scale_info.get("compute_recommendation", "N/A"))

            st.divider()

            # Architecture Patterns and Cloud Providers
            raw_knowledge = infra_result.get("raw_knowledge", {})
            if raw_knowledge:
                # Architecture Patterns
                patterns = raw_knowledge.get("patterns", {})
                if patterns:
                    st.markdown("**đŸ—ī¸ Architecture Patterns Analysis:**")
                    for pattern_name, pattern_info in list(patterns.items())[:3]:
                        with st.expander(f"**{pattern_name.upper()}** - {pattern_info.get('description', '')}"):
                            st.markdown(f"**Best For:** {pattern_info.get('best_for', 'N/A')}")
                            st.markdown(f"**Complexity:** {pattern_info.get('complexity', 'N/A').title()}")
                            st.markdown(f"**Components:** {', '.join(pattern_info.get('components', []))}")

                            col1, col2 = st.columns(2)
                            with col1:
                                st.markdown("**✅ Pros:**")
                                for pro in pattern_info.get('pros', []):
                                    st.write(f"â€ĸ {pro}")
                            with col2:
                                st.markdown("**❌ Cons:**")
                                for con in pattern_info.get('cons', []):
                                    st.write(f"â€ĸ {con}")

                # Cloud Provider Comparison
                providers = raw_knowledge.get("cloud_providers", {})
                if providers:
                    st.markdown("**â˜ī¸ Cloud Provider Comparison:**")
                    for provider_name, provider_info in providers.items():
                        with st.expander(f"**{provider_name.upper()}** - {provider_info.get('best_for', '')}"):
                            st.markdown(f"**Strengths:** {', '.join(provider_info.get('strengths', []))}")
                            st.markdown(f"**Weaknesses:** {', '.join(provider_info.get('weaknesses', []))}")

            st.divider()

            # Comprehensive LLM Analysis
            with st.expander("📄 **COMPREHENSIVE DETAILED ANALYSIS** (Click to expand)", expanded=True):
                recommendations_text = infra_result.get("recommendations", "No detailed analysis available")

                if isinstance(recommendations_text, str):
                    st.markdown(recommendations_text)
                else:
                    st.json(recommendations_text)
        else:
            st.warning("No infrastructure recommendations available")

    # Cost Agent
    with tabs[2]:
        cost_result = recommendations.get("cost", {})
        if cost_result:
            comparisons = cost_result.get("cost_comparisons", [])

            if comparisons:
                st.markdown("**đŸ’ĩ Cost Comparison by Provider:**")

                # Create comparison table
                import pandas as pd

                data = []
                for comp in comparisons:
                    data.append({
                        "Provider": comp["provider"].upper(),
                        "Monthly": f"${comp['monthly_cost']:.2f}",
                        "Annual": f"${comp['monthly_cost'] * 12:.2f}",
                        "Compute": f"${comp['cloud_costs']['breakdown']['compute']:.2f}",
                        "Storage": f"${comp['cloud_costs']['breakdown']['storage']:.2f}",
                        "Database": f"${comp['cloud_costs']['breakdown']['database']:.2f}",
                    })

                df = pd.DataFrame(data)
                st.dataframe(df, use_container_width=True)

                # Cheapest option
                cheapest = comparisons[0]
                st.success(f"💡 **Recommended:** {cheapest['provider'].upper()} - ${cheapest['monthly_cost']:.2f}/month")

                # Bar chart
                st.markdown("**Monthly Cost Comparison:**")
                import plotly.graph_objects as go

                fig = go.Figure(data=[
                    go.Bar(
                        x=[c["provider"].upper() for c in comparisons],
                        y=[c["monthly_cost"] for c in comparisons],
                        text=[f"${c['monthly_cost']:.2f}" for c in comparisons],
                        textposition='auto',
                    )
                ])
                fig.update_layout(
                    title="Monthly Cost by Provider",
                    xaxis_title="Provider",
                    yaxis_title="Monthly Cost (USD)",
                    height=400,
                )
                st.plotly_chart(fig, use_container_width=True)

            st.divider()

            # Configuration Details
            config = cost_result.get("configuration", {})
            if config:
                st.markdown("**âš™ī¸ Infrastructure Configuration:**")
                col1, col2, col3, col4 = st.columns(4)
                with col1:
                    st.metric("Instances", config.get("instances", 0))
                with col2:
                    st.metric("Instance Type", config.get("instance_type", "N/A").title())
                with col3:
                    st.metric("Storage", f"{config.get('storage_gb', 0)} GB")
                with col4:
                    st.metric("Bandwidth", f"{config.get('bandwidth_gb', 0)} GB/mo")

            st.divider()

            # Comprehensive LLM Analysis
            with st.expander("📄 **COMPREHENSIVE COST ANALYSIS & OPTIMIZATION STRATEGIES** (Click to expand)", expanded=True):
                recommendations_text = cost_result.get("recommendations", "No detailed analysis available")

                if isinstance(recommendations_text, str):
                    st.markdown(recommendations_text)
                else:
                    st.json(recommendations_text)
        else:
            st.warning("No cost estimates available")

    # Security Agent
    with tabs[3]:
        security_result = recommendations.get("security", {})
        if security_result:
            threat = security_result.get("threat_assessment", {})

            # Risk Assessment Metrics
            col1, col2, col3 = st.columns(3)
            with col1:
                priority = threat.get("priority", "N/A")
                color = {"low": "đŸŸĸ", "medium": "🟡", "high": "🟠", "critical": "🔴"}.get(priority.lower(), "âšĒ")
                st.metric("Risk Priority", f"{color} {priority.title()}")

            with col2:
                st.metric("Risk Multiplier", f"{threat.get('risk_multiplier', 0)}x")

            with col3:
                st.metric("Architecture", threat.get("architecture", "N/A").title())

            st.divider()

            # Identified Threats
            threats_data = threat.get("threats", {})
            if threats_data:
                st.markdown("**âš ī¸ Identified Threats by Severity:**")

                for severity, threat_list in threats_data.items():
                    severity_icon = {"high_risk": "🔴", "medium_risk": "🟡", "low_risk": "đŸŸĸ"}.get(severity, "âšĒ")
                    with st.expander(f"{severity_icon} {severity.replace('_', ' ').title()} ({len(threat_list)} threats)"):
                        for t in threat_list:
                            st.write(f"â€ĸ {t}")

            st.divider()

            # Security Checklist
            checklist = security_result.get("security_checklist", {})
            if checklist and "checklist" in checklist:
                st.markdown("**đŸ›Ąī¸ Security Best Practices Checklist:**")

                checklist_data = checklist["checklist"]
                for category, items in list(checklist_data.items())[:6]:  # Show top 6 categories
                    with st.expander(f"**{category.replace('_', ' ').title()}** ({len(items.get('critical', []))} critical items)"):
                        critical_items = items.get("critical", [])
                        recommended_items = items.get("recommended", [])

                        if critical_items:
                            st.markdown("**🔴 Critical:**")
                            for item in critical_items[:5]:  # Top 5
                                st.write(f"â€ĸ {item}")

                        if recommended_items:
                            st.markdown("**🟡 Recommended:**")
                            for item in recommended_items[:3]:  # Top 3
                                st.write(f"â€ĸ {item}")

            st.divider()

            # Compliance Requirements
            if checklist and "compliance_frameworks" in checklist:
                st.markdown("**📋 Compliance Frameworks:**")
                compliance_frameworks = checklist["compliance_frameworks"]

                for framework, requirements in compliance_frameworks.items():
                    with st.expander(f"**{framework.upper()}** ({len(requirements)} key requirements)"):
                        for req in requirements:
                            st.write(f"â€ĸ {req}")

            st.divider()

            # Comprehensive LLM Analysis
            with st.expander("📄 **COMPREHENSIVE SECURITY ASSESSMENT & IMPLEMENTATION GUIDE** (Click to expand)", expanded=True):
                recommendations_text = security_result.get("recommendations", "No detailed analysis available")

                if isinstance(recommendations_text, str):
                    st.markdown(recommendations_text)
                else:
                    st.json(recommendations_text)
        else:
            st.warning("No security assessment available")


def main() -> None:
    """Main application."""
    display_header()
    display_sidebar()

    # Query form
    query, dau_override, budget_override = display_query_form()

    # Submit button
    col1, col2, col3 = st.columns([1, 1, 4])
    with col1:
        submit = st.button("🚀 Get Recommendations", type="primary", use_container_width=True)
    with col2:
        if st.button("🔄 Clear", use_container_width=True):
            st.rerun()

    # Process query
    if submit:
        if not query or len(query) < 10:
            st.error("Please provide a more detailed description (at least 10 characters)")
            return

        with st.spinner("🤖 Analyzing your requirements with 4 AI agents..."):
            # Prepare request
            request_data = {"query": query}
            if dau_override:
                request_data["dau"] = dau_override
            if budget_override:
                request_data["budget_target"] = budget_override

            # Call API
            result = call_api("/recommend", method="POST", data=request_data)

            if result.get("error") or result.get("status") == "error":
                st.error("❌ Failed to get recommendations")
                if result.get("error"):
                    st.error(result["error"])
                return

            # Display results
            st.success("✅ Analysis Complete!")

            # Correlation ID
            st.caption(f"Request ID: `{result.get('correlation_id', 'N/A')}`")

            st.divider()

            # Parsed context
            if "parsed_context" in result:
                display_parsed_context(result["parsed_context"])

            st.divider()

            # Agent results
            if "recommendations" in result:
                display_agent_results(result["recommendations"])

            st.divider()

            # Download results
            col1, col2 = st.columns([1, 5])
            with col1:
                st.download_button(
                    "đŸ“Ĩ Download JSON",
                    data=json.dumps(result, indent=2),
                    file_name="tech_stack_recommendations.json",
                    mime="application/json",
                )


if __name__ == "__main__":
    main()