티스토리 뷰

728x90
반응형

목차

     

     

    지난 글에서는 JWT를 이용한 회원가입과 로그인 로직을 구현해보았다.

     

    그러나, 실행시켜보았으면 알겠지만, 회원가입시 비밀번호까지 같이 반환된다거나

     

    로그인이 생각처럼 되지 않는 등 사소한 문제들이 존재한다.

     

    해서 hotfix 개념으로 지난 글에서 몇 가지를 수정해 쓸 수 있도록 만들어보자.

     

    user.py

     

    유저를 그대로 반환하지 않고 특정 필드만 반환하도록 하기 위해 다음과 같이 추가한다.

    from sqlalchemy import Column, String
    from sqlalchemy.ext.declarative import declarative_base
    from pydantic import BaseModel, validator
    import re
    
    Base = declarative_base()
    
    
    class User(Base):
        __tablename__ = "users"
    
        email = Column(String, primary_key=True, index=True)
        password = Column(String())
        nickname = Column(String())
    
    
    class UserCreate(BaseModel):
        email: str
        password: str
        nickname: str
    
    
    class UserInDB(UserCreate):
        pass
    
    
    class UserLogin(BaseModel):
        email: str
        password: str
    
        @validator("email")
        def validate_email(cls, value):
            if not re.match(r"[^@]+@[^@]+\.[^@]+", value):
                raise ValueError("Invalid Email Format")
            return value
    
    # 추가된 부분
    class UserResponse(BaseModel):
        email: str
        nickname: str

    이는 사용자의 필드 중 Response로 사용할 클래스이다. 

     

    auth.py

     

    계속해서 위에서 만든 클래스를 라우터 함수에 저장한다. 구현은 아래와 같다.

    from fastapi import APIRouter, Depends, HTTPException, status
    from sqlalchemy.orm import Session
    from config import db as config
    from models.user import UserCreate, UserLogin, UserResponse
    from services import auth_service
    
    router = APIRouter()
    
    
    @router.post("/signup", response_model=UserResponse) # 변경된 부분
    def signup(user: UserCreate, db: Session = Depends(config.get_db)):
        try:
            db_user = auth_service.create_user(db, user)
            return {"email": db_user.email, "nickname": db_user.nickname}
        except ValueError:
            raise HTTPException(status_code=400, detail="Email Already Registered")
    
    
    @router.post("/login")
    def login(user: UserLogin, db: Session = Depends(config.get_db)):
        try:
            db_user = auth_service.authenticate_user(db, user.email, user.password)
        except ValueError:
            raise HTTPException(status_code=401, detail="Invalid Credentials")
    
        access_token = auth_service.create_access_token({"sub": user.email})
        return {"email": db_user.email, "nickname": db_user.nickname, "access_token": access_token}

    기존의 UserCreate모델에서 방금 만든 UserResponse를 도입하였다.

     

    물론 이렇게 하려면 import문도 손봐주어야 한다.

     

    내가 작업을 하며 다른 부분도 놓쳤을 수 있으니 여기에 전문들 다시 싣는다.

     

    auth.service.py

     

    from datetime import datetime, timedelta
    from sqlalchemy.orm import Session
    from jose import JWTError, jwt
    from passlib.context import CryptContext
    from models.user import User
    
    SECRET_KEY = "ThisIsTheSecretKeyOfFastAPIApplicationWithSQLAlchemyAndPydantic"
    ALGORITHM = "HS256"
    
    pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
    
    
    def get_password_hash(password):
        return pwd_context.hash(password)
    
    
    def verify_password(plain_password, hashed_password):
        return pwd_context.verify(plain_password, hashed_password)
    
    
    def create_access_token(data: dict):
        to_encode = data.copy()
        expire = datetime.utcnow() + timedelta(days=1)
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    
        return encoded_jwt
    
    
    def get_user_by_email(db: Session, email: str):
        return db.query(User).filter(User.email == email).first()
    
    
    def create_user(db: Session, user: User):
        existing_user = get_user_by_email(db, user.email)
        if existing_user:
            raise ValueError("Email Already Registered")
    
        hashed_password = get_password_hash(user.password)
        db_user = User(email=user.email, password=hashed_password, nickname=user.nickname)
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
    
        return db_user
    
    
    def authenticate_user(db: Session, email: str, password: str):
        user = get_user_by_email(db, email)
        if not user:
            raise ValueError("Invalid Credentials")
        if not verify_password(password, user.password):
            raise ValueError("Invalid Credentials")
        return user

     

    db.py

     

    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
    from sqlalchemy.orm import sessionmaker
    from contextlib import contextmanager
    
    DATABASE_URL = "sqlite:///./test.db"
    
    engine = create_engine(DATABASE_URL)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    
    Base: DeclarativeMeta = declarative_base()
    
    
    def get_db():
        db = SessionLocal()
        try:
            yield db
        finally:
            db.close()

     

    main.py

     

    from fastapi import FastAPI
    from sqlalchemy import create_engine
    from models.user import Base
    from routers import auth as auth_router
    
    DATABASE_URL = "sqlite:///./test.db"
    engine = create_engine(DATABASE_URL)
    Base.metadata.create_all(bind=engine)
    
    app = FastAPI()
    
    app.include_router(auth_router.router, prefix="/api/auth", tags=["auth"])
    
    
    @app.get("/")
    def read_root():
        return {"message": "Hello, World!"}
    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2024/06   »
    1
    2 3 4 5 6 7 8
    9 10 11 12 13 14 15
    16 17 18 19 20 21 22
    23 24 25 26 27 28 29
    30
    글 보관함