티스토리 뷰
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(5)async
Vagabund.Gni 2023. 10. 23. 11:18목차
FastAPI + SQLAlchemy + Pydantic을 이용한 게시판 만들기
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(1) 회원가입 및 로그인 구현(JWT)
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(1.5) 디테일 수정
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(2-1)Feed CRUD - Model, Service
[FastAPI]SQLAlchemy + Pydantic을 이용한 게시판 만들기-(2-2)Feed CRUD - Routes, main.py
[FastAPI]SQLAlchemy + Pydantic을 이용한 게시판 만들기-(3)S3 버킷을 이용한 사진 업로드
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(4-1)Comment CRUD - Models, Services
[FastAPI]SQLAlchemy + Pydantic를 이용한 게시판 만들기-(4-2)Comment CRUD - Routes, main.py
지난 글까지 기초적인 게시판에 필요한 기능을 구현했다.
구현 자체는 무리가 없었으나 한 가지 아쉬운 점이 있었는데,
현재 작성된 로직과 디비 쿼리가 동기적으로만 이루어지고 있다는 사실이었다.
그래서 프로젝트가 더 커지기 전에 비동기 메서드와 쿼리를 적용하는 것이 좋겠다고 판단,
이번 글을 적게 되었다.
참고로 이번 글에선 자잘하게 코드를 바꿔야 하는 부분이 매우 많기 때문에,
Feed에 대한 예제만 확인하고, 코드 전체가 올라간 깃허브 레포지토리를 공유하도록 하겠다.
그럼, 시작.
PostgreSQL, aioboto3, SQLAlchemy 2.0 query
PostgreSQL
가장 먼저, 기본적으로 여태까지 잘 사용해 왔던 SQLite는 기본적으로 비동기 작업을 지원하지 않는다.
물론 aiosqlite와 같은 라이브러리를 사용하면 그대로 비동기 I/O처리를 할 수 있으나,
이왕 이렇게 된 김에 SQLAlchemy와 비동기 궁합이 좋다는 포스트그레스큐엘을 사용해 보기로 하자.
혹시나 나중에 복잡한 쿼리가 필요해질 수도 있으니까.
포스트그레스큐엘에 대해서는 지난 글에 설명해 두었고,
이번 글에서는 도커를 이용해 간단하게 컨테이너를 띄우고 작업을 진행한다.
먼저, 아래와 같이 입력해 포스트그레스큐엘 컨테이러를 띄운다.
docker run --name postgres -e POSTGRES_PASSWORD=YOUR_PASSWORD -p 5432:5432 -v my_postgres_data:/var/lib/postgresql/data -d postgres
위 커맨드는 이름을 postgres로, 비밀번호를 YOUR_PASSWORD로 지정해 컨테이너를 띄운다.
그다음엔 파이썬 프로젝트로 돌아와
pip install asyncpg psycopg2-binary
를 입력해 필요한 라이브러리를 설치한다.
이후의 진행과정에서 라이브러리를 추가로 설치하라는 메시지가 나오면 또 하나씩 설치하면 된다.
config/db.py
여기까지 했으면 우선 db 접근 설정을 비동기 용으로 바꾸어주어야 한다.
전체 코드를 우선 공유하면 다음과 같다.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://postgres:YOUR_PASSWORD@localhost:5432/postgres"
engine = create_async_engine(DATABASE_URL, echo=True)
# 비동기 세션을 위한 설정
AsyncSessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False,
class_=AsyncSession,
autocommit=False,
autoflush=False,
)
Base: DeclarativeMeta = declarative_base()
async def get_db():
db = AsyncSessionLocal()
try:
yield db
finally:
await db.close()
이전의 동기 처리 세션보다 설정할 것이 조금 많아지긴 했는데, 잘라서 보면 별 거 아니다.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://postgres:YOUR_PASSWORD@localhost:5432/postgres"
engine = create_async_engine(DATABASE_URL, echo=True)
먼저 비동기 작업을 위한 클래스를 가져온다.
이후 디비 모델 생성과 세션 관리를 위한 클래스를 가져온 뒤에
DATABASE_URL을 우리의 설정에 맞게 바꿔준다.
이어서 비동기 엔진을 생성, engine에 할당한다. 여기서 echo=True는 쿼리 실행을 콘솔에 보여달라는 설정이다.
# 비동기 세션을 위한 설정
AsyncSessionLocal = sessionmaker(
bind=engine,
expire_on_commit=False,
class_=AsyncSession,
autocommit=False,
autoflush=False,
)
계속해서 sessionmaker() 메서드를 불러와 AsyncSessionLocal에 할당한다.
먼저 bind=engine으로 위에서 생성한 engine을 바인딩해 주고
class_=AsyncSession으로 비동기 세션 클래스를 사용한다.
나머지 설정은 세션의 동작방식을 결정한다.
Base: DeclarativeMeta = declarative_base()
모델 클래스를 생성할 때 사용할 Base를 정의한다.
async def get_db():
db = AsyncSessionLocal()
try:
yield db
finally:
await db.close()
비동기로 작동한느 디비 세션을 제공하는 코루틴 함수이다.
여기서 코루틴이란 실행 중간에도 제어권을 반납할 수 있는 함수로서,
작업을 멈췄다가 멈춘 지점부터 작업을 이어나갈 수 있는 특정을 가지고 있다.
이런 특성을 가지고 있기 때문에 비동기 프로그래밍에서 주로 사용된다고 한다.
어쨌거나 코드를 계속 보면, 비동기 디비 세션을 db에 할당해 이를 사용하는 코드에 전달,
작업이 끝나면 세션을 닫도록 되어있다.
aioboto3
계속해서 S3에 접근하는 로직도 비동기처리를 해주기 위해 다음 라이브러리를 설치한다.
pip install aioboto3
config/s3_config.py
import aioboto3
from config import settings
async def get_s3_client():
session = aioboto3.Session(
aws_access_key_id=settings.S3_ACCESS_KEY,
aws_secret_access_key=settings.S3_SECRET_KEY,
region_name="ap-northeast-2",
)
return session.client("s3")
이어서 위와 같이 작성, S3에 대한 작업을 비동기로 처리하기 위한 준비를 마친다.
SQLAlchemy 2.0 query
SQLAlchemy 2.0 query는 쉽게 축약해 2.0 style query라고도 부른다.
기존에 사용하던 1.x 버전의 쿼리도 여전히 잘 작동하며, 앞으로도 지원이 이어질 것이라고 하지만
SQLAlchemy를 비동기로 구현하려면 반드시 2.0 스타일 쿼리를 사용해야 한다고.
출처: https://www.sqlalchemy.org/
공식문서에 가보면 기존의 1.x 스타일과 2.0 스타일의 다른 점을 보여주고 있기는 한데,
여기선 생략하도록 하겠다.
다만 앞으로 이 글에서 바꾸어 갈 쿼리는 전부 2.0 스타일로의 마이그레이션이라는 것만 강조.
feed_service.py
그럼 예고했던 대로 피드에 관련된 변경사항을 살펴보자.
자연스럽게 S3 접근에 대한 비동기처리도 보게 될 것이다.
from fastapi import HTTPException, UploadFile
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from models.feed import Feed, FeedCreate, FeedUpdate
from models.user import User
from config import settings
from typing import List, Optional
import uuid
from config.s3_config import get_s3_client
import logging
logging.basicConfig(level=logging.NOTSET)
async def create_feed(
db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
):
feed_dict = feed.model_dump()
feed_dict["author_email"] = author_email
if images:
image_urls = await upload_image_to_s3(images)
feed_dict["image_urls"] = image_urls
author = await db.execute(select(User).where(User.email == author_email))
author = author.scalar_one_or_none()
if author is None:
raise HTTPException(status_code=404, detail="Author Not Found")
author_nickname = author.nickname
db_feed = Feed(**feed_dict)
db.add(db_feed)
await db.commit()
await db.refresh(db_feed)
return {
"id": db_feed.id,
"title": db_feed.title,
"content": db_feed.content,
"author_email": db_feed.author_email,
"author_nickname": author_nickname,
"image_urls": db_feed.image_urls,
}
async def get_feed_by_id(db: AsyncSession, feed_id: int):
feed_data = await db.execute(
select(Feed, User.nickname)
.join(User, User.email == Feed.author_email)
.where(Feed.id == feed_id)
)
feed_data = feed_data.first()
if feed_data is None:
raise HTTPException(status_code=404, detail="Feed not found")
feed, nickname = feed_data
return {
"id": feed.id,
"title": feed.title,
"content": feed.content,
"author_email": feed.author_email,
"author_nickname": nickname,
"image_urls": feed.image_urls,
}
async def get_feeds(db: AsyncSession):
feeds = await db.execute(
select(Feed, User.nickname).join(User, User.email == Feed.author_email)
)
feeds = feeds.all()
feed_responses = []
for feed, nickname in feeds:
feed_dict = {
"id": feed.id,
"title": feed.title,
"content": feed.content,
"author_email": feed.author_email,
"author_nickname": nickname,
"image_urls": feed.image_urls,
}
feed_responses.append(feed_dict)
return feed_responses
async def update_feed(
db: AsyncSession,
feed_id: int,
feed_update: FeedUpdate,
email: str,
new_images: Optional[List[UploadFile]] = None,
target_image_urls: Optional[List[str]] = None,
):
db_feed = await db.execute(select(Feed).where(Feed.id == feed_id))
db_feed = db_feed.scalar_one_or_none()
if db_feed is None:
raise HTTPException(status_code=404, detail="Feed Not Found")
if db_feed.author_email != email:
raise HTTPException(status_code=403, detail="Permission Denied")
result = await db.execute(select(User).where(User.email == email))
author = result.scalar_one_or_none()
author_nickname = author.nickname
db_feed.title = feed_update.title
db_feed.content = feed_update.content
existing_image_urls = db_feed.image_urls or []
# Case 1: Both new_images and target_image_urls exist
if new_images and target_image_urls:
for url in target_image_urls:
print(url)
await delete_image_from_s3(url)
new_image_urls = await upload_image_to_s3(new_images)
existing_image_urls = [url for url in existing_image_urls if url not in target_image_urls]
existing_image_urls = existing_image_urls + new_image_urls
# Case 2: Only new_images exist
elif new_images:
new_image_urls = await upload_image_to_s3(new_images)
existing_image_urls = existing_image_urls + new_image_urls
# Case 3: Only target_image_urls exist
elif target_image_urls:
for url in target_image_urls:
await delete_image_from_s3(url)
existing_image_urls = [url for url in existing_image_urls if url not in target_image_urls]
db_feed.image_urls = existing_image_urls
print(db_feed.image_urls)
await db.commit()
await db.refresh(db_feed)
result = {
"id": db_feed.id,
"title": db_feed.title,
"content": db_feed.content,
"author_email": db_feed.author_email,
"author_nickname": author_nickname,
"image_urls": db_feed.image_urls,
}
logging.debug(f"Updated feed: {result}")
return result
async def delete_feed(db: AsyncSession, feed_id: int, email: str):
result = await db.execute(select(Feed).where(Feed.id == feed_id))
db_feed = result.scalars().first()
if db_feed is None:
raise HTTPException(status_code=404, detail="Feed Not Found")
if db_feed.author_email != email:
raise HTTPException(status_code=403, detail="Permission Denied")
image_urls = db_feed.image_urls
for image_url in image_urls:
await delete_image_from_s3(image_url)
await db.delete(db_feed)
await db.commit()
async def upload_image_to_s3(images: List[UploadFile]):
image_urls = []
async with await get_s3_client() as s3_client:
for image in images:
logging.debug(f"Uploading {image.filename}")
image_name = f"{uuid.uuid4()}.png"
await s3_client.upload_fileobj(
image.file,
settings.S3_BUCKET,
image_name,
ExtraArgs={"ContentType": image.content_type},
)
image_url = f"https://{settings.S3_BUCKET}.s3.ap-northeast-2.amazonaws.com/{image_name}"
image_urls.append(image_url)
return image_urls
async def delete_image_from_s3(image_url: str):
image_name = image_url.split("/")[-1]
bucket_name = settings.S3_BUCKET
logging.debug(f"Deleting from Bucket: {bucket_name}, Key: {image_name}")
async with await get_s3_client() as s3_client:
await s3_client.delete_object(Bucket=settings.S3_BUCKET, Key=image_name)
2.0 스타일로 변경된 쿼리만 하나씩 보자.
async def create_feed(
db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
):
feed_dict = feed.model_dump()
feed_dict["author_email"] = author_email
if images:
image_urls = await upload_image_to_s3(images)
feed_dict["image_urls"] = image_urls
author = await db.execute(select(User).where(User.email == author_email))
author = author.scalar_one_or_none()
if author is None:
raise HTTPException(status_code=404, detail="Author Not Found")
author_nickname = author.nickname
db_feed = Feed(**feed_dict)
db.add(db_feed)
await db.commit()
await db.refresh(db_feed)
return {
"id": db_feed.id,
"title": db_feed.title,
"content": db_feed.content,
"author_email": db_feed.author_email,
"author_nickname": author_nickname,
"image_urls": db_feed.image_urls,
}
먼저 피드 생성 로직.
async def create_feed(
db: AsyncSession, feed: FeedCreate, author_email: str, images: List[UploadFile] = None
):
시그니처의 db 매개변수 역시 비동기 세션으로 바뀌었다.
author = await db.execute(select(User).where(User.email == author_email))
author = author.scalar_one_or_none()
작성자를 검색하는 쿼리가 2.0 스타일로 바뀌었다.
여기서 좀 묘한 것이, 위처럼 두 줄로 사용하지 않고
author = (await db.execute(select(User).where(User.email == author_email))).scalar_one_or_none()
이런 식으로 바로 한 줄로 적어버리면 에러가 난다. 공식문서를 열심히 읽지 않아 이유를 모르는 것일까.
일단 가독성 측면에서 나쁘지 않기 때문에 두 줄로 나눠서 쓰는 것을 유지하기로 한다.
db_feed = Feed(**feed_dict)
db.add(db_feed)
await db.commit()
await db.refresh(db_feed)
디비에 커밋을 하고 리프레쉬하는 부분도 비동기 처리를 해주어야 한다.
async def get_feed_by_id(db: AsyncSession, feed_id: int):
feed_data = await db.execute(
select(Feed, User.nickname)
.join(User, User.email == Feed.author_email)
.where(Feed.id == feed_id)
)
feed_data = feed_data.first()
if feed_data is None:
raise HTTPException(status_code=404, detail="Feed not found")
feed, nickname = feed_data
return {
"id": feed.id,
"title": feed.title,
"content": feed.content,
"author_email": feed.author_email,
"author_nickname": nickname,
"image_urls": feed.image_urls,
}
위 부분도 마찬가지로 매개변수와 쿼리 스타일이 바뀌었다.
디비 접근 함수를 호출할 때마다 await을 붙여주는 걸 잊어서는 안 된다.
async def upload_image_to_s3(images: List[UploadFile]):
image_urls = []
async with await get_s3_client() as s3_client:
for image in images:
logging.debug(f"Uploading {image.filename}")
image_name = f"{uuid.uuid4()}.png"
await s3_client.upload_fileobj(
image.file,
settings.S3_BUCKET,
image_name,
ExtraArgs={"ContentType": image.content_type},
)
image_url = f"https://{settings.S3_BUCKET}.s3.ap-northeast-2.amazonaws.com/{image_name}"
image_urls.append(image_url)
return image_urls
나머지는 계속 비슷하기 때문에, 조금 건너뛰어서 이미지 업로드 로직을 보자.
위에서 만들어준 비동기 클라이언트를 사용해 업로드 함수를 비동기로 호출하는 것을 확인할 수 있다.
기존의 코드를 바꾸는 부분도 그러지만, 특히나 이 부분은 너무 간결하게 바뀌는 데다, 실제 작동까지 잘하니 신기했다.
async def delete_image_from_s3(image_url: str):
image_name = image_url.split("/")[-1]
bucket_name = settings.S3_BUCKET
logging.debug(f"Deleting from Bucket: {bucket_name}, Key: {image_name}")
async with await get_s3_client() as s3_client:
await s3_client.delete_object(Bucket=settings.S3_BUCKET, Key=image_name)
이미지 삭제 로직도 마찬가지다.
비동기 클라이언트로 삭제 로직을 호출하면 된다.
feed_router.py
마지막으로 해당 비즈니스 로직을 불러서 사용하는 라우터 함수를 살펴보자.
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.ext.asyncio import AsyncSession
from models.feed import FeedCreate, FeedResponse, FeedUpdate
from services import feed_service, auth_service
from config.db import get_db
from typing import List
import logging
logging.basicConfig(level=logging.DEBUG)
router = APIRouter()
@router.post("/create", response_model=FeedResponse)
async def create(
title: str = Form(...),
content: str = Form(...),
images: List[UploadFile] = File(None),
db: AsyncSession = Depends(get_db),
email: str = Depends(auth_service.get_current_user_authorization),
):
if email is None:
raise HTTPException(status_code=401, detail="Not authorized")
feed = FeedCreate(title=title, content=content)
return await feed_service.create_feed(db, feed, email, images)
@router.get("/read/{feed_id}", response_model=FeedResponse)
async def read_feed(feed_id: int, db: AsyncSession = Depends(get_db)):
return await feed_service.get_feed_by_id(db, feed_id)
@router.get("/list", response_model=List[FeedResponse])
async def list_feeds(db: AsyncSession = Depends(get_db)):
return await feed_service.get_feeds(db)
@router.patch("/update/{feed_id}", response_model=FeedResponse)
async def update(
feed_id: int,
title: str = Form(...),
content: str = Form(...),
new_images: List[UploadFile] = File(None),
target_image_urls: List[str] = Form(None),
db: AsyncSession = Depends(get_db),
email: str = Depends(auth_service.get_current_user_authorization),
):
if email is None:
raise HTTPException(status_code=401, detail="Not authorized")
feed_update = FeedUpdate(title=title, content=content)
updated_feed = await feed_service.update_feed(
db, feed_id, feed_update, email, new_images=new_images, target_image_urls=target_image_urls
)
return updated_feed
@router.delete("/delete/{feed_id}", response_model=None)
async def delete(
feed_id: int,
db: AsyncSession = Depends(get_db),
email: str = Depends(auth_service.get_current_user_authorization),
):
if email is None:
raise HTTPException(status_code=401, detail="Not Authorized")
await feed_service.delete_feed(db, feed_id, email)
return {"message": "Feed Deleted"}
지난 코드와 비교해 보면 알겠지만 특별히 달라진 것은 없다.
다만 함수 정의 라인에 async가, 비즈니스 로직 호출 시점에 await이 들어갔다는 것 정도.
위와 같이 설정하면 피드 CRUD 로직에 대한 비동기 처리는 끝난다.
Summary
별생각 없이 구현하다가 모든 메서드를 동기처리 하고 있다는 걸 깨닫곤 살짝 멘탈이 흔들렸던 것 같다.
뒤늦게 바꾸는 게 쉽지 않을 거라 지레 겁먹기도 했고.
하지만 막상 구현해 보니 아주 어려운 부분은 아니었고, 비동기처리를 함으로써
제한된 리소스를 더욱 효율적으로 사용할 수 있게 되었다.
글 초입부에 말한 대로, 해당 프로젝트를 업로드하는 레포지토리를 공유하려고 한다.
나름대로 정리한다고 했는데 보기 편하면 좋겠다.
https://github.com/gnidinger/FastAPI_SQLAlchemy_Pydantic/tree/06.async
이렇게 해서 프로젝트에 비동기 처리까지, 끝!
'Python > FastAPI' 카테고리의 다른 글
- Total
- Today
- Yesterday
- java
- 스트림
- Backjoon
- 지지
- 유럽
- 중남미
- 동적계획법
- 세계여행
- 면접 준비
- BOJ
- spring
- RX100M5
- 파이썬
- 알고리즘
- 남미
- Python
- 자바
- Algorithm
- 기술면접
- 맛집
- 세모
- 유럽여행
- 백준
- a6000
- 여행
- 리스트
- 스프링
- 야경
- 칼이사
- 세계일주
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |