"""Infrastructure recommendation agent."""
from typing import Any
from .base import BaseAgent, Tool
class InfrastructureKnowledgeTool:
    """Tool that surfaces architecture patterns and cloud-provider notes.

    The knowledge base is a hard-coded mock: the query is echoed back but does
    not filter the results, and the optional vector store is accepted for
    interface compatibility without being consulted.
    """

    name = "infrastructure_knowledge"
    description = "Search for cloud architecture patterns, deployment strategies, and infrastructure best practices"

    def __init__(self, vectorstore: Any = None) -> None:
        """Keep a handle to the (currently unused) vector store."""
        self.vectorstore = vectorstore

    def execute(self, query: str, **kwargs: Any) -> dict[str, Any]:
        """Return the static catalog of patterns and providers.

        Args:
            query: Search query (echoed back unchanged in the result).
            **kwargs: Extra parameters, ignored.

        Returns:
            Dict with ``patterns``, ``cloud_providers`` and the original
            ``query``.
        """
        # Mock catalog of architecture patterns keyed by pattern name.
        architecture_patterns: dict[str, Any] = dict(
            microservices=dict(
                description="Distributed architecture with independent services",
                best_for="Large teams, complex domains, independent scaling needs",
                components=["API Gateway", "Service Mesh", "Container Orchestration"],
                complexity="high",
                pros=["Independent deployment", "Technology flexibility", "Fault isolation"],
                cons=["Operational overhead", "Distributed complexity", "Network latency"],
            ),
            monolith=dict(
                description="Single unified application",
                best_for="Small teams, simple domains, rapid development",
                components=["Load Balancer", "Application Server", "Database"],
                complexity="low",
                pros=["Simple deployment", "Easy debugging", "Lower latency"],
                cons=["Scaling limitations", "Technology lock-in", "Deployment risk"],
            ),
            serverless=dict(
                description="Event-driven functions-as-a-service",
                best_for="Variable workloads, event processing, cost optimization",
                components=["API Gateway", "Lambda Functions", "Event Bus"],
                complexity="medium",
                pros=["Auto-scaling", "Pay-per-use", "No server management"],
                cons=["Cold starts", "Vendor lock-in", "Debugging challenges"],
            ),
            jamstack=dict(
                description="JavaScript, APIs, and Markup static site architecture",
                best_for="Content sites, blogs, documentation, marketing pages",
                components=["CDN", "Static Hosting", "API Services"],
                complexity="low",
                pros=["Excellent performance", "Low cost", "High security"],
                cons=["Build time increases", "Limited dynamic features"],
            ),
        )

        # Mock catalog of cloud providers keyed by provider short name.
        provider_catalog: dict[str, Any] = dict(
            aws=dict(
                strengths=["Largest market share", "Most services", "Global reach"],
                weaknesses=["Complex pricing", "Steep learning curve"],
                best_for="Enterprise, full-featured needs",
            ),
            gcp=dict(
                strengths=["ML/AI tools", "Kubernetes origins", "Data analytics"],
                weaknesses=["Smaller service catalog", "Less regions"],
                best_for="Data-heavy, ML workloads",
            ),
            azure=dict(
                strengths=["Microsoft integration", "Hybrid cloud", "Enterprise support"],
                weaknesses=["UI complexity", "Service inconsistency"],
                best_for="Microsoft shops, enterprise",
            ),
            railway=dict(
                strengths=["Simple deployment", "Developer-friendly", "Low cost for startups"],
                weaknesses=["Limited scale", "Fewer services"],
                best_for="Prototypes, small apps, demos",
            ),
        )

        return dict(
            patterns=architecture_patterns,
            cloud_providers=provider_catalog,
            query=query,
        )
class InfrastructureScaleCalculator:
    """Tool to calculate infrastructure requirements."""

    name = "scale_calculator"
    description = "Calculate infrastructure needs based on traffic and workload patterns"

    def execute(
        self,
        dau: int = 0,
        rps: int = 0,  # requests per second
        workload_type: str = "web",
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Calculate infrastructure requirements.

        Args:
            dau: Daily active users.
            rps: Requests per second.
            workload_type: Type of workload (web, api, background, realtime).
            **kwargs: Additional parameters, ignored.

        Returns:
            Dictionary with the scale tier, compute/deployment/architecture
            recommendations, and boolean feature flags (load balancer, CDN,
            container orchestration).
        """
        # Simple estimation logic - check from highest tier to lowest.
        # Either dimension (DAU or RPS) crossing a threshold promotes the
        # workload to that tier.
        if dau >= 500_000 or rps >= 1_000:
            tier = "enterprise"
            compute = "50+ instances, kubernetes cluster"
            deployment = "Multi-region, global CDN, advanced caching"
            architecture = "microservices with service mesh"
        elif dau >= 50_000 or rps >= 100:
            tier = "scale"
            compute = "10-20 instances with auto-scaling"
            deployment = "Multi-AZ deployment, caching layer"
            architecture = "microservices or hybrid"
        elif dau >= 1_000 or rps >= 10:
            tier = "growth"
            compute = "3-5 medium instances (4 vCPU, 8GB RAM)"
            deployment = "Single region with load balancing"
            architecture = "monolith or modular monolith"
        else:
            tier = "starter"
            compute = "1-2 small instances (2 vCPU, 4GB RAM)"
            deployment = "Single region, simple setup"
            architecture = "monolith"

        return {
            "tier": tier,
            "dau": dau,
            "rps": rps,
            "workload_type": workload_type,
            "compute_recommendation": compute,
            "deployment_strategy": deployment,
            "suggested_architecture": architecture,
            # Fix: derive this flag from the tier so it agrees with the
            # deployment strategy text. The previous `dau > 1_000` was False
            # at exactly dau == 1_000 (growth tier, which already prescribes
            # load balancing) and ignored RPS-driven tier promotion entirely.
            "load_balancer_needed": tier != "starter",
            "cdn_recommended": dau > 10_000,
            "container_orchestration": tier in {"scale", "enterprise"},
        }
class InfrastructureAgent(BaseAgent):
    """Agent specialized in infrastructure and deployment recommendations.

    Wraps two tools (knowledge lookup + scale calculator) and delegates the
    final narrative recommendation and diagram generation to the LLM via
    ``_call_llm`` inherited from ``BaseAgent``.
    """

    def __init__(self, vectorstore: Any = None) -> None:
        """Initialize the infrastructure agent.

        Args:
            vectorstore: Optional vector store for RAG
        """
        # The knowledge tool receives the vector store; the calculator is
        # stateless. NOTE(review): the `type: ignore` markers suggest these
        # tool classes don't formally satisfy `Tool` — confirm against base.
        tools: list[Tool] = [
            InfrastructureKnowledgeTool(vectorstore),  # type: ignore[list-item]
            InfrastructureScaleCalculator(),  # type: ignore[list-item]
        ]
        super().__init__(
            name="infrastructure",
            role="cloud architect specializing in infrastructure design and deployment strategies",
            tools=tools,
        )

    async def analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """Analyze requirements and recommend infrastructure.

        Args:
            context: Dictionary with keys like:
                - user_query: str
                - dau: int
                - rps: int (requests per second)
                - workload_type: str
                - budget_conscious: bool
                - existing_stack: str (optional)

        Returns:
            Dictionary with infrastructure recommendations (agent name,
            scale_info from the calculator, LLM recommendations text, and
            the raw knowledge payload).
        """
        self.logger.info("infrastructure_analysis_start", context=context)

        # Extract context — every key is optional with a sensible default.
        user_query = context.get("user_query", "")
        dau = context.get("dau", 0)
        rps = context.get("rps", 0)
        workload_type = context.get("workload_type", "web")
        budget_conscious = context.get("budget_conscious", True)
        existing_stack = context.get("existing_stack", "none")
        api_key = context.get("api_key")

        # Calculate scale requirements (tier, compute, deployment, flags).
        scale_info = self._execute_tool(
            "scale_calculator",
            dau=dau,
            rps=rps,
            workload_type=workload_type,
        )

        # Get infrastructure knowledge (static pattern/provider catalogs).
        knowledge = self._execute_tool(
            "infrastructure_knowledge",
            query=f"{workload_type} infrastructure for {user_query}",
        )

        # Build prompt for LLM. The prompt body is flush-left inside the
        # f-string; its exact wording/structure is the contract with the LLM.
        prompt = f"""Analyze this infrastructure requirement and provide EXTREMELY DETAILED recommendations with complete architecture design:
User Query: {user_query}
Requirements:
- Daily Active Users: {dau:,}
- Requests per Second: {rps}
- Workload Type: {workload_type}
- Budget Conscious: {budget_conscious}
- Existing Stack: {existing_stack}
Scale Assessment: {scale_info['tier']} tier
Compute: {scale_info['compute_recommendation']}
Deployment: {scale_info['deployment_strategy']}
Architecture: {scale_info['suggested_architecture']}
Available Architecture Patterns:
{self._format_patterns(knowledge['patterns'])}
Cloud Provider Options:
{self._format_providers(knowledge['cloud_providers'])}
Provide a COMPREHENSIVE, DETAILED analysis with:
1. **Cloud Provider Recommendation** (200+ words):
   - Specific provider (AWS, GCP, Azure, or Railway) with detailed justification
   - Compare ALL providers for THIS use case (pros/cons matrix)
   - Region recommendations (with latency/compliance considerations)
   - Why this provider's services fit your architecture best
   - Long-term lock-in considerations and mitigation
   - Pricing advantages for this workload
   - Support and SLA considerations
2. **Architecture Pattern & Design** (300+ words):
   - Detailed architecture choice (microservices, monolith, serverless, hybrid)
   - Complete architecture diagram description (components, data flow)
   - SPECIFIC services to use (e.g., "AWS ECS Fargate", not just "containers")
   - Service mesh recommendation (Istio, Linkerd, AWS App Mesh)
   - API Gateway setup (Kong, AWS API Gateway, etc.)
   - Authentication/Authorization layer (Cognito, Auth0, custom)
   - Service discovery mechanism
   - Inter-service communication patterns
   - Data consistency patterns (saga, 2PC, eventual consistency)
   - Why this architecture beats alternatives for THIS use case
3. **Compute & Deployment Strategy** (250+ words):
   - Container orchestration (Kubernetes, ECS, GKE) vs serverless vs VMs
   - Specific instance types/sizes with justification
   - Auto-scaling policies (CPU/memory thresholds, predictive scaling)
   - Deployment strategies (blue-green, canary, rolling)
   - CI/CD pipeline setup (GitHub Actions, GitLab CI, Jenkins)
   - Infrastructure as Code tools (Terraform, CloudFormation, Pulumi)
   - Container image optimization
   - Environment management (dev, staging, prod)
4. **Load Balancing & Traffic Management** (200+ words):
   - Load balancer type (ALB, NLB, Cloud Load Balancer, Traefik)
   - SSL/TLS termination strategy
   - Health check configuration
   - Session affinity vs stateless design
   - Rate limiting and throttling
   - DDoS protection (AWS Shield, Cloudflare, Akamai)
   - CDN setup (CloudFront, Cloudflare, Fastly) with cache rules
   - DNS and global traffic routing
   - Circuit breaker patterns
5. **Networking & Security** (200+ words):
   - VPC/Network architecture
   - Public vs private subnet design
   - NAT Gateway/Instance setup
   - Security groups/firewall rules (specific ports/protocols)
   - Network ACLs
   - VPN/VPC peering for hybrid setups
   - Service endpoint/private link configuration
   - Egress control and monitoring
6. **Monitoring, Observability & Reliability** (250+ words):
   - Metrics collection (Prometheus, CloudWatch, Datadog, New Relic)
   - Log aggregation (ELK, CloudWatch Logs, Splunk)
   - Distributed tracing (Jaeger, X-Ray, Zipkin)
   - APM tools and configuration
   - Alerting strategy (PagerDuty, OpsGenie)
   - SLI/SLO/SLA definitions for this app
   - Error budgets and monitoring
   - Chaos engineering recommendations
   - Disaster recovery plan (RTO/RPO targets)
7. **Performance Optimization** (150+ words):
   - Caching layers (application, CDN, database)
   - Asset optimization (compression, minification, lazy loading)
   - Database connection pooling
   - API response optimization
   - Async processing patterns
   - Message queues (SQS, Kafka, RabbitMQ) setup
8. **Cost Optimization** (150+ words):
   - Reserved instances/commitments strategy
   - Spot instances where applicable
   - Auto-scaling to match demand
   - Right-sizing recommendations
   - Data transfer cost reduction
   - Serverless vs always-on trade-offs
   - Cost monitoring and alerts
Respond with extensive, paragraph-form explanations. Include specific service names, versions, configurations, and technical reasoning.
"""

        # Get LLM recommendation.
        # NOTE(review): _call_llm is invoked without `await` inside this async
        # method — presumably synchronous in BaseAgent; confirm.
        response = self._call_llm(prompt, api_key=api_key)

        self.logger.info(
            "infrastructure_analysis_complete",
            tier=scale_info["tier"],
            architecture=scale_info["suggested_architecture"],
        )

        return {
            "agent": self.name,
            "scale_info": scale_info,
            "recommendations": response,
            "raw_knowledge": knowledge,
        }

    def _format_patterns(self, patterns: dict[str, Any]) -> str:
        """Format architecture patterns for prompt.

        Emits one upper-cased heading per pattern followed by its
        best_for / complexity / components fields (pros/cons omitted).
        """
        lines = []
        for name, info in patterns.items():
            lines.append(f"\n{name.upper()}:")
            lines.append(f"  Best for: {info['best_for']}")
            lines.append(f"  Complexity: {info['complexity']}")
            lines.append(f"  Components: {', '.join(info['components'])}")
        return "\n".join(lines)

    def _format_providers(self, providers: dict[str, Any]) -> str:
        """Format cloud providers for prompt.

        Emits one upper-cased heading per provider followed by its
        best_for / strengths fields (weaknesses omitted).
        """
        lines = []
        for name, info in providers.items():
            lines.append(f"\n{name.upper()}:")
            lines.append(f"  Best for: {info['best_for']}")
            lines.append(f"  Strengths: {', '.join(info['strengths'])}")
        return "\n".join(lines)

    async def generate_diagram(self, context: dict[str, Any]) -> dict[str, Any]:
        """Generate architecture diagram based on infrastructure context.

        Args:
            context: Context with user_query, recommendations, scale_info

        Returns:
            Dictionary with mermaid diagram code
        """
        self.logger.info("diagram_generation_start")

        user_query = context.get("user_query", "")
        recommendations = context.get("recommendations", "")
        scale_tier = context.get("scale_tier", "STARTER")
        api_key = context.get("api_key")

        # Only the first 500 chars of the recommendation text are fed to the
        # LLM to keep the diagram prompt compact.
        prompt = f"""Generate a Mermaid architecture diagram for this system:
User Query: {user_query}
Scale Tier: {scale_tier}
Infrastructure Summary: {recommendations[:500] if recommendations else "N/A"}...
Create a comprehensive Mermaid diagram showing the complete system architecture.
IMPORTANT FORMATTING RULES:
1. Use ONLY the "graph TB" syntax (top-to-bottom flow)
2. Include ALL major components based on the scale tier
3. Use proper Mermaid node syntax:
   - Regular nodes: [Label]
   - Database nodes: [(Database Name)]
   - Use --> for arrows
Example structure:
```mermaid
graph TB
    User[Users/Clients] --> CDN[CDN - CloudFront]
    CDN --> LB[Load Balancer]
    LB --> App1[App Server 1]
    LB --> App2[App Server 2]
    App1 --> Cache[Redis Cache]
    App2 --> Cache
    App1 --> DB[(PostgreSQL Database)]
    App2 --> DB
    DB --> Replica[(Read Replica)]
    App1 --> Queue[Message Queue]
    App2 --> Queue
    Queue --> Worker[Background Workers]
    App1 --> Monitor[Monitoring]
    App2 --> Monitor
```
Generate a diagram with:
- User/Client entry point
- CDN (if scale > STARTER)
- Load Balancer
- Application servers (number based on scale)
- Cache layer
- Database(s) with replicas if needed
- Message queues (if applicable)
- Background workers (if applicable)
- Monitoring/logging
Return ONLY the mermaid code block, nothing else."""

        # NOTE(review): same un-awaited _call_llm pattern as analyze();
        # confirm BaseAgent._call_llm is synchronous.
        response = self._call_llm(prompt, api_key=api_key)
        self.logger.info("diagram_generation_complete")
        return {
            "diagram": response,
            "status": "success"
        }