실시간 가격 업데이트는 설계 방향에 따라 서버 부하와 구현 복잡도가 크게 달라집니다.
이 글에서는 외부 API 호출 수를 클라이언트 수와 분리하는 것을 핵심 원칙으로 삼고, Worker 프로세스가 가격을 수집해 Redis에 저장하면 FastAPI가 SSE로 클라이언트에 푸시하는 파이프라인을 설명합니다.
구현 예시로는 코인 모의투자 서비스 twojaking.com의 실제 코드를 사용합니다.
전체 흐름
Binance API → Worker 프로세스 → Redis → FastAPI (SSE) → 프론트엔드
각 단계는 역할이 완전히 분리되어 있습니다.
- Worker: 3초 간격으로 Binance에서 가격을 받아 Redis에 저장
- FastAPI: Redis에서 읽어 SSE 스트림으로 클라이언트에 push
- 프론트엔드: EventSource API로 구독, React 상태에 반영
Binance는 요청 횟수 제한이 있습니다. 이 구조에서 Binance 호출 수는 클라이언트 수와 무관하게 Worker 하나가 고정 주기로만 호출합니다.
1단계: Worker — Redis 캐시 관리
FastAPI와 별도 프로세스로 돌아가는 Worker가 가격을 수집합니다.
- 상위 50개 거래량 코인의 가격·변동률·거래량을 일괄 수집
- Redis에 TTL 15초로 저장
- DB(PostgreSQL)에는 기준 가격(reference)만 저장 — 실시간 데이터는 Redis에만 유지
Redis를 선택한 이유는 세 가지입니다.
- TTL 적용이 단순합니다 (
setex한 줄) - API 서버 여러 인스턴스가 동일 캐시를 공유합니다
- 메모리 기반이라 조회 속도가 빠릅니다
실제 RedisCacheManager 구현에서 가격 저장은 다음과 같습니다.
# app/core/cache.py
class RedisCacheManager:
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
ttl_to_use = ttl if ttl is not None else self._ttl_seconds
serialized = json.dumps(value, default=str)
self._redis.setex(key, ttl_to_use, serialized)
def mget(self, keys: List[str]) -> Dict[str, Any]:
"""여러 코인 가격을 한 번에 조회 (배치 조회)."""
values = self._redis.mget(keys)
return {
key: json.loads(value)
for key, value in zip(keys, values)
if value is not None
}
def get_coin_price_cache(ttl_seconds: int = 15):
"""Worker와 API 간 가격 데이터 공유용 캐시. TTL 15초."""
global _coin_price_cache
if _coin_price_cache is None:
settings = get_settings()
_coin_price_cache = RedisCacheManager(
redis_url=settings.REDIS_URL,
ttl_seconds=ttl_seconds
)
return _coin_price_cache
2단계: FastAPI — SSE 스트림
가격을 폴링 방식으로 제공하면 클라이언트 수에 비례해 요청이 증가합니다. SSE를 쓰면 연결을 유지한 채 서버가 일방적으로 push합니다.
GET /coins/stream/all
- 클라이언트는
EventSource로 구독 - 서버는 설정된 주기로 Redis에서 읽어 push
- 재연결은 브라우저가 자동 처리
WebSocket 대신 SSE를 선택한 이유는 가격 스트림이 단방향이기 때문입니다.
| 특성 | SSE | WebSocket |
|---|---|---|
| 방향 | 서버 → 클라이언트 (단방향) | 양방향 |
| 재연결 | 브라우저 자동 처리 | 직접 구현 필요 |
| 방화벽 통과 | HTTP 기반이라 유리 | 별도 포트 필요한 경우 있음 |
| 구현 복잡도 | 낮음 | 높음 |
채팅처럼 양방향이 필요한 기능만 WebSocket을 별도로 사용합니다.
FastAPI에서 SSE 스트림은 StreamingResponse와 EventSourceResponse(sse-starlette)로 구현합니다.
from sse_starlette.sse import EventSourceResponse
import asyncio
async def price_stream_generator():
cache = get_coin_price_cache()
while True:
prices = cache.mget(COIN_KEYS) # Redis 배치 조회
yield {"data": json.dumps(prices)}
await asyncio.sleep(3) # Worker 수집 주기와 맞춤
@router.get("/coins/stream/all")
async def stream_all_prices():
return EventSourceResponse(price_stream_generator())
3단계: 프론트엔드 — EventSource 구독
React에서는 React Query 없이 EventSource → 상태 업데이트만으로 충분합니다.
useEffect(() => {
const es = new EventSource(`${API_BASE_URL}/api/v1/coins/stream/all`);
es.onmessage = (event) => {
const prices: CoinPrice[] = JSON.parse(event.data);
setPrices(prices); // React 상태 업데이트
};
es.onerror = () => {
// 브라우저가 자동 재연결 처리
};
return () => es.close();
}, []);
한 스트림으로 모든 코인 데이터가 들어오기 때문에, React Query 폴링처럼 코인 수에 비례해 요청이 증가하지 않습니다.
가격 상태가 업데이트될 때 포트폴리오 수익률도 자동으로 계산됩니다.
- 보유 코인 평균 매수가 vs. SSE 현재가
- 레버리지 포지션 PnL
- 수익/손실
가격 한 번 push될 때마다 화면 전체가 자연스럽게 갱신되는 구조입니다.
차트 데이터 통합
차트(lightweight-charts)는 두 소스를 합칩니다.
- 캔들 데이터: Binance Klines 기반, Redis에 인터벌별 캐시 (1m, 5m, 15m, 1h, 1d, TTL 5분)
- 실시간 틱: SSE로 받은 현재가를 마지막 캔들에 merge
차트 진입 → Redis 캐시에서 캔들 데이터 로드
→ SSE 가격으로 마지막 캔들 실시간 업데이트
구조 요약
Worker가 외부 API를 주기적으로 호출하고 Redis에 씁니다. FastAPI는 Redis에서 읽어 SSE로 push합니다. 외부 API 호출 수는 클라이언트 수와 완전히 분리됩니다.
이 구조의 장점은 명확합니다.
- API 서버 트래픽 적음
- DB 부하 없음 (실시간 데이터는 Redis에만)
- API 서버를 여러 인스턴스로 수평 확장 가능
- 지연 최소화 (Redis 조회 + SSE push)
- Worker와 API 서버 독립 배포 가능