"""Security assessment and recommendation agent."""
from typing import Any
from .base import BaseAgent, Tool
class SecurityChecklistTool:
    """Tool that serves a static catalogue of security best practices."""

    name = "security_checklist"
    description = "Get security best practices and compliance requirements for tech stacks"

    @staticmethod
    def _tiers(critical: list[str], recommended: list[str]) -> dict[str, list[str]]:
        """Bundle one category's items into its two priority tiers."""
        return {"critical": critical, "recommended": recommended}

    def execute(self, stack_type: str = "web", **kwargs: Any) -> dict[str, Any]:
        """Return the security checklist and compliance framework summary.

        The catalogue is the same for every stack type; ``stack_type`` is only
        echoed back in the result. A fresh set of dicts/lists is built on every
        call.

        Args:
            stack_type: Type of stack (web, api, mobile, iot)
            **kwargs: Additional parameters (accepted and ignored)

        Returns:
            Dictionary with keys ``checklist`` (category -> ``critical`` /
            ``recommended`` item lists), ``compliance_frameworks``
            (framework -> requirement list) and ``stack_type``.
        """
        tiers = self._tiers

        # Best practices grouped by security domain, each split by priority
        practices = {
            "authentication": tiers(
                critical=[
                    "Implement multi-factor authentication (MFA)",
                    "Use OAuth 2.0 / OpenID Connect for third-party auth",
                    "Enforce strong password policies (min 12 chars, complexity)",
                    "Implement rate limiting on auth endpoints",
                    "Use secure session management (httpOnly, secure, SameSite cookies)",
                ],
                recommended=[
                    "Implement passwordless authentication (WebAuthn)",
                    "Add CAPTCHA for sensitive operations",
                    "Monitor for suspicious login patterns",
                ],
            ),
            "data_protection": tiers(
                critical=[
                    "Encrypt data at rest (AES-256)",
                    "Encrypt data in transit (TLS 1.3)",
                    "Implement database encryption",
                    "Secure API keys and secrets (use secrets manager)",
                    "Regular automated backups with encryption",
                ],
                recommended=[
                    "Implement field-level encryption for PII",
                    "Use HSM for key management",
                    "Implement data retention policies",
                ],
            ),
            "network_security": tiers(
                critical=[
                    "Use VPC/private networks for internal services",
                    "Implement Web Application Firewall (WAF)",
                    "Configure security groups/firewall rules (principle of least privilege)",
                    "Enable DDoS protection",
                    "Use CDN for additional DDoS mitigation",
                ],
                recommended=[
                    "Implement network segmentation",
                    "Use service mesh for microservices",
                    "Enable VPN for admin access",
                ],
            ),
            "application_security": tiers(
                critical=[
                    "Input validation and sanitization (prevent XSS, SQL injection)",
                    "Implement CSRF protection",
                    "Set secure HTTP headers (CSP, HSTS, X-Frame-Options)",
                    "Regular dependency scanning (Snyk, Dependabot)",
                    "Implement proper error handling (no sensitive info in errors)",
                ],
                recommended=[
                    "Use Content Security Policy (CSP)",
                    "Implement Subresource Integrity (SRI)",
                    "Regular penetration testing",
                    "Code security reviews",
                ],
            ),
            "access_control": tiers(
                critical=[
                    "Implement Role-Based Access Control (RBAC)",
                    "Principle of least privilege for all services",
                    "Use IAM roles instead of static credentials",
                    "Audit logs for all access (who, what, when)",
                ],
                recommended=[
                    "Implement Attribute-Based Access Control (ABAC)",
                    "Regular access reviews",
                    "Just-in-time access provisioning",
                ],
            ),
            "monitoring": tiers(
                critical=[
                    "Centralized logging with retention policies",
                    "Real-time security monitoring and alerting",
                    "Failed authentication attempt monitoring",
                    "Anomaly detection",
                ],
                recommended=[
                    "SIEM integration",
                    "Security incident response playbooks",
                    "Regular security audits",
                ],
            ),
        }

        # Headline requirements per compliance framework
        frameworks = {
            "gdpr": [
                "Data encryption",
                "Right to erasure",
                "Data portability",
                "Consent management",
            ],
            "hipaa": [
                "PHI encryption",
                "Audit logs",
                "Access controls",
                "BAA with cloud provider",
            ],
            "pci_dss": [
                "Tokenization",
                "Network segmentation",
                "Regular scans",
                "Restricted access",
            ],
            "soc2": [
                "Access controls",
                "Encryption",
                "Monitoring",
                "Incident response",
            ],
        }

        result: dict[str, Any] = {}
        result["checklist"] = practices
        result["compliance_frameworks"] = frameworks
        result["stack_type"] = stack_type
        return result
class ThreatModelingTool:
    """Tool to identify potential security threats."""

    name = "threat_modeling"
    description = "Identify potential security threats based on architecture and data sensitivity"

    @staticmethod
    def _lookup_key(value: Any) -> Any:
        """Return a case- and whitespace-insensitive lookup key for string inputs.

        Non-string values are returned unchanged so they simply miss the lookup
        tables and trigger the documented fallbacks.
        """
        return value.strip().lower() if isinstance(value, str) else value

    def execute(
        self,
        architecture: str = "monolith",
        data_sensitivity: str = "medium",
        public_facing: bool = True,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Perform threat modeling.

        ``architecture`` and ``data_sensitivity`` are matched ignoring case and
        surrounding whitespace, so values such as ``"Microservices"`` or
        ``"HIGH"`` select the intended entry instead of silently falling back.
        An unrecognized architecture falls back to the monolith threat set and
        an unrecognized sensitivity to a multiplier of 1.0.

        Args:
            architecture: Architecture type (monolith, microservices, serverless)
            data_sensitivity: Data sensitivity level (low, medium, high, critical)
            public_facing: Whether application is public-facing
            **kwargs: Additional parameters (accepted and ignored)

        Returns:
            Dictionary with the echoed inputs (``architecture``,
            ``data_sensitivity``, ``public_facing``), the rounded
            ``risk_multiplier``, the selected ``threats`` (``high_risk`` /
            ``medium_risk`` lists) and the derived ``priority``
            (``critical``, ``high`` or ``medium``).
        """
        threats = {
            "monolith": {
                "high_risk": [
                    "Single point of failure - entire app compromised if breached",
                    "Lateral movement - attacker can access all features",
                    "Deployment risk - security updates require full deployment",
                ],
                "medium_risk": [
                    "Dependency vulnerabilities affect entire application",
                    "Credential theft impacts all services",
                ],
            },
            "microservices": {
                "high_risk": [
                    "Increased attack surface - multiple entry points",
                    "Service-to-service authentication complexity",
                    "API gateway as single point of failure",
                    "Inter-service communication vulnerabilities",
                ],
                "medium_risk": [
                    "Configuration drift across services",
                    "Distributed tracing of security events",
                ],
            },
            "serverless": {
                "high_risk": [
                    "Function-level vulnerabilities",
                    "Over-privileged IAM roles",
                    "Supply chain attacks via dependencies",
                    "Cold start timing attacks",
                ],
                "medium_risk": [
                    "Event injection attacks",
                    "Resource exhaustion (cost-based DoS)",
                ],
            },
        }
        severity_multipliers = {
            "low": 0.5,
            "medium": 1.0,
            "high": 1.5,
            "critical": 2.0,
        }

        # Normalize only for lookup; the caller's original values are echoed back.
        multiplier = severity_multipliers.get(self._lookup_key(data_sensitivity), 1.0)
        if public_facing:
            # Public exposure raises the risk by 30%
            multiplier *= 1.3

        arch_threats = threats.get(self._lookup_key(architecture), threats["monolith"])

        if multiplier > 1.8:
            priority = "critical"
        elif multiplier > 1.2:
            priority = "high"
        else:
            priority = "medium"

        return {
            "architecture": architecture,
            "data_sensitivity": data_sensitivity,
            "public_facing": public_facing,
            "risk_multiplier": round(multiplier, 2),
            "threats": arch_threats,
            "priority": priority,
        }
class SecurityAgent(BaseAgent):
    """Agent specialized in security assessment and recommendations."""

    def __init__(self) -> None:
        """Initialize the security agent with its checklist and threat-modeling tools."""
        tools: list[Tool] = [
            SecurityChecklistTool(),  # type: ignore[list-item]
            ThreatModelingTool(),  # type: ignore[list-item]
        ]
        super().__init__(
            name="security",
            role="security engineer specializing in application security and compliance",
            tools=tools,
        )

    async def analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """Analyze tech stack for security concerns and recommendations.

        The caller's ``context`` is never modified, and the ``api_key`` value
        is redacted before the context is logged.

        Args:
            context: Dictionary with keys like:
                - user_query: str
                - architecture: str (from infrastructure agent)
                - database_type: str (from database agent)
                - data_sensitivity: str (low, medium, high, critical)
                - compliance_required: list[str] (gdpr, hipaa, pci_dss, soc2)
                - public_facing: bool
                - api_key: forwarded to the LLM call

        Returns:
            Dictionary with keys ``agent``, ``threat_assessment``,
            ``security_checklist`` and ``recommendations``.
        """
        # Secrets must not end up in log output
        loggable_context = {
            key: ("[REDACTED]" if key == "api_key" else value)
            for key, value in context.items()
        }
        self.logger.info("security_analysis_start", context=loggable_context)

        # Extract context
        user_query = context.get("user_query") or ""
        architecture = context.get("architecture", "monolith")
        data_sensitivity = context.get("data_sensitivity", "medium")
        public_facing = context.get("public_facing", True)
        api_key = context.get("api_key")

        # Work on a private copy so the caller's list is left untouched
        raw_compliance = context.get("compliance_required") or []
        if isinstance(raw_compliance, str):
            raw_compliance = [raw_compliance]
        compliance_required = list(raw_compliance)

        # Infer extra requirements from the query text. A healthcare or
        # payment hint always raises sensitivity to "critical", even when the
        # caller supplied a lower level.
        query = user_query.lower()
        inferred: list[str] = []
        if "healthcare" in query or "medical" in query:
            inferred.append("hipaa")
            data_sensitivity = "critical"
        if "payment" in query or "credit card" in query:
            inferred.append("pci_dss")
            data_sensitivity = "critical"
        if "gdpr" in query or "europe" in query:
            inferred.append("gdpr")
        for framework in inferred:
            # Skip frameworks the caller already listed to avoid duplicates
            if framework not in compliance_required:
                compliance_required.append(framework)

        # Get security checklist
        checklist = self._execute_tool(
            "security_checklist",
            stack_type="web",
        )

        # Perform threat modeling
        threats = self._execute_tool(
            "threat_modeling",
            architecture=architecture,
            data_sensitivity=data_sensitivity,
            public_facing=public_facing,
        )

        prompt = self._build_prompt(
            user_query=user_query,
            architecture=architecture,
            data_sensitivity=data_sensitivity,
            public_facing=public_facing,
            compliance_required=compliance_required,
            threats=threats,
            checklist=checklist,
        )

        # Get LLM recommendation
        # NOTE(review): _call_llm is invoked without await inside an async
        # method; confirm against BaseAgent that it is not a coroutine.
        response = self._call_llm(prompt, api_key=api_key)

        self.logger.info(
            "security_analysis_complete",
            priority=threats["priority"],
            compliance_required=len(compliance_required),
        )

        return {
            "agent": self.name,
            "threat_assessment": threats,
            "security_checklist": checklist,
            "recommendations": response,
        }

    def _build_prompt(
        self,
        *,
        user_query: str,
        architecture: str,
        data_sensitivity: str,
        public_facing: bool,
        compliance_required: list[str],
        threats: dict[str, Any],
        checklist: dict[str, Any],
    ) -> str:
        """Render the LLM prompt from the extracted context and tool results."""
        return f"""Perform a COMPREHENSIVE, DETAILED security assessment for this tech stack with extensive explanations:
User Query: {user_query}
Configuration:
- Architecture: {architecture}
- Data Sensitivity: {data_sensitivity}
- Public Facing: {public_facing}
- Compliance Required: {', '.join(compliance_required) if compliance_required else 'None'}
Threat Assessment:
- Risk Priority: {threats['priority']}
- Risk Multiplier: {threats['risk_multiplier']}x
- High Risk Threats: {len(threats['threats'].get('high_risk', []))}
- Medium Risk Threats: {len(threats['threats'].get('medium_risk', []))}
Key Threats:
{self._format_threats(threats['threats'])}
Security Checklist Categories:
{self._format_checklist_summary(checklist['checklist'])}
Compliance Requirements:
{self._format_compliance(checklist['compliance_frameworks'], compliance_required)}
Provide an EXTREMELY DETAILED security analysis with:
1. **Threat Analysis & Risk Assessment** (300+ words):
- Detailed analysis of EACH high-risk threat for THIS specific architecture
- Attack vectors and exploitation techniques
- Likelihood and impact assessment for each threat
- Threat actors and motivations (script kiddies, organized crime, nation states)
- Data breach scenarios and consequences
- Financial/reputational impact of security incidents
- Threat prioritization matrix
- Why THIS stack is particularly vulnerable to these threats
2. **Critical Security Priorities** (250+ words):
- Top 8-10 security priorities ranked by urgency for THIS stack
- For EACH priority:
* Detailed description of the vulnerability/gap
* Why it's critical for THIS use case
* Specific implementation steps
* Timeline and effort estimate
* Success metrics
* Dependencies and blockers
- Quick wins (implement in 1 week)
- Medium-term improvements (1-3 months)
- Long-term strategic initiatives (3-12 months)
3. **Security Architecture & Controls** (300+ words):
- Defense-in-depth strategy with specific layers
- Authentication architecture (OAuth 2.0, SAML, custom)
- Authorization model (RBAC, ABAC, policies)
- Session management approach
- Secret management (AWS Secrets Manager, HashiCorp Vault, etc.)
- Encryption strategy (at-rest, in-transit, in-use)
- Key management architecture
- API security (rate limiting, JWT validation, CORS, CSP)
- Input validation and sanitization framework
- Security headers configuration (HSTS, CSP, X-Frame-Options)
- Network segmentation strategy
- Zero-trust architecture implementation
4. **Compliance Implementation** (200+ words for each required framework):
- DETAILED step-by-step guide for EACH compliance requirement
- Specific controls to implement
- Documentation and audit trail requirements
- Third-party certifications needed
- Data residency and sovereignty considerations
- User consent and data rights (GDPR)
- Breach notification procedures
- Data retention and deletion policies
- Vendor management (BAAs for HIPAA, etc.)
- Regular compliance audits schedule
5. **Security Tools & Services** (250+ words):
- WAF (AWS WAF, Cloudflare, Imperva) configuration
- SIEM tools (Splunk, ELK, Datadog Security)
- Vulnerability scanning (Nessus, Qualys, OpenVAS) schedule
- Dependency scanning (Snyk, WhiteSource, Dependabot) integration
- DAST tools (OWASP ZAP, Burp Suite, Acunetix)
- SAST tools (SonarQube, Checkmarx, Veracode)
- Container security (Aqua, Twistlock, Sysdig)
- Secret scanning (GitGuardian, TruffleHog)
- DDoS protection setup
- Intrusion detection/prevention systems
- Security monitoring dashboards
6. **Security Testing & Validation** (200+ words):
- Penetration testing strategy (frequency, scope, vendors)
- Security code review process
- Threat modeling workshops
- Red team/blue team exercises
- Bug bounty program recommendations
- Security regression testing
- Compliance audits and certifications
- Security metrics to track (MTTR, vulnerabilities found, patching time)
7. **Incident Response & Recovery** (200+ words):
- Incident response plan structure
- Security incident classification (P0-P4)
- Escalation procedures and contact trees
- Incident detection and alerting
- Containment strategies
- Evidence preservation and forensics
- Communication plans (internal, external, regulatory)
- Post-incident review process
- Disaster recovery and business continuity
- Backup and restore procedures
- RTO/RPO targets
8. **Security Culture & Training** (100+ words):
- Security awareness training for developers
- Secure coding practices
- Security champions program
- Phishing simulations
- Third-party vendor security assessment
- Supply chain security
Respond with extensive, paragraph-form explanations with specific product names, configurations, and implementation details. Be thorough and actionable.
"""

    def _format_threats(self, threats: dict[str, list[str]]) -> str:
        """Format threats for prompt.

        Each severity becomes an upper-cased heading (underscores turned into
        spaces, preceded by a blank line) followed by at most three bullets.
        """
        lines: list[str] = []
        for severity, threat_list in threats.items():
            lines.append(f"\n{severity.upper().replace('_', ' ')}:")
            # Top 3 per category keeps the prompt compact
            lines.extend(f"  - {threat}" for threat in threat_list[:3])
        return "\n".join(lines)

    def _format_checklist_summary(self, checklist: dict[str, Any]) -> str:
        """Format checklist summary as one line per category with its critical-item count."""
        lines: list[str] = []
        for category, items in checklist.items():
            critical_count = len(items.get("critical", []))
            lines.append(
                f"- {category.replace('_', ' ').title()}: "
                f"{critical_count} critical items"
            )
        return "\n".join(lines)

    def _format_compliance(
        self,
        frameworks: dict[str, list[str]],
        required: list[str],
    ) -> str:
        """Format compliance requirements.

        Only frameworks present in ``frameworks`` are rendered, each with at
        most three requirements. Returns ``"None specified"`` when nothing is
        required or none of the required frameworks is known.
        """
        if not required:
            return "None specified"
        lines: list[str] = []
        for framework in required:
            if framework in frameworks:
                lines.append(f"\n{framework.upper()}:")
                # Top 3 requirements
                lines.extend(f"  - {req}" for req in frameworks[framework][:3])
        return "\n".join(lines) if lines else "None specified"