"""Database recommendation agent."""
from typing import Any
from .base import BaseAgent, Tool
class DatabaseKnowledgeTool:
    """Mock lookup of database technology knowledge.

    Implements the Tool interface (``name`` / ``description`` /
    ``execute``). The vector-store hook exists for a future Qdrant
    integration; until then every query returns the same canned catalog.
    """

    name = "database_knowledge"
    description = "Search technical documentation for database recommendations and best practices"

    def __init__(self, vectorstore: Any = None) -> None:
        """Keep a reference to the (optional) vector store for later use."""
        self.vectorstore = vectorstore

    def execute(self, query: str, **kwargs: Any) -> dict[str, Any]:
        """Return the canned database catalog for *query*.

        Args:
            query: Search text; echoed back unmodified (no filtering yet).
            **kwargs: Ignored; accepted for interface compatibility.

        Returns:
            Dict with ``results`` (database-name -> info dict) and ``query``.
        """
        # TODO: Integrate with actual Qdrant vector store
        # For now, return mock data
        catalog: dict[str, Any] = {}
        catalog["postgresql"] = {
            "description": "Robust ACID-compliant relational database",
            "best_for": "Complex queries, transactional workloads, structured data",
            "scale": "Up to millions of records with proper indexing",
            "pros": ["Strong ACID guarantees", "Rich query capabilities", "Mature ecosystem"],
            "cons": ["Vertical scaling limits", "Complex sharding"],
        }
        catalog["mongodb"] = {
            "description": "Document-oriented NoSQL database",
            "best_for": "Flexible schemas, rapid iteration, document storage",
            "scale": "Horizontal scaling via sharding",
            "pros": ["Flexible schema", "Easy horizontal scaling", "Good for JSON data"],
            "cons": ["Weaker consistency guarantees", "Complex transactions"],
        }
        catalog["redis"] = {
            "description": "In-memory key-value store",
            "best_for": "Caching, session storage, real-time data",
            "scale": "Millions of operations per second",
            "pros": ["Extremely fast", "Rich data structures", "Pub/sub support"],
            "cons": ["Memory-bound", "Data persistence trade-offs"],
        }
        catalog["cassandra"] = {
            "description": "Distributed wide-column store",
            "best_for": "Massive scale, write-heavy workloads, time-series data",
            "scale": "Petabyte-scale with linear scalability",
            "pros": ["Excellent write performance", "No single point of failure"],
            "cons": ["Complex operations", "Eventual consistency"],
        }
        return {
            "results": catalog,
            "query": query,
        }
class DatabaseScaleEstimator:
    """Tool to estimate database requirements based on scale."""

    name = "scale_estimator"
    description = "Estimate database requirements based on expected load (DAU, QPS, data volume)"

    def execute(self, dau: int = 0, qps: int = 0, data_gb: int = 0, **kwargs: Any) -> dict[str, Any]:
        """Estimate database requirements.

        Args:
            dau: Daily active users.
            qps: Queries per second.
            data_gb: Expected data volume in GB (echoed back; not used by
                the tier heuristic yet).
            **kwargs: Additional parameters (ignored).

        Returns:
            Dictionary with scale estimates: tier name, echoed inputs, a
            sizing recommendation, an estimated connection count, and
            whether a caching layer is recommended.
        """
        # Simple heuristic-based estimation - check from highest tier to lowest
        # Scale based on whichever dimension (DAU or QPS) is larger
        if dau >= 1_000_000 or qps >= 10_000:
            tier = "enterprise"
            recommendation = "Multi-region, auto-sharding, extensive caching, CDN"
        elif dau >= 100_000 or qps >= 1_000:
            tier = "large"
            recommendation = "Sharded setup, dedicated caching, read replicas"
        elif dau >= 10_000 or qps >= 100:
            tier = "medium"
            recommendation = "Primary + read replicas, consider caching layer"
        else:
            tier = "small"
            # Fixed: the previous text ("Single instance with read replicas")
            # contradicted itself and duplicated the medium tier's advice.
            recommendation = "Single instance; add read replicas as load grows"
        return {
            "tier": tier,
            "dau": dau,
            "qps": qps,
            "data_gb": data_gb,
            "recommendation": recommendation,
            # Rule of thumb: ~2 connections per query/sec, capped at 10k.
            "estimated_connections": min(qps * 2, 10_000),
            # Fixed boundary: use >= 100 to agree with the medium-tier cutoff
            # above, whose recommendation already says "consider caching layer".
            "cache_recommended": qps >= 100,
        }
class DatabaseAgent(BaseAgent):
    """Agent specialized in database recommendations.

    Wires two tools (knowledge lookup + scale estimator) into the BaseAgent
    framework and delegates the final recommendation text to an LLM call.
    """

    def __init__(self, vectorstore: Any = None) -> None:
        """Initialize the database agent.

        Args:
            vectorstore: Optional vector store for RAG; forwarded to
                DatabaseKnowledgeTool (which currently returns mock data).
        """
        tools: list[Tool] = [
            DatabaseKnowledgeTool(vectorstore),  # type: ignore[list-item]
            DatabaseScaleEstimator(),  # type: ignore[list-item]
        ]
        super().__init__(
            name="database",
            role="database architect specializing in selecting the right database technologies",
            tools=tools,
        )

    async def analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """Analyze requirements and recommend databases.

        Args:
            context: Dictionary with keys like:
                - user_query: str
                - dau: int (daily active users)
                - qps: int (queries per second)
                - data_type: str (structured/unstructured/time-series)
                - consistency: str (strong/eventual)
                - data_gb: int (data volume in GB)
                - api_key: optional key forwarded to the LLM call

        Returns:
            Dictionary with database recommendations: agent name, the
            scale-estimator output, the LLM response, and the raw
            knowledge-tool payload.
        """
        self.logger.info("database_analysis_start", context=context)

        # Extract context (each key is optional; sensible defaults below)
        user_query = context.get("user_query", "")
        dau = context.get("dau", 0)
        qps = context.get("qps", 0)
        # NOTE(review): default is 10 GB here but 0 in the estimator tool's
        # own signature — confirm the asymmetry is intended.
        data_gb = context.get("data_gb", 10)
        data_type = context.get("data_type", "structured")
        consistency = context.get("consistency", "strong")
        api_key = context.get("api_key")

        # Get scale estimates (tier, recommendation, cache flag, etc.)
        scale_info = self._execute_tool(
            "scale_estimator",
            dau=dau,
            qps=qps,
            data_gb=data_gb,
        )

        # Get database knowledge (currently canned data; query is echoed back
        # by the tool rather than used for filtering)
        knowledge = self._execute_tool(
            "database_knowledge",
            query=f"{data_type} database for {user_query}",
        )

        # Build prompt for LLM. The literal's lines are left-aligned on
        # purpose: their leading whitespace is part of the prompt string.
        prompt = f"""Analyze this database requirement and provide EXTREMELY DETAILED recommendations with full technical explanations:
User Query: {user_query}
Requirements:
- Daily Active Users: {dau:,}
- Queries per Second: {qps}
- Data Volume: {data_gb} GB
- Data Type: {data_type}
- Consistency: {consistency}
Scale Assessment: {scale_info['tier']} tier
Recommendation: {scale_info['recommendation']}
Cache Recommended: {scale_info['cache_recommended']}
Available Database Options:
{self._format_db_knowledge(knowledge['results'])}
Provide a COMPREHENSIVE, DETAILED analysis with:
1. **Primary Database Recommendation** (200+ words):
- Specific database (PostgreSQL 15, MongoDB 6, etc.) and detailed rationale
- Why this database is PERFECT for THIS specific use case
- DETAILED comparison with 2-3 alternatives (pros/cons for this workload)
- Specific features of this database you'll leverage
- Query patterns this database excels at
- Data model design considerations
- ACID properties and consistency guarantees
- Licensing and cost implications
2. **Caching Strategy** (200+ words):
- Detailed caching architecture (Redis, Memcached, or application-level)
- WHAT to cache (query results, sessions, computed data, etc.)
- HOW to cache (cache-aside, write-through, write-behind)
- Cache invalidation strategy (TTL values, event-driven)
- Cache hit ratio expectations and monitoring
- Memory sizing recommendations
- Cache warming strategies
- Fallback mechanisms when cache fails
- Cost-benefit analysis of caching
3. **Scaling Approach** (250+ words):
- Vertical scaling limits and recommendations
- Horizontal scaling strategy (sharding, partitioning, read replicas)
- SPECIFIC sharding key recommendations for this use case
- Read replica configuration (how many, where, when)
- Write scaling strategies
- Connection pooling setup (PgBouncer, ProxySQL)
- Database clustering approach
- Backup and replication strategy
- Failover and high availability setup
- Performance tuning (indexes, query optimization)
- Monitoring metrics to track
4. **Schema Design & Data Modeling** (150+ words):
- Recommended schema approach for this use case
- Indexing strategy (which columns, which types)
- Normalization vs denormalization decisions
- Partitioning strategies (time-based, hash-based)
- JSON/JSONB usage (if applicable)
- Full-text search considerations
- Geospatial data handling (if relevant)
5. **Operational Considerations** (150+ words):
- Backup strategy (frequency, retention, testing)
- Disaster recovery plan (RTO, RPO targets)
- Migration path from simpler setups
- Upgrade strategy
- Security (encryption, access control, audit logs)
- Performance monitoring tools
- Cost optimization (reserved instances, storage tiers)
6. **Alternative Solutions** (150+ words):
- Alternative #1: [Database name] - When to use instead, trade-offs
- Alternative #2: [Database name] - When to use instead, trade-offs
- Alternative #3: [Database name] - When to use instead, trade-offs
- Decision matrix to help choose
- Migration difficulty between options
Respond with extensive, paragraph-form explanations. Be specific with version numbers, configuration values, and technical details.
"""
        # Get LLM recommendation (api_key may be None; BaseAgent handles it)
        response = self._call_llm(prompt, api_key=api_key)

        self.logger.info(
            "database_analysis_complete",
            scale_tier=scale_info["tier"],
            cache_recommended=scale_info["cache_recommended"],
        )

        return {
            "agent": self.name,
            "scale_info": scale_info,
            "recommendations": response,
            "raw_knowledge": knowledge,
        }

    def _format_db_knowledge(self, knowledge: dict[str, Any]) -> str:
        """Format database knowledge for prompt.

        Args:
            knowledge: Mapping of database name -> info dict. Reads the
                'description', 'best_for', 'pros', and 'cons' keys; the
                mock data's 'scale' key is not rendered here.

        Returns:
            Formatted string, one newline-prefixed block per database.
        """
        lines = []
        for db_name, info in knowledge.items():
            lines.append(f"\n{db_name.upper()}:")
            lines.append(f" Description: {info['description']}")
            lines.append(f" Best for: {info['best_for']}")
            lines.append(f" Pros: {', '.join(info['pros'])}")
            lines.append(f" Cons: {', '.join(info['cons'])}")
        return "\n".join(lines)