Tech Stack Advisor - Code Viewer

← Back to File Tree

security.py

Language: python | Path: backend/src/agents/security.py | Lines: 445
"""Security assessment and recommendation agent."""
from typing import Any
from .base import BaseAgent, Tool


class SecurityChecklistTool:
    """Static catalogue of security best practices and compliance controls."""

    name = "security_checklist"
    description = "Get security best practices and compliance requirements for tech stacks"

    def execute(self, stack_type: str = "web", **kwargs: Any) -> dict[str, Any]:
        """Return the security checklist together with the compliance catalogue.

        The same catalogue is produced for every stack type; ``stack_type`` is
        only echoed back in the result. A fresh structure is built on each
        call, so callers may mutate what they receive.

        Args:
            stack_type: Type of stack (web, api, mobile, iot)
            **kwargs: Additional parameters (accepted and ignored)

        Returns:
            Dictionary with the keys ``checklist``, ``compliance_frameworks``
            and ``stack_type``
        """

        def tiers(critical: list[str], recommended: list[str]) -> dict[str, list[str]]:
            # Every category shares the same two-tier shape.
            return {"critical": critical, "recommended": recommended}

        # Security best practices, one entry per category.
        best_practices: dict[str, dict[str, list[str]]] = {}

        best_practices["authentication"] = tiers(
            critical=[
                "Implement multi-factor authentication (MFA)",
                "Use OAuth 2.0 / OpenID Connect for third-party auth",
                "Enforce strong password policies (min 12 chars, complexity)",
                "Implement rate limiting on auth endpoints",
                "Use secure session management (httpOnly, secure, SameSite cookies)",
            ],
            recommended=[
                "Implement passwordless authentication (WebAuthn)",
                "Add CAPTCHA for sensitive operations",
                "Monitor for suspicious login patterns",
            ],
        )

        best_practices["data_protection"] = tiers(
            critical=[
                "Encrypt data at rest (AES-256)",
                "Encrypt data in transit (TLS 1.3)",
                "Implement database encryption",
                "Secure API keys and secrets (use secrets manager)",
                "Regular automated backups with encryption",
            ],
            recommended=[
                "Implement field-level encryption for PII",
                "Use HSM for key management",
                "Implement data retention policies",
            ],
        )

        best_practices["network_security"] = tiers(
            critical=[
                "Use VPC/private networks for internal services",
                "Implement Web Application Firewall (WAF)",
                "Configure security groups/firewall rules (principle of least privilege)",
                "Enable DDoS protection",
                "Use CDN for additional DDoS mitigation",
            ],
            recommended=[
                "Implement network segmentation",
                "Use service mesh for microservices",
                "Enable VPN for admin access",
            ],
        )

        best_practices["application_security"] = tiers(
            critical=[
                "Input validation and sanitization (prevent XSS, SQL injection)",
                "Implement CSRF protection",
                "Set secure HTTP headers (CSP, HSTS, X-Frame-Options)",
                "Regular dependency scanning (Snyk, Dependabot)",
                "Implement proper error handling (no sensitive info in errors)",
            ],
            recommended=[
                "Use Content Security Policy (CSP)",
                "Implement Subresource Integrity (SRI)",
                "Regular penetration testing",
                "Code security reviews",
            ],
        )

        best_practices["access_control"] = tiers(
            critical=[
                "Implement Role-Based Access Control (RBAC)",
                "Principle of least privilege for all services",
                "Use IAM roles instead of static credentials",
                "Audit logs for all access (who, what, when)",
            ],
            recommended=[
                "Implement Attribute-Based Access Control (ABAC)",
                "Regular access reviews",
                "Just-in-time access provisioning",
            ],
        )

        best_practices["monitoring"] = tiers(
            critical=[
                "Centralized logging with retention policies",
                "Real-time security monitoring and alerting",
                "Failed authentication attempt monitoring",
                "Anomaly detection",
            ],
            recommended=[
                "SIEM integration",
                "Security incident response playbooks",
                "Regular security audits",
            ],
        )

        # Headline controls per compliance framework.
        frameworks: dict[str, list[str]] = dict(
            gdpr=["Data encryption", "Right to erasure", "Data portability", "Consent management"],
            hipaa=["PHI encryption", "Audit logs", "Access controls", "BAA with cloud provider"],
            pci_dss=["Tokenization", "Network segmentation", "Regular scans", "Restricted access"],
            soc2=["Access controls", "Encryption", "Monitoring", "Incident response"],
        )

        result: dict[str, Any] = {"checklist": best_practices}
        result["compliance_frameworks"] = frameworks
        result["stack_type"] = stack_type
        return result


class ThreatModelingTool:
    """Tool to identify potential security threats."""

    name = "threat_modeling"
    description = "Identify potential security threats based on architecture and data sensitivity"

    @staticmethod
    def _normalize(value: Any) -> Any:
        """Return a lookup key: strings are stripped and lower-cased, others pass through."""
        return value.strip().lower() if isinstance(value, str) else value

    def execute(
        self,
        architecture: str = "monolith",
        data_sensitivity: str = "medium",
        public_facing: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Perform threat modeling.

        ``architecture`` and ``data_sensitivity`` are matched case-insensitively
        and with surrounding whitespace ignored, so that values such as
        ``"High"`` or ``"Microservices"`` are not silently downgraded to the
        defaults. Unrecognised architectures fall back to the monolith threat
        list and unrecognised sensitivities to a multiplier of 1.0.

        Args:
            architecture: Architecture type (monolith, microservices, serverless)
            data_sensitivity: Data sensitivity level (low, medium, high, critical)
            public_facing: Whether application is public-facing
            **kwargs: Additional parameters (accepted and ignored)

        Returns:
            Dictionary with identified threats, the risk multiplier (rounded to
            two decimals), the derived priority, and the inputs echoed back
            exactly as they were passed in
        """
        threats = {
            "monolith": {
                "high_risk": [
                    "Single point of failure - entire app compromised if breached",
                    "Lateral movement - attacker can access all features",
                    "Deployment risk - security updates require full deployment",
                ],
                "medium_risk": [
                    "Dependency vulnerabilities affect entire application",
                    "Credential theft impacts all services",
                ],
            },
            "microservices": {
                "high_risk": [
                    "Increased attack surface - multiple entry points",
                    "Service-to-service authentication complexity",
                    "API gateway as single point of failure",
                    "Inter-service communication vulnerabilities",
                ],
                "medium_risk": [
                    "Configuration drift across services",
                    "Distributed tracing of security events",
                ],
            },
            "serverless": {
                "high_risk": [
                    "Function-level vulnerabilities",
                    "Over-privileged IAM roles",
                    "Supply chain attacks via dependencies",
                    "Cold start timing attacks",
                ],
                "medium_risk": [
                    "Event injection attacks",
                    "Resource exhaustion (cost-based DoS)",
                ],
            },
        }

        severity_multipliers = {
            "low": 0.5,
            "medium": 1.0,
            "high": 1.5,
            "critical": 2.0,
        }

        # Normalise only for lookup; the caller's original values are echoed back.
        multiplier = severity_multipliers.get(self._normalize(data_sensitivity), 1.0)
        if public_facing:
            # Internet exposure raises the risk by 30%.
            multiplier *= 1.3

        arch_threats = threats.get(self._normalize(architecture), threats["monolith"])

        if multiplier > 1.8:
            priority = "critical"
        elif multiplier > 1.2:
            priority = "high"
        else:
            priority = "medium"

        return {
            "architecture": architecture,
            "data_sensitivity": data_sensitivity,
            "public_facing": public_facing,
            "risk_multiplier": round(multiplier, 2),
            "threats": arch_threats,
            "priority": priority,
        }


class SecurityAgent(BaseAgent):
    """Agent specialized in security assessment and recommendations.

    Combines the static security checklist and the threat-modeling tool into a
    single prompt and asks the LLM for a detailed security assessment.
    """

    def __init__(self) -> None:
        """Initialize the security agent with its checklist and threat-modeling tools."""
        tools: list[Tool] = [
            SecurityChecklistTool(),  # type: ignore[list-item]
            ThreatModelingTool(),  # type: ignore[list-item]
        ]
        super().__init__(
            name="security",
            role="security engineer specializing in application security and compliance",
            tools=tools,
        )

    async def analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """Analyze tech stack for security concerns and recommendations.

        Compliance frameworks and data sensitivity are additionally inferred
        from keywords in the user query: healthcare/medical adds HIPAA and
        payment/credit card adds PCI DSS (both raise sensitivity to
        ``critical``); gdpr/europe adds GDPR. The caller's ``context`` is
        never modified.

        Args:
            context: Dictionary with keys like:
                - user_query: str
                - architecture: str (from infrastructure agent)
                - database_type: str (from database agent)
                - data_sensitivity: str (low, medium, high, critical)
                - compliance_required: list[str] (gdpr, hipaa, pci_dss, soc2)
                - public_facing: bool
                - api_key: optional key forwarded to the LLM call

        Returns:
            Dictionary with the agent name, the threat assessment, the security
            checklist and the LLM recommendations
        """
        # The context carries the caller's API key; keep the secret out of the logs.
        loggable_context = {key: value for key, value in context.items() if key != "api_key"}
        self.logger.info("security_analysis_start", context=loggable_context)

        # Extract context (tolerating explicit None values)
        user_query = context.get("user_query") or ""
        architecture = context.get("architecture", "monolith")
        data_sensitivity = context.get("data_sensitivity", "medium")
        public_facing = context.get("public_facing", True)
        api_key = context.get("api_key")

        # Work on a copy so inferred frameworks never leak back into the
        # caller's list; a lone string is treated as a single framework.
        supplied = context.get("compliance_required") or []
        compliance_required = [supplied] if isinstance(supplied, str) else list(supplied)

        # Infer additional requirements from the query.
        # Each rule: (trigger keywords, framework to add, forces critical sensitivity)
        query_text = user_query.lower()
        inference_rules = (
            (("healthcare", "medical"), "hipaa", True),
            (("payment", "credit card"), "pci_dss", True),
            (("gdpr", "europe"), "gdpr", False),
        )
        for keywords, framework, forces_critical in inference_rules:
            if not any(keyword in query_text for keyword in keywords):
                continue
            if framework not in compliance_required:  # avoid duplicate entries
                compliance_required.append(framework)
            if forces_critical:
                data_sensitivity = "critical"

        # Get security checklist
        # NOTE(review): _execute_tool and _call_llm are presumably provided by BaseAgent — confirm.
        checklist = self._execute_tool(
            "security_checklist",
            stack_type="web",
        )

        # Perform threat modeling
        threats = self._execute_tool(
            "threat_modeling",
            architecture=architecture,
            data_sensitivity=data_sensitivity,
            public_facing=public_facing,
        )

        # Build prompt for LLM
        prompt = f"""Perform a COMPREHENSIVE, DETAILED security assessment for this tech stack with extensive explanations:

User Query: {user_query}

Configuration:
- Architecture: {architecture}
- Data Sensitivity: {data_sensitivity}
- Public Facing: {public_facing}
- Compliance Required: {', '.join(compliance_required) if compliance_required else 'None'}

Threat Assessment:
- Risk Priority: {threats['priority']}
- Risk Multiplier: {threats['risk_multiplier']}x
- High Risk Threats: {len(threats['threats'].get('high_risk', []))}
- Medium Risk Threats: {len(threats['threats'].get('medium_risk', []))}

Key Threats:
{self._format_threats(threats['threats'])}

Security Checklist Categories:
{self._format_checklist_summary(checklist['checklist'])}

Compliance Requirements:
{self._format_compliance(checklist['compliance_frameworks'], compliance_required)}

Provide an EXTREMELY DETAILED security analysis with:

1. **Threat Analysis & Risk Assessment** (300+ words):
   - Detailed analysis of EACH high-risk threat for THIS specific architecture
   - Attack vectors and exploitation techniques
   - Likelihood and impact assessment for each threat
   - Threat actors and motivations (script kiddies, organized crime, nation states)
   - Data breach scenarios and consequences
   - Financial/reputational impact of security incidents
   - Threat prioritization matrix
   - Why THIS stack is particularly vulnerable to these threats

2. **Critical Security Priorities** (250+ words):
   - Top 8-10 security priorities ranked by urgency for THIS stack
   - For EACH priority:
     * Detailed description of the vulnerability/gap
     * Why it's critical for THIS use case
     * Specific implementation steps
     * Timeline and effort estimate
     * Success metrics
     * Dependencies and blockers
   - Quick wins (implement in 1 week)
   - Medium-term improvements (1-3 months)
   - Long-term strategic initiatives (3-12 months)

3. **Security Architecture & Controls** (300+ words):
   - Defense-in-depth strategy with specific layers
   - Authentication architecture (OAuth 2.0, SAML, custom)
   - Authorization model (RBAC, ABAC, policies)
   - Session management approach
   - Secret management (AWS Secrets Manager, HashiCorp Vault, etc.)
   - Encryption strategy (at-rest, in-transit, in-use)
   - Key management architecture
   - API security (rate limiting, JWT validation, CORS, CSP)
   - Input validation and sanitization framework
   - Security headers configuration (HSTS, CSP, X-Frame-Options)
   - Network segmentation strategy
   - Zero-trust architecture implementation

4. **Compliance Implementation** (200+ words for each required framework):
   - DETAILED step-by-step guide for EACH compliance requirement
   - Specific controls to implement
   - Documentation and audit trail requirements
   - Third-party certifications needed
   - Data residency and sovereignty considerations
   - User consent and data rights (GDPR)
   - Breach notification procedures
   - Data retention and deletion policies
   - Vendor management (BAAs for HIPAA, etc.)
   - Regular compliance audits schedule

5. **Security Tools & Services** (250+ words):
   - WAF (AWS WAF, Cloudflare, Imperva) configuration
   - SIEM tools (Splunk, ELK, Datadog Security)
   - Vulnerability scanning (Nessus, Qualys, OpenVAS) schedule
   - Dependency scanning (Snyk, WhiteSource, Dependabot) integration
   - DAST tools (OWASP ZAP, Burp Suite, Acunetix)
   - SAST tools (SonarQube, Checkmarx, Veracode)
   - Container security (Aqua, Twistlock, Sysdig)
   - Secret scanning (GitGuardian, TruffleHog)
   - DDoS protection setup
   - Intrusion detection/prevention systems
   - Security monitoring dashboards

6. **Security Testing & Validation** (200+ words):
   - Penetration testing strategy (frequency, scope, vendors)
   - Security code review process
   - Threat modeling workshops
   - Red team/blue team exercises
   - Bug bounty program recommendations
   - Security regression testing
   - Compliance audits and certifications
   - Security metrics to track (MTTR, vulnerabilities found, patching time)

7. **Incident Response & Recovery** (200+ words):
   - Incident response plan structure
   - Security incident classification (P0-P4)
   - Escalation procedures and contact trees
   - Incident detection and alerting
   - Containment strategies
   - Evidence preservation and forensics
   - Communication plans (internal, external, regulatory)
   - Post-incident review process
   - Disaster recovery and business continuity
   - Backup and restore procedures
   - RTO/RPO targets

8. **Security Culture & Training** (100+ words):
   - Security awareness training for developers
   - Secure coding practices
   - Security champions program
   - Phishing simulations
   - Third-party vendor security assessment
   - Supply chain security

Respond with extensive, paragraph-form explanations with specific product names, configurations, and implementation details. Be thorough and actionable.
"""

        # Get LLM recommendation
        response = self._call_llm(prompt, api_key=api_key)

        self.logger.info(
            "security_analysis_complete",
            priority=threats["priority"],
            compliance_required=len(compliance_required),
        )

        return {
            "agent": self.name,
            "threat_assessment": threats,
            "security_checklist": checklist,
            "recommendations": response,
        }

    def _format_threats(self, threats: dict[str, list[str]]) -> str:
        """Format threats for the prompt.

        Args:
            threats: Mapping of severity key (e.g. ``high_risk``) to threat descriptions

        Returns:
            One upper-cased heading per severity followed by at most three
            bulleted threats each
        """
        lines = []
        for severity, threat_list in threats.items():
            lines.append(f"\n{severity.upper().replace('_', ' ')}:")
            # Top 3 per category keeps the prompt compact
            lines.extend(f"  - {threat}" for threat in threat_list[:3])
        return "\n".join(lines)

    def _format_checklist_summary(self, checklist: dict[str, Any]) -> str:
        """Format a checklist summary for the prompt.

        Args:
            checklist: Mapping of category name to a dict holding a ``critical`` list

        Returns:
            One bullet per category with its number of critical items
        """
        return "\n".join(
            f"- {category.replace('_', ' ').title()}: "
            f"{len(items.get('critical', []))} critical items"
            for category, items in checklist.items()
        )

    def _format_compliance(
        self,
        frameworks: dict[str, list[str]],
        required: list[str],
    ) -> str:
        """Format compliance requirements for the prompt.

        Args:
            frameworks: Mapping of framework key to its list of requirements
            required: Framework keys requested for this analysis

        Returns:
            A heading plus at most three requirements for every requested
            framework that is known, or ``"None specified"`` when nothing is
            requested or none of the requested frameworks is known
        """
        if not required:
            return "None specified"

        lines = []
        for framework in required:
            if framework not in frameworks:
                continue  # unknown frameworks are skipped silently
            lines.append(f"\n{framework.upper()}:")
            # Top 3 requirements
            lines.extend(f"  - {req}" for req in frameworks[framework][:3])
        return "\n".join(lines) if lines else "None specified"