""" Authentication Router """ from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from datetime import timedelta from database import get_db from models import User, SystemConfig from schemas import UserCreate, UserLogin, Token, UserResponse from utils import hash_password, verify_password, create_access_token from services.auth_service import get_current_user router = APIRouter() @router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def register( user_data: UserCreate, db: AsyncSession = Depends(get_db) ): """Register a new user""" # Check if registration is allowed result = await db.execute( select(SystemConfig).where(SystemConfig.key == "allow_registration") ) config = result.scalar_one_or_none() if config and config.value.lower() == "false": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Registration is currently disabled" ) # Check if username already exists result = await db.execute( select(User).where(User.username == user_data.username) ) existing_user = result.scalar_one_or_none() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered" ) # Create new user new_user = User( username=user_data.username, hashed_password=hash_password(user_data.password), is_admin=False ) db.add(new_user) await db.commit() await db.refresh(new_user) return new_user @router.post("/login", response_model=Token) async def login( user_data: UserLogin, db: AsyncSession = Depends(get_db) ): """Login and get access token""" # Find user result = await db.execute( select(User).where(User.username == user_data.username) ) user = result.scalar_one_or_none() # Verify credentials if not user or not verify_password(user_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) # Create access token access_token = create_access_token( data={"sub": str(user.id)} # JWT 'sub' must be a string ) print(f"✅ Login successful: user={user.username}, id={user.id}") print(f"🔑 Generated token (first 50 chars): {access_token[:50]}...") return { "access_token": access_token, "token_type": "bearer" } @router.get("/me", response_model=UserResponse) async def get_current_user_info( current_user: User = Depends(get_current_user) ): """Get current user information""" return current_user @router.post("/change-password") async def change_password( old_password: str, new_password: str, current_user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db) ): """Change user password""" # Verify old password if not verify_password(old_password, current_user.hashed_password): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Incorrect current password" ) # Validate new password if len(new_password) < 6: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="New password must be at least 6 characters" ) # Update password current_user.hashed_password = hash_password(new_password) await db.commit() return {"message": "Password changed successfully"}