티스토리 뷰

728x90
반응형

목차

     

     

    지난 글까지 기초적인 게시판에 필요한 기능을 구현했다.

     

    구현 자체는 무리가 없었으나 한 가지 아쉬운 점이 있었는데,

     

    현재 작성된 로직과 디비 쿼리가 동기적으로만 이루어지고 있다는 사실이었다.

     

    그래서 프로젝트가 더 커지기 전에 비동기 메서드와 쿼리를 적용하는 것이 좋겠다고 판단,

     

    이번 글을 적게 되었다.

     

    참고로 이번 글에선 자잘하게 코드를 바꿔야 하는 부분이 매우 많기 때문에,

     

    Feed에 대한 예제만 확인하고, 코드 전체가 올라간 깃허브 레포지토리를 공유하도록 하겠다.

     

    그럼, 시작.

     

    PostgreSQL, aioboto3, SQLAlchemy 2.0 query

     

    PostgreSQL

     

    가장 먼저, 기본적으로 여태까지 잘 사용해 왔던 SQLite는 기본적으로 비동기 작업을 지원하지 않는다.

     

    물론 aiosqlite와 같은 라이브러리를 사용하면 그대로 비동기 I/O처리를 할 수 있으나,

     

    이왕 이렇게 된 김에 SQLAlchemy와 비동기 궁합이 좋다는 포스트그레스큐엘을 사용해 보기로 하자.

     

    혹시나 나중에 복잡한 쿼리가 필요해질 수도 있으니까.

     

    포스트그레스큐엘에 대해서는 지난 글에 설명해 두었고,

     

    이번 글에서는 도커를 이용해 간단하게 컨테이너를 띄우고 작업을 진행한다.

     

    먼저, 아래와 같이 입력해 포스트그레스큐엘 컨테이러를 띄운다.

    docker run --name postgres -e POSTGRES_PASSWORD=YOUR_PASSWORD -p 5432:5432 -v my_postgres_data:/var/lib/postgresql/data -d postgres

    위 커맨드는 이름을 postgres로, 비밀번호를 YOUR_PASSWORD로 지정해 컨테이너를 띄운다.

     

    그다음엔 파이썬 프로젝트로 돌아와

    pip install asyncpg psycopg2-binary

    를 입력해 필요한 라이브러리를 설치한다.

     

    이후의 진행과정에서 라이브러리를 추가로 설치하라는 메시지가 나오면 또 하나씩 설치하면 된다.

     

    config/db.py

     

    여기까지 했으면 우선 db 접근 설정을 비동기 용으로 바꾸어주어야 한다.

     

    전체 코드를 우선 공유하면 다음과 같다.

    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
    from sqlalchemy.orm import sessionmaker
    
    DATABASE_URL = "postgresql+asyncpg://postgres:YOUR_PASSWORD@localhost:5432/postgres"
    
    engine = create_async_engine(DATABASE_URL, echo=True)
    
    # 비동기 세션을 위한 설정
    AsyncSessionLocal = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        class_=AsyncSession,
        autocommit=False,
        autoflush=False,
    )
    
    Base: DeclarativeMeta = declarative_base()
    
    
    async def get_db():
        db = AsyncSessionLocal()
        try:
            yield db
        finally:
            await db.close()

    이전의 동기 처리 세션보다 설정할 것이 조금 많아지긴 했는데, 잘라서 보면 별 거 아니다.

    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
    from sqlalchemy.orm import sessionmaker
    
    DATABASE_URL = "postgresql+asyncpg://postgres:YOUR_PASSWORD@localhost:5432/postgres"
    
    engine = create_async_engine(DATABASE_URL, echo=True)

    먼저 비동기 작업을 위한 클래스를 가져온다.

     

    이후 디비 모델 생성과 세션 관리를 위한 클래스를 가져온 뒤에

     

    DATABASE_URL을 우리의 설정에 맞게 바꿔준다.

     

    이어서 비동기 엔진을 생성, engine에 할당한다. 여기서 echo=True는 쿼리 실행을 콘솔에 보여달라는 설정이다.

    # 비동기 세션을 위한 설정
    AsyncSessionLocal = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        class_=AsyncSession,
        autocommit=False,
        autoflush=False,
    )

    계속해서 sessionmaker() 메서드를 불러와 AsyncSessionLocal에 할당한다.

     

    먼저 bind=engine으로 위에서 생성한 engine을 바인딩해 주고

     

    class_=AsyncSession으로 비동기 세션 클래스를 사용한다.

     

    나머지 설정은 세션의 동작방식을 결정한다.

    Base: DeclarativeMeta = declarative_base()

    모델 클래스를 생성할 때 사용할 Base를 정의한다.

    async def get_db():
        db = AsyncSessionLocal()
        try:
            yield db
        finally:
            await db.close()

    비동기로 작동한느 디비 세션을 제공하는 코루틴 함수이다.

     

    여기서 코루틴이란 실행 중간에도 제어권을 반납할 수 있는 함수로서,

     

    작업을 멈췄다가 멈춘 지점부터 작업을 이어나갈 수 있는 특정을 가지고 있다.

     

    이런 특성을 가지고 있기 때문에 비동기 프로그래밍에서 주로 사용된다고 한다.

     

    어쨌거나 코드를 계속 보면, 비동기 디비 세션을 db에 할당해 이를 사용하는 코드에 전달,

     

    작업이 끝나면 세션을 닫도록 되어있다.

     

    aioboto3

     

    계속해서 S3에 접근하는 로직도 비동기처리를 해주기 위해 다음 라이브러리를 설치한다.

    pip install aioboto3

     

    config/s3_config.py

    import aioboto3
    from config import settings
    
    
    async def get_s3_client():
        session = aioboto3.Session(
            aws_access_key_id=settings.S3_ACCESS_KEY,
            aws_secret_access_key=settings.S3_SECRET_KEY,
            region_name="ap-northeast-2",
        )
        return session.client("s3")

    이어서 위와 같이 작성, S3에 대한 작업을 비동기로 처리하기 위한 준비를 마친다.

     

    SQLAlchemy 2.0 query

     

    SQLAlchemy 2.0 query는 쉽게 축약해 2.0 style query라고도 부른다.

     

    기존에 사용하던 1.x 버전의 쿼리도 여전히 잘 작동하며, 앞으로도 지원이 이어질 것이라고 하지만

     

    SQLAlchemy를 비동기로 구현하려면 반드시 2.0 스타일 쿼리를 사용해야 한다고.

     

    출처: https://www.sqlalchemy.org/

     

    SQLAlchemy

    The Database Toolkit for Python

    www.sqlalchemy.org

    공식문서에 가보면 기존의 1.x 스타일과 2.0 스타일의 다른 점을 보여주고 있기는 한데,

     

    여기선 생략하도록 하겠다.

     

    다만 앞으로 이 글에서 바꾸어 갈 쿼리는 전부 2.0 스타일로의 마이그레이션이라는 것만 강조.

     

    feed_service.py

     

    그럼 예고했던 대로 피드에 관련된 변경사항을 살펴보자.

     

    자연스럽게 S3 접근에 대한 비동기처리도 보게 될 것이다.

    from fastapi import HTTPException, UploadFile
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.future import select
    from models.feed import Feed, FeedCreate, FeedUpdate
    from models.user import User
    from config import settings
    from typing import List, Optional
    import uuid
    from config.s3_config import get_s3_client
    import logging
    
    logging.basicConfig(level=logging.NOTSET)
    
    
    async def create_feed(
        db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
    ):
        feed_dict = feed.model_dump()
        feed_dict["author_email"] = author_email
    
        if images:
            image_urls = await upload_image_to_s3(images)
            feed_dict["image_urls"] = image_urls
    
        author = await db.execute(select(User).where(User.email == author_email))
        author = author.scalar_one_or_none()
    
        if author is None:
            raise HTTPException(status_code=404, detail="Author Not Found")
    
        author_nickname = author.nickname
    
        db_feed = Feed(**feed_dict)
        db.add(db_feed)
        await db.commit()
        await db.refresh(db_feed)
    
        return {
            "id": db_feed.id,
            "title": db_feed.title,
            "content": db_feed.content,
            "author_email": db_feed.author_email,
            "author_nickname": author_nickname,
            "image_urls": db_feed.image_urls,
        }
    
    
    async def get_feed_by_id(db: AsyncSession, feed_id: int):
        feed_data = await db.execute(
            select(Feed, User.nickname)
            .join(User, User.email == Feed.author_email)
            .where(Feed.id == feed_id)
        )
        feed_data = feed_data.first()
    
        if feed_data is None:
            raise HTTPException(status_code=404, detail="Feed not found")
    
        feed, nickname = feed_data
    
        return {
            "id": feed.id,
            "title": feed.title,
            "content": feed.content,
            "author_email": feed.author_email,
            "author_nickname": nickname,
            "image_urls": feed.image_urls,
        }
    
    
    async def get_feeds(db: AsyncSession):
        feeds = await db.execute(
            select(Feed, User.nickname).join(User, User.email == Feed.author_email)
        )
        feeds = feeds.all()
    
        feed_responses = []
    
        for feed, nickname in feeds:
            feed_dict = {
                "id": feed.id,
                "title": feed.title,
                "content": feed.content,
                "author_email": feed.author_email,
                "author_nickname": nickname,
                "image_urls": feed.image_urls,
            }
            feed_responses.append(feed_dict)
    
        return feed_responses
    
    
    async def update_feed(
        db: AsyncSession,
        feed_id: int,
        feed_update: FeedUpdate,
        email: str,
        new_images: Optional[List[UploadFile]] = None,
        target_image_urls: Optional[List[str]] = None,
    ):
        db_feed = await db.execute(select(Feed).where(Feed.id == feed_id))
        db_feed = db_feed.scalar_one_or_none()
    
        if db_feed is None:
            raise HTTPException(status_code=404, detail="Feed Not Found")
    
        if db_feed.author_email != email:
            raise HTTPException(status_code=403, detail="Permission Denied")
    
        result = await db.execute(select(User).where(User.email == email))
        author = result.scalar_one_or_none()
    
        author_nickname = author.nickname
    
        db_feed.title = feed_update.title
        db_feed.content = feed_update.content
    
        existing_image_urls = db_feed.image_urls or []
    
        # Case 1: Both new_images and target_image_urls exist
        if new_images and target_image_urls:
            for url in target_image_urls:
                print(url)
                await delete_image_from_s3(url)
            new_image_urls = await upload_image_to_s3(new_images)
            existing_image_urls = [url for url in existing_image_urls if url not in target_image_urls]
            existing_image_urls = existing_image_urls + new_image_urls
    
        # Case 2: Only new_images exist
        elif new_images:
            new_image_urls = await upload_image_to_s3(new_images)
            existing_image_urls = existing_image_urls + new_image_urls
    
        # Case 3: Only target_image_urls exist
        elif target_image_urls:
            for url in target_image_urls:
                await delete_image_from_s3(url)
            existing_image_urls = [url for url in existing_image_urls if url not in target_image_urls]
    
        db_feed.image_urls = existing_image_urls
        print(db_feed.image_urls)
    
        await db.commit()
        await db.refresh(db_feed)
    
        result = {
            "id": db_feed.id,
            "title": db_feed.title,
            "content": db_feed.content,
            "author_email": db_feed.author_email,
            "author_nickname": author_nickname,
            "image_urls": db_feed.image_urls,
        }
    
        logging.debug(f"Updated feed: {result}")
    
        return result
    
    
    async def delete_feed(db: AsyncSession, feed_id: int, email: str):
        result = await db.execute(select(Feed).where(Feed.id == feed_id))
        db_feed = result.scalars().first()
    
        if db_feed is None:
            raise HTTPException(status_code=404, detail="Feed Not Found")
    
        if db_feed.author_email != email:
            raise HTTPException(status_code=403, detail="Permission Denied")
    
        image_urls = db_feed.image_urls
    
        for image_url in image_urls:
            await delete_image_from_s3(image_url)
    
        await db.delete(db_feed)
        await db.commit()
    
    
    async def upload_image_to_s3(images: List[UploadFile]):
        image_urls = []
    
        async with await get_s3_client() as s3_client:
            for image in images:
                logging.debug(f"Uploading {image.filename}")
                image_name = f"{uuid.uuid4()}.png"
    
                await s3_client.upload_fileobj(
                    image.file,
                    settings.S3_BUCKET,
                    image_name,
                    ExtraArgs={"ContentType": image.content_type},
                )
    
                image_url = f"https://{settings.S3_BUCKET}.s3.ap-northeast-2.amazonaws.com/{image_name}"
                image_urls.append(image_url)
    
        return image_urls
    
    
    async def delete_image_from_s3(image_url: str):
        image_name = image_url.split("/")[-1]
        bucket_name = settings.S3_BUCKET
        logging.debug(f"Deleting from Bucket: {bucket_name}, Key: {image_name}")
    
        async with await get_s3_client() as s3_client:
            await s3_client.delete_object(Bucket=settings.S3_BUCKET, Key=image_name)

    2.0 스타일로 변경된 쿼리만 하나씩 보자.

    async def create_feed(
        db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
    ):
        feed_dict = feed.model_dump()
        feed_dict["author_email"] = author_email
    
        if images:
            image_urls = await upload_image_to_s3(images)
            feed_dict["image_urls"] = image_urls
    
        author = await db.execute(select(User).where(User.email == author_email))
        author = author.scalar_one_or_none()
    
        if author is None:
            raise HTTPException(status_code=404, detail="Author Not Found")
    
        author_nickname = author.nickname
    
        db_feed = Feed(**feed_dict)
        db.add(db_feed)
        await db.commit()
        await db.refresh(db_feed)
    
        return {
            "id": db_feed.id,
            "title": db_feed.title,
            "content": db_feed.content,
            "author_email": db_feed.author_email,
            "author_nickname": author_nickname,
            "image_urls": db_feed.image_urls,
        }

     

    먼저 피드 생성 로직.

    async def create_feed(
        db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
    ):

    시그니처의 db 매개변수 역시 비동기 세션으로 바뀌었다.

        author = await db.execute(select(User).where(User.email == author_email))
        author = author.scalar_one_or_none()

    작성자를 검색하는 쿼리가 2.0 스타일로 바뀌었다.

     

    여기서 좀 묘한 것이, 위처럼 두 줄로 사용하지 않고

    author = (await db.execute(select(User).where(User.email == author_email))).scalar_one_or_none()

    이런 식으로 바로 한 줄로 적어버리면 에러가 난다. 공식문서를 열심히 읽지 않아 이유를 모르는 것일까.

     

    일단 가독성 측면에서 나쁘지 않기 때문에 두 줄로 나눠서 쓰는 것을 유지하기로 한다.

        db_feed = Feed(**feed_dict)
        db.add(db_feed)
        await db.commit()
        await db.refresh(db_feed)

    디비에 커밋을 하고 리프레쉬하는 부분도 비동기 처리를 해주어야 한다.

    async def get_feed_by_id(db: AsyncSession, feed_id: int):
        feed_data = await db.execute(
            select(Feed, User.nickname)
            .join(User, User.email == Feed.author_email)
            .where(Feed.id == feed_id)
        )
        feed_data = feed_data.first()
    
        if feed_data is None:
            raise HTTPException(status_code=404, detail="Feed not found")
    
        feed, nickname = feed_data
    
        return {
            "id": feed.id,
            "title": feed.title,
            "content": feed.content,
            "author_email": feed.author_email,
            "author_nickname": nickname,
            "image_urls": feed.image_urls,
        }

    위 부분도 마찬가지로 매개변수와 쿼리 스타일이 바뀌었다.

     

    디비 접근 함수를 호출할 때마다 await을 붙여주는 걸 잊어서는 안 된다.

    async def upload_image_to_s3(images: List[UploadFile]):
        image_urls = []
    
        async with await get_s3_client() as s3_client:
            for image in images:
                logging.debug(f"Uploading {image.filename}")
                image_name = f"{uuid.uuid4()}.png"
    
                await s3_client.upload_fileobj(
                    image.file,
                    settings.S3_BUCKET,
                    image_name,
                    ExtraArgs={"ContentType": image.content_type},
                )
    
                image_url = f"https://{settings.S3_BUCKET}.s3.ap-northeast-2.amazonaws.com/{image_name}"
                image_urls.append(image_url)
    
        return image_urls

    나머지는 계속 비슷하기 때문에, 조금 건너뛰어서 이미지 업로드 로직을 보자.

     

    위에서 만들어준 비동기 클라이언트를 사용해 업로드 함수를 비동기로 호출하는 것을 확인할 수 있다.

     

    기존의 코드를 바꾸는 부분도 그러지만, 특히나 이 부분은 너무 간결하게 바뀌는 데다, 실제 작동까지 잘하니 신기했다.

    async def delete_image_from_s3(image_url: str):
        image_name = image_url.split("/")[-1]
        bucket_name = settings.S3_BUCKET
        logging.debug(f"Deleting from Bucket: {bucket_name}, Key: {image_name}")
    
        async with await get_s3_client() as s3_client:
            await s3_client.delete_object(Bucket=settings.S3_BUCKET, Key=image_name)

    이미지 삭제 로직도 마찬가지다.

     

    비동기 클라이언트로 삭제 로직을 호출하면 된다.

     

    feed_router.py

     

    마지막으로 해당 비즈니스 로직을 불러서 사용하는 라우터 함수를 살펴보자.

    from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
    from sqlalchemy.ext.asyncio import AsyncSession
    from models.feed import FeedCreate, FeedResponse, FeedUpdate
    from services import feed_service, auth_service
    from config.db import get_db
    from typing import List
    import logging
    
    logging.basicConfig(level=logging.DEBUG)
    
    router = APIRouter()
    
    
    @router.post("/create", response_model=FeedResponse)
    async def create(
        title: str = Form(...),
        content: str = Form(...),
        images: List[UploadFile] = File(None),
        db: AsyncSession = Depends(get_db),
        email: str = Depends(auth_service.get_current_user_authorization),
    ):
        if email is None:
            raise HTTPException(status_code=401, detail="Not authorized")
    
        feed = FeedCreate(title=title, content=content)
    
        return await feed_service.create_feed(db, feed, email, images)
    
    
    @router.get("/read/{feed_id}", response_model=FeedResponse)
    async def read_feed(feed_id: int, db: AsyncSession = Depends(get_db)):
        return await feed_service.get_feed_by_id(db, feed_id)
    
    
    @router.get("/list", response_model=List[FeedResponse])
    async def list_feeds(db: AsyncSession = Depends(get_db)):
        return await feed_service.get_feeds(db)
    
    
    @router.patch("/update/{feed_id}", response_model=FeedResponse)
    async def update(
        feed_id: int,
        title: str = Form(...),
        content: str = Form(...),
        new_images: List[UploadFile] = File(None),
        target_image_urls: List[str] = Form(None),
        db: AsyncSession = Depends(get_db),
        email: str = Depends(auth_service.get_current_user_authorization),
    ):
        if email is None:
            raise HTTPException(status_code=401, detail="Not authorized")
    
        feed_update = FeedUpdate(title=title, content=content)
        updated_feed = await feed_service.update_feed(
            db, feed_id, feed_update, email, new_images=new_images, target_image_urls=target_image_urls
        )
    
        return updated_feed
    
    
    @router.delete("/delete/{feed_id}", response_model=None)
    async def delete(
        feed_id: int,
        db: AsyncSession = Depends(get_db),
        email: str = Depends(auth_service.get_current_user_authorization),
    ):
        if email is None:
            raise HTTPException(status_code=401, detail="Not Authorized")
    
        await feed_service.delete_feed(db, feed_id, email)
        return {"message": "Feed Deleted"}

    지난 코드와 비교해 보면 알겠지만 특별히 달라진 것은 없다.

     

    다만 함수 정의 라인에 async가, 비즈니스 로직 호출 시점에 await이 들어갔다는 것 정도.

     

    위와 같이 설정하면 피드 CRUD 로직에 대한 비동기 처리는 끝난다.

     

    Summary

     

    별생각 없이 구현하다가 모든 메서드를 동기처리 하고 있다는 걸 깨닫곤 살짝 멘탈이 흔들렸던 것 같다.

     

    뒤늦게 바꾸는 게 쉽지 않을 거라 지레 겁먹기도 했고.

     

    하지만 막상 구현해 보니 아주 어려운 부분은 아니었고, 비동기처리를 함으로써

     

    제한된 리소스를 더욱 효율적으로 사용할 수 있게 되었다.

     

    글 초입부에 말한 대로, 해당 프로젝트를 업로드하는 레포지토리를 공유하려고 한다.

     

    나름대로 정리한다고 했는데 보기 편하면 좋겠다.

     

    https://github.com/gnidinger/FastAPI_SQLAlchemy_Pydantic/tree/06.async

     

    GitHub - gnidinger/FastAPI_SQLAlchemy_Pydantic

    Contribute to gnidinger/FastAPI_SQLAlchemy_Pydantic development by creating an account on GitHub.

    github.com

    이렇게 해서 프로젝트에 비동기 처리까지, 끝!

    반응형
    댓글
    공지사항
    최근에 올라온 글
    최근에 달린 댓글
    Total
    Today
    Yesterday
    링크
    «   2024/11   »
    1 2
    3 4 5 6 7 8 9
    10 11 12 13 14 15 16
    17 18 19 20 21 22 23
    24 25 26 27 28 29 30
    글 보관함