Tech Stack Advisor - Code Viewer

← Back to File Tree

test_auth_features.py

Language: python | Path: scripts/test_auth_features.py | Lines: 390
#!/usr/bin/env python3
"""Test authentication and user management features on Railway deployment."""
import sys
import argparse
import requests
import json
from typing import Optional

# Railway deployment URL: default API root for every request in this script.
# main() rebinds it from the --base-url argument.
BASE_URL = "https://ranjana-tech-stack-advisor-production.up.railway.app"

# Admin email (public): sent by test_admin_login() together with the password
# supplied via --admin-password.
ADMIN_EMAIL = "admin@techstackadvisor.com"

# Test user credentials: submitted by test_user_registration() to
# /auth/register, and to /auth/login when the account already exists.
TEST_EMAIL = "testuser@example.com"
TEST_PASSWORD = "TestPassword123!"
TEST_NAME = "Test User"


class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    GREEN = "\x1b[92m"   # bright green
    RED = "\x1b[91m"     # bright red
    YELLOW = "\x1b[93m"  # bright yellow
    BLUE = "\x1b[94m"    # bright blue
    END = "\x1b[0m"      # reset all attributes


def print_success(msg: str):
    """Print *msg* in green, prefixed with a check-mark icon."""
    start, reset = Colors.GREEN, Colors.END
    print(f"{start}✅ {msg}{reset}")


def print_error(msg: str):
    """Print *msg* in red, prefixed with a cross icon."""
    start, reset = Colors.RED, Colors.END
    print(f"{start}❌ {msg}{reset}")


def print_info(msg: str):
    """Print *msg* in blue, prefixed with an information icon."""
    start, reset = Colors.BLUE, Colors.END
    print(f"{start}ℹ️  {msg}{reset}")


def print_warning(msg: str):
    """Print *msg* in yellow, prefixed with a warning icon."""
    start, reset = Colors.YELLOW, Colors.END
    print(f"{start}⚠️  {msg}{reset}")


def print_section(title: str):
    """Print *title* as a blue banner framed by two 60-character rules."""
    rule = "=" * 60
    print(f"\n{Colors.BLUE}{rule}")
    print(f"  {title}")
    print(f"{rule}{Colors.END}\n")


def test_health_check():
    """Test 1: Health check.

    Sends ``GET {BASE_URL}/health`` with a 10-second timeout and prints the
    ``version``, ``agents_loaded`` and ``uptime_seconds`` fields of the reply.

    Returns:
        True when the server answers with HTTP 200; False for any other
        status or when an exception is raised.
    """
    print_section("Test 1: Health Check")

    try:
        reply = requests.get(f"{BASE_URL}/health", timeout=10)

        # Guard clause: anything other than 200 is a failed check.
        if reply.status_code != 200:
            print_error(f"Health check failed: {reply.status_code}")
            return False

        payload = reply.json()
        print_success("Server is healthy")
        print_info(f"   Version: {payload.get('version')}")
        print_info(f"   Agents loaded: {payload.get('agents_loaded')}")
        print_info(f"   Uptime: {payload.get('uptime_seconds')}s")
        return True
    except Exception as exc:
        print_error(f"Health check error: {exc}")
        return False


def test_admin_login(admin_password: str) -> Optional[str]:
    """Test 2: Admin login.

    POSTs ``ADMIN_EMAIL`` and *admin_password* to ``/auth/login`` and reads
    the ``token`` field from the JSON reply.

    Args:
        admin_password: Password submitted together with ``ADMIN_EMAIL``.

    Returns:
        The token on success; ``None`` when the status is not 200, the reply
        carries no token, or an exception is raised.
    """
    print_section("Test 2: Admin Login")

    try:
        response = requests.post(
            f"{BASE_URL}/auth/login",
            json={"email": ADMIN_EMAIL, "password": admin_password},
            timeout=10
        )

        if response.status_code != 200:
            print_error(f"Login failed: {response.status_code}")
            print_error(f"   Response: {response.text}")
            return None

        data = response.json()
        token = data.get("token")
        # `or {}` also covers a JSON null "user", which would otherwise raise
        # AttributeError below and fail an otherwise valid login.
        user = data.get("user") or {}

        if not token:
            # Name the field that is actually read ("token") in the diagnostic.
            print_error("Login response missing token")
            print_error(f"   Response: {data}")
            return None

        print_success("Admin login successful")
        print_info(f"   Email: {user.get('email')}")
        print_info(f"   User ID: {user.get('user_id')}")
        print_info(f"   Is Admin: {user.get('is_admin')}")
        # Only a prefix is echoed so the full credential never hits the log.
        print_info(f"   Token: {token[:20]}...")
        return token
    except Exception as e:
        print_error(f"Login error: {e}")
        return None


def test_admin_dashboard(admin_token: str):
    """Test 3: Admin dashboard.

    Sends ``GET /admin/stats`` with *admin_token* as a Bearer token and
    prints the reported totals.

    Args:
        admin_token: Token placed in the ``Authorization`` header.

    Returns:
        True when the endpoint answers with HTTP 200; False for any other
        status or when an exception is raised.
    """
    print_section("Test 3: Admin Dashboard")

    try:
        headers = {"Authorization": f"Bearer {admin_token}"}
        response = requests.get(f"{BASE_URL}/admin/stats", headers=headers, timeout=10)

        if response.status_code != 200:
            print_error(f"Dashboard access failed: {response.status_code}")
            print_error(f"   Response: {response.text}")
            return False

        data = response.json()
        # A JSON null here would make the ".4f" format raise TypeError and
        # report an accessible dashboard as failed, so fall back to 0.
        total_cost = float(data.get('total_cost_usd') or 0)

        print_success("Admin dashboard accessible")
        print_info(f"   Total users: {data.get('total_users', 0)}")
        print_info(f"   Total queries: {data.get('total_queries', 0)}")
        print_info(f"   Total cost: ${total_cost:.4f}")
        print_info(f"   Total feedback: {data.get('total_feedback', 0)}")
        return True
    except Exception as e:
        print_error(f"Dashboard error: {e}")
        return False


def test_user_registration() -> Optional[str]:
    """Test 4: Create regular user.

    POSTs the test credentials to ``/auth/register``. When the server answers
    400 with "already exists" in the body, falls back to ``/auth/login`` with
    the same credentials.

    Returns:
        The token from the registration or login reply; ``None`` when neither
        path yields a token or an exception is raised.
    """
    print_section("Test 4: User Registration")

    try:
        response = requests.post(
            f"{BASE_URL}/auth/register",
            json={
                "email": TEST_EMAIL,
                "password": TEST_PASSWORD,
                "full_name": TEST_NAME
            },
            timeout=10
        )

        if response.status_code == 200:
            data = response.json()
            # `or {}` also covers a JSON null "user".
            user = data.get("user") or {}
            token = data.get("token")

            if not token:
                # The caller counts a missing token as a failure, so do not
                # announce success; explain what was missing instead.
                print_error("Registration response missing token")
                print_error(f"   Response: {data}")
                return None

            print_success("User registration successful")
            print_info(f"   Email: {user.get('email')}")
            print_info(f"   Name: {user.get('full_name')}")
            print_info(f"   User ID: {user.get('user_id')}")
            print_info(f"   Is Admin: {user.get('is_admin')}")
            return token

        if response.status_code == 400 and "already exists" in response.text.lower():
            print_warning("User already exists, trying to login...")

            # Try to login with existing user
            login_response = requests.post(
                f"{BASE_URL}/auth/login",
                json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
                timeout=10
            )

            if login_response.status_code != 200:
                print_error("Could not login with existing user")
                print_error(f"   Status: {login_response.status_code}")
                print_error(f"   Response: {login_response.text}")
                return None

            token = login_response.json().get("token")
            if not token:
                print_error("Login response missing token")
                return None

            print_success("Logged in with existing user")
            return token

        print_error(f"Registration failed: {response.status_code}")
        print_error(f"   Response: {response.text}")
        return None
    except Exception as e:
        print_error(f"Registration error: {e}")
        return None


def test_query_storage(user_token: str):
    """Test 5: Query storage (make a recommendation request).

    POSTs a fixed sample query to ``/recommend`` with *user_token* as a
    Bearer token (30-second timeout) and prints a summary of the reply.

    Args:
        user_token: Token placed in the ``Authorization`` header.

    Returns:
        True when the endpoint answers with HTTP 200; False for any other
        status or when an exception is raised.
    """
    print_section("Test 5: Query Storage & Recommendation")

    try:
        headers = {"Authorization": f"Bearer {user_token}"}
        response = requests.post(
            f"{BASE_URL}/recommend",
            json={
                "query": "I need a tech stack for a small e-commerce website with 100 daily users",
                "dau": 100
            },
            headers=headers,
            timeout=30
        )

        if response.status_code != 200:
            print_error(f"Recommendation failed: {response.status_code}")
            # Body is truncated to keep large error pages out of the log.
            print_error(f"   Response: {response.text[:200]}")
            return False

        data = response.json()
        # JSON nulls for these optional fields must not turn a successful
        # request into a reported failure, so substitute neutral defaults.
        usage = data.get('usage') or {}
        cost = float(data.get('cost_usd') or 0)

        print_success("Recommendation request successful")
        print_info(f"   Correlation ID: {data.get('correlation_id')}")
        print_info(f"   Status: {data.get('status')}")
        print_info(f"   Tokens used: {usage.get('total_tokens', 0)}")
        print_info(f"   Cost: ${cost:.4f}")

        recommendations = data.get('recommendations') or {}
        if recommendations:
            print_info(f"   Recommendations: {len(recommendations)} categories")

        return True
    except Exception as e:
        print_error(f"Recommendation error: {e}")
        return False


def test_query_history(user_token: str):
    """Test 6: Query history retrieval.

    Sends ``GET /user/queries`` with *user_token* as a Bearer token and
    prints the number of entries plus details of the first one.

    Args:
        user_token: Token placed in the ``Authorization`` header.

    Returns:
        True when the endpoint answers with HTTP 200; False for any other
        status or when an exception is raised.
    """
    print_section("Test 6: Query History")

    try:
        headers = {"Authorization": f"Bearer {user_token}"}
        response = requests.get(f"{BASE_URL}/user/queries", headers=headers, timeout=10)

        if response.status_code != 200:
            print_error(f"Query history failed: {response.status_code}")
            # Show the body like the sibling tests do, truncated for the log.
            print_error(f"   Response: {response.text[:200]}")
            return False

        data = response.json()
        # `or` fallbacks keep JSON nulls from raising on len(), slicing or
        # the ".4f" format and misreporting a successful retrieval.
        queries = data.get("queries") or []

        print_success("Query history retrieved")
        print_info(f"   Total queries: {len(queries)}")

        if queries:
            # NOTE(review): the first entry is labelled "latest"; that assumes
            # the API returns newest-first - confirm against the server.
            latest = queries[0]
            query_text = latest.get('query') or ''
            cost = float(latest.get('cost_usd') or 0)
            print_info(f"   Latest query: {query_text[:50]}...")
            print_info(f"   Status: {latest.get('status')}")
            print_info(f"   Cost: ${cost:.4f}")

        return True
    except Exception as e:
        print_error(f"Query history error: {e}")
        return False


def test_feedback_submission(user_token: str):
    """Test 7: Feedback submission.

    POSTs a fixed five-star feedback payload to ``/feedback`` with
    *user_token* as a Bearer token.

    Args:
        user_token: Token placed in the ``Authorization`` header.

    Returns:
        True when the endpoint answers with HTTP 200; False for any other
        status or when an exception is raised.
    """
    print_section("Test 7: Feedback Submission")

    try:
        headers = {"Authorization": f"Bearer {user_token}"}
        response = requests.post(
            f"{BASE_URL}/feedback",
            json={
                "rating": 5,
                "comment": "Great recommendations! Very helpful.",
                "feedback_type": "positive"
            },
            headers=headers,
            timeout=10
        )

        if response.status_code != 200:
            print_error(f"Feedback submission failed: {response.status_code}")
            # Show the body like the sibling tests do, truncated for the log.
            print_error(f"   Response: {response.text[:200]}")
            return False

        data = response.json()
        print_success("Feedback submitted successfully")
        print_info(f"   Feedback ID: {data.get('feedback_id')}")
        return True
    except Exception as e:
        print_error(f"Feedback error: {e}")
        return False


def main():
    """Run all tests.

    Parses ``--admin-password`` (required) and ``--base-url``, rebinds the
    module-level ``BASE_URL``, runs the seven checks in order and prints a
    summary.

    Exits with status 0 when every test passed and 1 otherwise. A failed
    health check or admin login aborts immediately with status 1, before the
    summary is printed. When no user token is obtained, the three
    user-level tests are counted as failed without being run.
    """
    global BASE_URL

    parser = argparse.ArgumentParser(
        description="Test authentication and user management features on Railway"
    )
    parser.add_argument(
        "--admin-password",
        required=True,
        help="Admin password for authentication tests"
    )
    parser.add_argument(
        "--base-url",
        default=BASE_URL,
        help=f"Base URL for the API (default: {BASE_URL})"
    )

    args = parser.parse_args()

    # Endpoints are built as f"{BASE_URL}/path"; strip trailing slashes so a
    # value such as "http://host/" does not produce "http://host//path".
    BASE_URL = args.base_url.rstrip("/")

    print("\n" + "="*60)
    print("  Testing Authentication & User Management Features")
    print(f"  Target: {BASE_URL}")
    print("="*60)

    results = {
        "passed": 0,
        "failed": 0,
        "total": 7
    }

    # Test 1: Health check
    if not _record(results, test_health_check()):
        print_error("Cannot proceed without healthy server")
        sys.exit(1)

    # Test 2: Admin login
    admin_token = test_admin_login(args.admin_password)
    if not _record(results, admin_token):
        print_error("Cannot proceed without admin token")
        sys.exit(1)

    # Test 3: Admin dashboard
    _record(results, test_admin_dashboard(admin_token))

    # Test 4: User registration
    user_token = test_user_registration()
    if _record(results, user_token):
        # Tests 5-7: query storage, query history, feedback submission
        for user_test in (test_query_storage, test_query_history, test_feedback_submission):
            _record(results, user_test(user_token))
    else:
        print_warning("Cannot test user features without user token")
        # The three user-level tests cannot run; count them as failed.
        results["failed"] += 3

    # Summary
    print_section("Test Summary")
    print_info(f"Total tests: {results['total']}")
    print_success(f"Passed: {results['passed']}")
    if results['failed'] > 0:
        print_error(f"Failed: {results['failed']}")

    success_rate = (results['passed'] / results['total']) * 100
    print_info(f"Success rate: {success_rate:.1f}%")

    # Emit the blank line separately: print_success/print_error already add
    # their own icon, so a leading "\n" in the message would strand that icon
    # on a line by itself.
    print()
    if results['failed'] == 0:
        print_success("All tests passed! 🎉")
        sys.exit(0)
    else:
        print_error(f"{results['failed']} test(s) failed")
        sys.exit(1)


def _record(results: dict, outcome) -> bool:
    """Add one outcome to the passed/failed tally and return its truthiness."""
    passed = bool(outcome)
    results["passed" if passed else "failed"] += 1
    return passed


# Run the test suite only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()