- 1. FastAPI 입문: 설치부터 Hello World 까지
- 2. Path 와 Query 매개변수 (URL 로 데이터 받기)
- 3. Pydantic 으로 Request Body 다루기
- 4. Response Model 과 HTTP 상태 코드
- 5. 예외 처리 (HTTPException 완벽 가이드)
- 6. 의존성 주입 (Dependency Injection) 정복
- 7. SQLAlchemy 비동기 데이터베이스
- 8. JWT 인증과 보안 (로그인부터 토큰 갱신까지)
- 9. 실시간 통신 (WebSockets & SSE)
- 10. 테스트 작성법 (Pytest 완전 가이드)
9. 실시간 통신 (WebSockets & SSE)
# 9 강. 실시간 통신 (WebSockets & SSE)
## 🎯 학습 목표
- WebSocket 기본 사용법
- Connection Manager 구현
- Server-Sent Events (SSE)
---
## 1. WebSocket 기본
```python
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"You said: {data}")
```
---
## 2. Connection Manager
```python
class ConnectionManager:
def __init__(self):
self.connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.connections.append(websocket)
async def broadcast(self, message: str):
for conn in self.connections:
await conn.send_text(message)
```
---
## 3. SSE (Server-Sent Events)
```python
from fastapi.responses import StreamingResponse
async def event_generator():
while True:
yield f"data: {count}\n\n"
await asyncio.sleep(1)
@app.get("/events")
async def stream_events():
return StreamingResponse(event_generator(), media_type="text/event-stream")
```