티스토리 뷰

728x90
반응형

목차

     

     

    지난 글에서는 Like, Follow 구현을 하며 여기저기 잘못된 코드를 수정했다.

     

    이번 글에선 마이페이지, 즉 사용자의 프로필, 피드, 코멘트, 팔로워 등을 위한 로직을 작성할 예정이다.

     

    그런데 이 기능을 구현하려고 보니 이번에는 피드와 코멘트, 회원 모델에 생성/수정 일자를 빼먹었다는 것을 깨달았다.

     

    이를 위한 구현을 하나하나 보여주기엔 글이 늘어질 것 같아, 완성본이 올라가 있는 깃허브 주소를 공유한다.

     

    https://github.com/gnidinger/FastAPI_SQLAlchemy_Pydantic

     

    GitHub - gnidinger/FastAPI_SQLAlchemy_Pydantic

    Contribute to gnidinger/FastAPI_SQLAlchemy_Pydantic development by creating an account on GitHub.

    github.com

    그러니까 위 레포지토리에 가면 이번 글에서 구현한 코드까지 완전판으로 볼 수 있다는 뜻이다.

     

    이후의 글은 조회를 위한 주요 기능, 그러니까 정렬과 페이지네이션에 중점을 두고 진행할 것이다.

     

    길이가 얼마나 될지는 모르지만, 시작해 보자.

     

    mypage_service.py

     

    먼저 코드 전체를 보고 반복되는 중요한 부분만 체크하자.

    from sqlalchemy import select, desc, asc, func
    from sqlalchemy.ext.asyncio import AsyncSession
    from fastapi import HTTPException
    from models.user import User
    from models.feed import Feed, FeedResponse
    from models.comment import Comment, CommentResponse
    from models.follow import Follow
    import logging
    
    logging.basicConfig(level=logging.DEBUG)
    
    
    async def get_user_profile(db: AsyncSession, user_email: str):
        # 사용자 프로필 정보 가져오기
        query = select(User).where(User.email == user_email)
        result = await db.execute(query)
        user_profile = result.scalar_one_or_none()
    
        if not user_profile:
            raise HTTPException(status_code=404, detail="User profile not found.")
    
        user_data = user_profile.__dict__
        user_data.pop("password", None)  # 비밀번호 정보 제거
    
        return user_data
    
    
    async def get_user_feeds(
        db: AsyncSession,
        email: str,
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        # 사용자 이메일에 해당하는 피드 조회 쿼리
        query = select(Feed, User.nickname).join(User, User.email == Feed.author_email)
    
        condition = User.email == email
        query = query.where(condition)
    
        if sort_by == "create_dt_desc":
            query = query.order_by(desc(Feed.create_dt))
        elif sort_by == "create_dt_asc":
            query = query.order_by(asc(Feed.create_dt))
        elif sort_by == "update_dt_desc":
            query = query.order_by(desc(Feed.update_dt))
        elif sort_by == "update_dt_asc":
            query = query.order_by(asc(Feed.update_dt))
    
        total_count_result = await db.execute(select(func.count()).select_from(Feed).where(condition))
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        total_count = int(total_count)
    
        query = query.offset(skip).limit(limit)
    
        feeds_result = await db.execute(query)
        feeds = feeds_result.all()
        if feeds is None:
            feeds = []
    
        feed_responses = []
    
        for feed, nickname in feeds:
            feed_dict = FeedResponse(
                id=feed.id,
                title=feed.title,
                content=feed.content,
                author_email=feed.author_email,
                author_nickname=nickname,
                image_urls=feed.image_urls,
                create_dt=feed.create_dt,
                update_dt=feed.update_dt,
            )
            feed_responses.append(feed_dict.model_dump())
    
        return total_count, feed_responses
    
    
    async def get_user_comments(
        db: AsyncSession,
        user_email: str,
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        # 사용자 이메일에 해당하는 댓글 조회 쿼리
        query = select(Comment, User.nickname).join(User, User.email == Comment.author_email)
        condition = User.email == user_email
        query = query.where(condition)
    
        if sort_by == "create_dt_desc":
            query = query.order_by(desc(Comment.create_dt))
        elif sort_by == "create_dt_asc":
            query = query.order_by(asc(Comment.create_dt))
        elif sort_by == "update_dt_desc":
            query = query.order_by(desc(Comment.update_dt))
        elif sort_by == "update_dt_asc":
            query = query.order_by(asc(Comment.update_dt))
    
        total_count_result = await db.execute(
            select(func.count()).select_from(Comment).where(condition)
        )
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        total_count = int(total_count)
    
        query = query.offset(skip).limit(limit)
    
        comments_result = await db.execute(query)
        comments = comments_result.all()
        if comments is None:
            comments = []
    
        comment_responses = []
    
        for comment, nickname in comments:
            comment_dict = CommentResponse(
                id=comment.id,
                content=comment.content,
                author_email=comment.author_email,
                author_nickname=nickname,
                feed_id=comment.feed_id,
                create_dt=comment.create_dt,
                update_dt=comment.update_dt,
            )
            comment_responses.append(comment_dict.model_dump())
    
        return {"total_count": total_count, "comments": comment_responses}
    
    
    async def get_user_followers(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 10):
        # 전체 팔로워 수를 구하는 쿼리
        count_query = (
            select(func.count())
            .select_from(User)
            .join(Follow, Follow.follower_id == User.id)
            .where(Follow.following_id == user_id)
        )
        total_count_result = await db.execute(count_query)
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        # 사용자를 팔로우하는 사람들의 목록 가져오기
        query = (
            select(User)
            .join(Follow, Follow.follower_id == User.id)
            .where(Follow.following_id == user_id)
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(query)
        followers = result.scalars().all()
    
        def extract_profile(user):
            return {
                "email": user.email,
                "nickname": user.nickname,
            }
    
        return {"total_count": total_count, "followers": [extract_profile(user) for user in followers]}
    
    
    async def get_user_followings(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 10):
        # 전체 팔로잉 수를 구하는 쿼리
        count_query = (
            select(func.count())
            .select_from(User)
            .join(Follow, Follow.following_id == User.id)
            .where(Follow.follower_id == user_id)
        )
        total_count_result = await db.execute(count_query)
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        # 사용자가 팔로우하는 사람들의 목록 가져오기
        query = (
            select(User)
            .join(Follow, Follow.following_id == User.id)
            .where(Follow.follower_id == user_id)
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(query)
        followings = result.scalars().all()
    
        def extract_profile(user):
            return {
                "email": user.email,
                "nickname": user.nickname,
            }
    
        return {
            "total_count": total_count,
            "followings": [extract_profile(user) for user in followings],
        }

    200줄이 넘는 코드이지만 비슷한 패턴이 반복되기 때문에 별 어려울 것이 없다.

     

    가장 위부터 하나씩 살펴보자.

     

    get_user_profile

     

    async def get_user_profile(db: AsyncSession, user_email: str):
        # 사용자 프로필 정보 가져오기
        query = select(User).where(User.email == user_email)
        result = await db.execute(query)
        user_profile = result.scalar_one_or_none()
    
        if not user_profile:
            raise HTTPException(status_code=404, detail="User profile not found.")
    
        user_data = user_profile.__dict__
        user_data.pop("password", None)  # 비밀번호 정보 제거
    
        return user_data

    이름대로 사용자 프로필 정보를 가져오는 함수이다.

     

    2.0 스타일 쿼리와 예외 처리는 지난 글에서 확인했으니 넘어가자.

     

    특이하면서도 굉장히 편리한 부분은 아래 두 줄인데,

        user_data = user_profile.__dict__
        user_data.pop("password", None)  # 비밀번호 정보 제거

    가져온 사용자 정보를 딕셔너리 형태로 저장한 뒤 비밀번호 정보를 제거하는 기능을 한다.

     

    위 코드가 없으면 사용자 정보를 모두 리턴하기 때문에 비밀번호까지 노출된다.

     

    딕셔너리에 사용자 정보를 담고, "password"키와 관련된 값을 제거해 리턴하는 게 새로웠다.

     

    get_user_feeds

     

    async def get_user_feeds(
        db: AsyncSession,
        email: str,
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        # 사용자 이메일에 해당하는 피드 조회 쿼리
        query = select(Feed, User.nickname).join(User, User.email == Feed.author_email)
    
        condition = User.email == email
        query = query.where(condition)
    
        if sort_by == "create_dt_desc":
            query = query.order_by(desc(Feed.create_dt))
        elif sort_by == "create_dt_asc":
            query = query.order_by(asc(Feed.create_dt))
        elif sort_by == "update_dt_desc":
            query = query.order_by(desc(Feed.update_dt))
        elif sort_by == "update_dt_asc":
            query = query.order_by(asc(Feed.update_dt))
    
        total_count_result = await db.execute(select(func.count()).select_from(Feed).where(condition))
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        total_count = int(total_count)
    
        query = query.offset(skip).limit(limit)
    
        feeds_result = await db.execute(query)
        feeds = feeds_result.all()
        if feeds is None:
            feeds = []
    
        feed_responses = []
    
        for feed, nickname in feeds:
            feed_dict = FeedResponse(
                id=feed.id,
                title=feed.title,
                content=feed.content,
                author_email=feed.author_email,
                author_nickname=nickname,
                image_urls=feed.image_urls,
                create_dt=feed.create_dt,
                update_dt=feed.update_dt,
            )
            feed_responses.append(feed_dict.model_dump())
    
        return total_count, feed_responses

    역시 이름대로 사용자가 작성한 피드를 가져오는 부분이다.

     

    정렬 로직과 페이지네이션 때문에 코드가 길어졌는데, 대부분은 별 거 아닌 내용이다.

        # 사용자 이메일에 해당하는 피드 조회 쿼리
        query = select(Feed, User.nickname).join(User, User.email == Feed.author_email)

    먼저 Feed와 User 테이블을 조인해 사용자의 닉네임과 피드 정보를 가져오는 쿼리를 생성한다.

        condition = User.email == email
        query = query.where(condition)

    계속해서 사용자의 이메일에 해당하는 조건을 설정하고, 쿼리에 적용해 둔다.

        if sort_by == "create_dt_desc":
            query = query.order_by(desc(Feed.create_dt))
        elif sort_by == "create_dt_asc":
            query = query.order_by(asc(Feed.create_dt))
        elif sort_by == "update_dt_desc":
            query = query.order_by(desc(Feed.update_dt))
        elif sort_by == "update_dt_asc":
            query = query.order_by(asc(Feed.update_dt))

    파라미터로 받은 sort_by의 값에 따라 피드 정렬 방식을 설정한다.

     

    여기서는 생성/수정 날짜로 내림/오름차순을 구현했다.

        total_count_result = await db.execute(select(func.count()).select_from(Feed).where(condition))
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0

    다음으로 사용자의 이메일에 해당하는 피드의 총개수를 조회하는 쿼리를 실행한다.

     

    총개수가 None인 경우 0으로 설정하고 정수로 저장한다.

        query = query.offset(skip).limit(limit)

    파라미터로 주어진 skip과 limit를 이용해 조회 범위를 설정한다.

        feeds_result = await db.execute(query)
        feeds = feeds_result.all()
        if feeds is None:
            feeds = []

    설정된 조회 범위에 해당하는 피드 정보를 디비에서 조회한다.

     

    피드가 없는 경우에는 빈 리스트로 초기화한다.

        feed_responses = []
    
        for feed, nickname in feeds:
            feed_dict = FeedResponse(
                id=feed.id,
                title=feed.title,
                content=feed.content,
                author_email=feed.author_email,
                author_nickname=nickname,
                image_urls=feed.image_urls,
                create_dt=feed.create_dt,
                update_dt=feed.update_dt,
            )
            feed_responses.append(feed_dict.model_dump())

    반환할 피드 응답 리스트를 생성하고,

     

    각각의 피드 정보와 사용자 닉네임을 사용해 FeedResponse 객체를 생성한 뒤 해당 리스트에 채워준다.

        return total_count, feed_responses

    피드의 총 개수와 응답 리스트를 반환한다.

     

    get_user_followers

     

    async def get_user_followers(db: AsyncSession, user_id: int, skip: int = 0, limit: int = 10):
        # 전체 팔로워 수를 구하는 쿼리
        count_query = (
            select(func.count())
            .select_from(User)
            .join(Follow, Follow.follower_id == User.id)
            .where(Follow.following_id == user_id)
        )
        total_count_result = await db.execute(count_query)
        total_count = total_count_result.scalar_one_or_none()
        if total_count is None:
            total_count = 0
    
        # 사용자를 팔로우하는 사람들의 목록 가져오기
        query = (
            select(User)
            .join(Follow, Follow.follower_id == User.id)
            .where(Follow.following_id == user_id)
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(query)
        followers = result.scalars().all()
    
        def extract_profile(user):
            return {
                "email": user.email,
                "nickname": user.nickname,
            }
    
        return {"total_count": total_count, "followers": [extract_profile(user) for user in followers]}

    코멘트 부분은 피드와 크게 다를 것이 없으므로 넘어간다.

     

    mypage_router.py

     

    계속해서 위 로직을 호출하는 라우터 모듈을 보자.

     

    페이지네이션 작업을 이곳에서 해준다.

    from fastapi import APIRouter, Depends, HTTPException
    from sqlalchemy.ext.asyncio import AsyncSession
    from config.db import get_db
    from services.mypage_service import (
        get_user_profile,
        get_user_feeds,
        get_user_comments,
        get_user_followers,
        get_user_followings,
    )
    from services.auth_service import get_current_user_authorization
    
    router = APIRouter()
    
    
    @router.get("/profile")
    async def user_profile(
        db: AsyncSession = Depends(get_db), email: str = Depends(get_current_user_authorization)
    ):
        profile = await get_user_profile(db, email)
        if not profile:
            raise HTTPException(status_code=404, detail="User profile not found.")
        return profile
    
    
    @router.get("/feeds")
    async def user_feeds(
        db: AsyncSession = Depends(get_db),
        email: str = Depends(get_current_user_authorization),
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        total_count, feeds = await get_user_feeds(db, email, skip, limit, sort_by)
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)
        is_last_page = (skip + limit) >= total_count
    
        return {
            "feeds": feeds,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }
    
    
    @router.get("/comments")
    async def user_comments(
        db: AsyncSession = Depends(get_db),
        email: str = Depends(get_current_user_authorization),
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        result = await get_user_comments(db, email, skip, limit, sort_by)
        total_count = result["total_count"]
        comments = result["comments"]
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)  # Ceiling division in Python
        is_last_page = (skip + limit) >= total_count
    
        return {
            "comments": comments,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }
    
    
    @router.get("/{user_id}/followers")
    async def user_followers(
        user_id: int, skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_db)
    ):
        result_dict = await get_user_followers(db, user_id, skip, limit)
    
        total_count = result_dict["total_count"]
        followers = result_dict["followers"]
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)
        is_last_page = (skip + limit) >= total_count
    
        return {
            "followers": followers,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }
    
    
    @router.get("/{user_id}/followings")
    async def user_followings(
        user_id: int, skip: int = 0, limit: int = 10, db: AsyncSession = Depends(get_db)
    ):
        result_dict = await get_user_followings(db, user_id, skip, limit)
    
        total_count = result_dict["total_count"]
        followings = result_dict["followings"]
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)
        is_last_page = (skip + limit) >= total_count
    
        return {
            "followings": followings,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }

     

    user_feeds

     

    @router.get("/feeds")
    async def user_feeds(
        db: AsyncSession = Depends(get_db),
        email: str = Depends(get_current_user_authorization),
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        total_count, feeds = await get_user_feeds(db, email, skip, limit, sort_by)
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)
        is_last_page = (skip + limit) >= total_count
    
        return {
            "feeds": feeds,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }

    사용자 프로필 조회 함수는 건너뛰고 바로 피드로 오자.

     

    async def user_feeds(
        db: AsyncSession = Depends(get_db),
        email: str = Depends(get_current_user_authorization),
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):

    user_feeds 비동기 함수는 다섯 개의 파라미터를 받는다.

     

    특히 skip, limit, sort_by를 각각 받아 이후의 페이지네이션에 사용한다.

        total_count, feeds = await get_user_feeds(db, email, skip, limit, sort_by)

    먼저 위에서 정의한 get_user_feeds 함수를 비동기 호출하여 사용자의 총 피드 수와 피드 리스트를 받아온다.

        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)
        is_last_page = (skip + limit) >= total_count

    이어서 페이지 번호, 총 페이지 수, 마지막 페이지 여부를 계산한다.

     

    총 페이지 수에서 굳이

        total_pages = -(-total_count // limit)

    이런 식의 요상한 계산을 하는 이유는 분자를 분모로 나눈 뒤 올림하기 위해서이다.

        return {
            "feeds": feeds,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }

    마지막으로 클라이언트에 반환할 응답 객체를 생성한다.

     

    해당 객체는 피드 리스트와 페이지네이션 정보를 포함한다.

     

    이제와 굳이 이런 말 하긴 뭐 하지만, 실로 간결하고 간편하다.

     

    user_comments

     

    @router.get("/comments")
    async def user_comments(
        db: AsyncSession = Depends(get_db),
        email: str = Depends(get_current_user_authorization),
        skip: int = 0,
        limit: int = 10,
        sort_by: str = "create_dt_desc",
    ):
        result = await get_user_comments(db, email, skip, limit, sort_by)
        total_count = result["total_count"]
        comments = result["comments"]
    
        current_page = (skip // limit) + 1
        total_pages = -(-total_count // limit)  # Ceiling division in Python
        is_last_page = (skip + limit) >= total_count
    
        return {
            "comments": comments,
            "pagination": {
                "current_page": current_page,
                "total_pages": total_pages,
                "is_last_page": is_last_page,
                "total_count": total_count,
            },
        }

    위 코드는 비슷하지만 조금 다르게 구현되었다.

     

    다른 부분만 살펴보고 가자.

        result = await get_user_comments(db, email, skip, limit, sort_by)

    서비스 로직에서도 확인할 수 있듯이, 함수를 비동기적으로 호출해 result 딕셔너리를 받는다.

     

    이 딕셔너리에는

        total_count = result["total_count"]
        comments = result["comments"]

    위와 같은 정보가 담겨 있는데, 굳이 이렇게 할 필요 없이 피드처럼 처리해도 괜찮다.

     

    아니 오히려 그렇게 처리하는 것이 일관성 측면에서는 더 좋을 것이다.

     

    하지만 나는 이렇게 저렇게 굴려보고 있는 거니까..

     

    이후의 단계는 완전히 동일하므로 생략한다.

     

    main.py

     

    from fastapi import FastAPI
    from sqlalchemy import create_engine
    from models.user import Base as UserBase  # User의 Base 클래스
    from models.feed import Base as FeedBase  # Feed의 Base 클래스
    from models.comment import Base as CommentBase  # Comment의 Base
    from models.like import Base as LikeBase  # Like의 Base 클래스
    from models.follow import Base as FollowBase  # Follow의 Base 클래스
    from routers import (
        auth_router,
        feed_router,
        comment_router,
        like_router,
        follow_router,
        mypage_router,
    )
    from config.cors_config import setup_cors
    
    DATABASE_URL = "postgresql://postgres:rjslrjsl333@localhost:5432/postgres"
    engine = create_engine(DATABASE_URL)
    
    UserBase.metadata.create_all(bind=engine)
    FeedBase.metadata.create_all(bind=engine)
    CommentBase.metadata.create_all(bind=engine)
    LikeBase.metadata.create_all(bind=engine)
    FollowBase.metadata.create_all(bind=engine)
    
    app = FastAPI()
    
    setup_cors(app)
    
    app.include_router(auth_router.router, prefix="/api/auth", tags=["auth"])
    app.include_router(feed_router.router, prefix="/api/feed", tags=["feed"])
    app.include_router(comment_router.router, prefix="/api/comment", tags=["comment"])
    app.include_router(like_router.router, prefix="/api/like", tags=["like"])
    app.include_router(follow_router.router, prefix="/api/follow", tags=["follow"])
    app.include_router(mypage_router.router, prefix="/api/mypage", tags=["mypage"])
    
    
    @app.get("/")
    def read_root():
        return {"message": "Hello, World!"}

    늘 그래왔듯 메인 함수에 테이블 생성과 API 연결을 위한 설정을 해주고 구현을 마친다.

     

    내가 빼먹거나 생략한 부분은 깃허브에 가면 제대로 돌아가는 코드로 구현이 되어 있으니 참고하면 된다.

     

    이렇게 해서 마이페이지까지.. 끝!

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2024/06   »
    1
    2 3 4 5 6 7 8
    9 10 11 12 13 14 15
    16 17 18 19 20 21 22
    23 24 25 26 27 28 29
    30
    글 보관함