← Back to File Tree
test_auth_features.py
Language: python |
Path: scripts/test_auth_features.py |
Lines: 390
#!/usr/bin/env python3
"""Test authentication and user management features on Railway deployment."""
import sys
import argparse
import requests
import json
from typing import Optional
# Railway deployment URL
BASE_URL = "https://ranjana-tech-stack-advisor-production.up.railway.app"
# Admin email (public)
ADMIN_EMAIL = "admin@techstackadvisor.com"
# Test user credentials
TEST_EMAIL = "testuser@example.com"
TEST_PASSWORD = "TestPassword123!"
TEST_NAME = "Test User"
class Colors:
"""ANSI color codes for pretty output."""
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
END = '\033[0m'
def print_success(msg: str):
print(f"{Colors.GREEN}✅ {msg}{Colors.END}")
def print_error(msg: str):
print(f"{Colors.RED}❌ {msg}{Colors.END}")
def print_info(msg: str):
print(f"{Colors.BLUE}ℹ️ {msg}{Colors.END}")
def print_warning(msg: str):
print(f"{Colors.YELLOW}⚠️ {msg}{Colors.END}")
def print_section(title: str):
print(f"\n{Colors.BLUE}{'='*60}")
print(f" {title}")
print(f"{'='*60}{Colors.END}\n")
def test_health_check():
"""Test 1: Health check."""
print_section("Test 1: Health Check")
try:
response = requests.get(f"{BASE_URL}/health", timeout=10)
if response.status_code == 200:
data = response.json()
print_success(f"Server is healthy")
print_info(f" Version: {data.get('version')}")
print_info(f" Agents loaded: {data.get('agents_loaded')}")
print_info(f" Uptime: {data.get('uptime_seconds')}s")
return True
else:
print_error(f"Health check failed: {response.status_code}")
return False
except Exception as e:
print_error(f"Health check error: {e}")
return False
def test_admin_login(admin_password: str) -> Optional[str]:
"""Test 2: Admin login."""
print_section("Test 2: Admin Login")
try:
response = requests.post(
f"{BASE_URL}/auth/login",
json={"email": ADMIN_EMAIL, "password": admin_password},
timeout=10
)
if response.status_code == 200:
data = response.json()
token = data.get("token")
user = data.get("user", {})
if token:
print_success("Admin login successful")
print_info(f" Email: {user.get('email')}")
print_info(f" User ID: {user.get('user_id')}")
print_info(f" Is Admin: {user.get('is_admin')}")
print_info(f" Token: {token[:20]}...")
return token
else:
print_error("Login response missing access_token")
print_error(f" Response: {data}")
return None
else:
print_error(f"Login failed: {response.status_code}")
print_error(f" Response: {response.text}")
return None
except Exception as e:
print_error(f"Login error: {e}")
return None
def test_admin_dashboard(admin_token: str):
"""Test 3: Admin dashboard."""
print_section("Test 3: Admin Dashboard")
try:
headers = {"Authorization": f"Bearer {admin_token}"}
response = requests.get(f"{BASE_URL}/admin/stats", headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
print_success("Admin dashboard accessible")
print_info(f" Total users: {data.get('total_users', 0)}")
print_info(f" Total queries: {data.get('total_queries', 0)}")
print_info(f" Total cost: ${data.get('total_cost_usd', 0):.4f}")
print_info(f" Total feedback: {data.get('total_feedback', 0)}")
return True
else:
print_error(f"Dashboard access failed: {response.status_code}")
print_error(f" Response: {response.text}")
return False
except Exception as e:
print_error(f"Dashboard error: {e}")
return False
def test_user_registration() -> Optional[str]:
"""Test 4: Create regular user."""
print_section("Test 4: User Registration")
try:
response = requests.post(
f"{BASE_URL}/auth/register",
json={
"email": TEST_EMAIL,
"password": TEST_PASSWORD,
"full_name": TEST_NAME
},
timeout=10
)
if response.status_code == 200:
data = response.json()
user = data.get("user", {})
token = data.get("token")
print_success("User registration successful")
print_info(f" Email: {user.get('email')}")
print_info(f" Name: {user.get('full_name')}")
print_info(f" User ID: {user.get('user_id')}")
print_info(f" Is Admin: {user.get('is_admin')}")
return token
elif response.status_code == 400 and "already exists" in response.text.lower():
print_warning("User already exists, trying to login...")
# Try to login with existing user
login_response = requests.post(
f"{BASE_URL}/auth/login",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
timeout=10
)
if login_response.status_code == 200:
data = login_response.json()
token = data.get("token")
print_success("Logged in with existing user")
return token
else:
print_error("Could not login with existing user")
return None
else:
print_error(f"Registration failed: {response.status_code}")
print_error(f" Response: {response.text}")
return None
except Exception as e:
print_error(f"Registration error: {e}")
return None
def test_query_storage(user_token: str):
"""Test 5: Query storage (make a recommendation request)."""
print_section("Test 5: Query Storage & Recommendation")
try:
headers = {"Authorization": f"Bearer {user_token}"}
response = requests.post(
f"{BASE_URL}/recommend",
json={
"query": "I need a tech stack for a small e-commerce website with 100 daily users",
"dau": 100
},
headers=headers,
timeout=30
)
if response.status_code == 200:
data = response.json()
print_success("Recommendation request successful")
print_info(f" Correlation ID: {data.get('correlation_id')}")
print_info(f" Status: {data.get('status')}")
print_info(f" Tokens used: {data.get('usage', {}).get('total_tokens', 0)}")
print_info(f" Cost: ${data.get('cost_usd', 0):.4f}")
recommendations = data.get('recommendations', {})
if recommendations:
print_info(f" Recommendations: {len(recommendations)} categories")
return True
else:
print_error(f"Recommendation failed: {response.status_code}")
print_error(f" Response: {response.text[:200]}")
return False
except Exception as e:
print_error(f"Recommendation error: {e}")
return False
def test_query_history(user_token: str):
"""Test 6: Query history retrieval."""
print_section("Test 6: Query History")
try:
headers = {"Authorization": f"Bearer {user_token}"}
response = requests.get(f"{BASE_URL}/user/queries", headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
queries = data.get("queries", [])
print_success(f"Query history retrieved")
print_info(f" Total queries: {len(queries)}")
if queries:
latest = queries[0]
print_info(f" Latest query: {latest.get('query', '')[:50]}...")
print_info(f" Status: {latest.get('status')}")
print_info(f" Cost: ${latest.get('cost_usd', 0):.4f}")
return True
else:
print_error(f"Query history failed: {response.status_code}")
return False
except Exception as e:
print_error(f"Query history error: {e}")
return False
def test_feedback_submission(user_token: str):
"""Test 7: Feedback submission."""
print_section("Test 7: Feedback Submission")
try:
headers = {"Authorization": f"Bearer {user_token}"}
response = requests.post(
f"{BASE_URL}/feedback",
json={
"rating": 5,
"comment": "Great recommendations! Very helpful.",
"feedback_type": "positive"
},
headers=headers,
timeout=10
)
if response.status_code == 200:
data = response.json()
print_success("Feedback submitted successfully")
print_info(f" Feedback ID: {data.get('feedback_id')}")
return True
else:
print_error(f"Feedback submission failed: {response.status_code}")
return False
except Exception as e:
print_error(f"Feedback error: {e}")
return False
def main():
"""Run all tests."""
global BASE_URL
parser = argparse.ArgumentParser(
description="Test authentication and user management features on Railway"
)
parser.add_argument(
"--admin-password",
required=True,
help="Admin password for authentication tests"
)
parser.add_argument(
"--base-url",
default=BASE_URL,
help=f"Base URL for the API (default: {BASE_URL})"
)
args = parser.parse_args()
# Update BASE_URL if provided
BASE_URL = args.base_url
print("\n" + "="*60)
print(" Testing Authentication & User Management Features")
print(f" Target: {BASE_URL}")
print("="*60)
results = {
"passed": 0,
"failed": 0,
"total": 7
}
# Test 1: Health check
if test_health_check():
results["passed"] += 1
else:
results["failed"] += 1
print_error("Cannot proceed without healthy server")
sys.exit(1)
# Test 2: Admin login
admin_token = test_admin_login(args.admin_password)
if admin_token:
results["passed"] += 1
else:
results["failed"] += 1
print_error("Cannot proceed without admin token")
sys.exit(1)
# Test 3: Admin dashboard
if test_admin_dashboard(admin_token):
results["passed"] += 1
else:
results["failed"] += 1
# Test 4: User registration
user_token = test_user_registration()
if user_token:
results["passed"] += 1
else:
results["failed"] += 1
print_warning("Cannot test user features without user token")
# Test 5: Query storage (if we have user token)
if user_token:
if test_query_storage(user_token):
results["passed"] += 1
else:
results["failed"] += 1
# Test 6: Query history
if test_query_history(user_token):
results["passed"] += 1
else:
results["failed"] += 1
# Test 7: Feedback submission
if test_feedback_submission(user_token):
results["passed"] += 1
else:
results["failed"] += 1
else:
results["failed"] += 3
# Summary
print_section("Test Summary")
print_info(f"Total tests: {results['total']}")
print_success(f"Passed: {results['passed']}")
if results['failed'] > 0:
print_error(f"Failed: {results['failed']}")
success_rate = (results['passed'] / results['total']) * 100
print_info(f"Success rate: {success_rate:.1f}%")
if results['failed'] == 0:
print_success("\n🎉 All tests passed!")
sys.exit(0)
else:
print_error(f"\n❌ {results['failed']} test(s) failed")
sys.exit(1)
if __name__ == "__main__":
main()