LangChain Uygulamasını Production Ortamına Taşıma
Production ortamında bir LangChain uygulaması çalıştırmak, development aşamasında yazdığın “hello world” seviyesindeki koddan çok farklı bir dünya. Geliştirme ortamında her şey güzel çalışıyor, API anahtarın .env dosyasında duruyor, hata aldığında terminale bakıyorsun ve sorun çözülüyor. Ama production’a geçtiğinde işler ciddi bir hal alıyor: yüzlerce eş zamanlı kullanıcı, maliyet kontrolü, hata yönetimi, monitoring, güvenlik… Bu yazıda gerçek dünya senaryoları üzerinden LangChain uygulamasını production ortamına nasıl taşıyacağını adım adım anlatacağım.
Neden Production Deployment Zordur?
Development ortamında LangChain ile bir chatbot yazmak birkaç saatlik iş. Ama aynı uygulamayı production’a taşırken karşılaştığın problemler bambaşka kategoride:
- Latency yönetimi: LLM çağrıları yavaş, kullanıcılar beklemek istemiyor
- Maliyet kontrolü: Her token para demek, kontrolsüz kullanım faturayı patlatır
- Rate limiting: OpenAI ve benzeri servisler API limitine sahip
- State yönetimi: Conversation history’yi nerede saklayacaksın?
- Hata toleransı: API timeout aldığında ne yapacaksın?
- Güvenlik: Prompt injection saldırılarına karşı nasıl korunacaksın?
Şimdi bunların her birini çözen bir production setup oluşturalım.
Proje Yapısı
Önce sağlam bir proje yapısı kurmak gerekiyor. Kaotik bir kod tabanıyla production’a çıkmak felakete davet çıkarmak demek.
mkdir langchain-prod && cd langchain-prod
mkdir -p src/{chains,agents,prompts,memory,utils}
config
tests/{unit,integration}
scripts
monitoring
touch src/__init__.py
src/chains/__init__.py
src/agents/__init__.py
config/settings.py
docker-compose.yml
Dockerfile
.env.example
requirements.txt
Proje dizin yapısı bu şekilde olmalı:
- src/chains/: LangChain chain tanımları
- src/agents/: Agent implementasyonları
- src/prompts/: Prompt template’leri
- src/memory/: Conversation memory yönetimi
- src/utils/: Yardımcı fonksiyonlar, middleware
- config/: Ortam bazlı konfigürasyonlar
- monitoring/: Prometheus metrikleri, alert kuralları
- tests/: Unit ve integration testler
Konfigürasyon Yönetimi
Production’da konfigürasyon yönetimi kritik. Pydantic ile tip güvenli bir settings sistemi kur:
# config/settings.py
from pydantic_settings import BaseSettings
from pydantic import Field, validator
from typing import Optional
import os
class LLMSettings(BaseSettings):
openai_api_key: str = Field(..., env="OPENAI_API_KEY")
openai_model: str = Field(default="gpt-4-turbo-preview", env="OPENAI_MODEL")
openai_temperature: float = Field(default=0.7, env="OPENAI_TEMPERATURE")
openai_max_tokens: int = Field(default=2000, env="OPENAI_MAX_TOKENS")
openai_request_timeout: int = Field(default=30, env="OPENAI_REQUEST_TIMEOUT")
# Rate limiting
max_requests_per_minute: int = Field(default=60, env="MAX_REQUESTS_PER_MINUTE")
max_tokens_per_day: int = Field(default=1000000, env="MAX_TOKENS_PER_DAY")
# Redis for caching and memory
redis_url: str = Field(default="redis://localhost:6379", env="REDIS_URL")
redis_ttl: int = Field(default=3600, env="REDIS_TTL")
# Database for conversation history
database_url: str = Field(..., env="DATABASE_URL")
# Monitoring
langsmith_api_key: Optional[str] = Field(default=None, env="LANGSMITH_API_KEY")
langsmith_project: str = Field(default="production", env="LANGSMITH_PROJECT")
class Config:
env_file = ".env"
case_sensitive = False
@validator("openai_temperature")
def validate_temperature(cls, v):
if not 0.0 <= v <= 2.0:
raise ValueError("Temperature 0.0 ile 2.0 arasında olmalı")
return v
settings = LLMSettings()
.env.example dosyası repo’ya ekle, ama asla gerçek .env dosyasını commit etme. Production’da bu değerleri Kubernetes secrets veya HashiCorp Vault ile yönetmek en doğrusu.
Rate Limiting ve Token Yönetimi
Production’da en sık karşılaşılan problem rate limiting. OpenAI’nin API limitlerine çarpınca uygulaман duruyor. Bunun için bir middleware katmanı yazalım:
# src/utils/rate_limiter.py
import asyncio
import time
from collections import deque
from typing import Optional
import redis.asyncio as redis
from config.settings import settings
class TokenBudgetManager:
def __init__(self):
self.redis_client = redis.from_url(settings.redis_url)
async def check_and_consume_budget(
self,
user_id: str,
estimated_tokens: int
) -> bool:
"""
Kullanıcının token bütçesini kontrol et ve tüket.
Günlük limit aşılırsa False döner.
"""
key = f"token_budget:{user_id}:{time.strftime('%Y-%m-%d')}"
async with self.redis_client.pipeline() as pipe:
try:
current = await self.redis_client.get(key)
used_tokens = int(current) if current else 0
if used_tokens + estimated_tokens > settings.max_tokens_per_day:
return False
await pipe.incrby(key, estimated_tokens)
await pipe.expire(key, 86400) # 24 saat TTL
await pipe.execute()
return True
except Exception as e:
# Budget kontrolü başarısız olursa izin ver ama logla
print(f"Budget kontrolü hatası: {e}")
return True
async def get_usage_stats(self, user_id: str) -> dict:
key = f"token_budget:{user_id}:{time.strftime('%Y-%m-%d')}"
used = await self.redis_client.get(key)
return {
"used_tokens": int(used) if used else 0,
"daily_limit": settings.max_tokens_per_day,
"remaining": settings.max_tokens_per_day - (int(used) if used else 0)
}
class RateLimiter:
"""Sliding window rate limiter"""
def __init__(self, max_requests: int, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.redis_client = redis.from_url(settings.redis_url)
async def is_allowed(self, identifier: str) -> bool:
key = f"rate_limit:{identifier}"
now = time.time()
window_start = now - self.window_seconds
async with self.redis_client.pipeline() as pipe:
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
pipe.zadd(key, {str(now): now})
pipe.expire(key, self.window_seconds)
results = await pipe.execute()
request_count = results[1]
return request_count < self.max_requests
Production-Ready Chain Yapısı
Şimdi asıl LangChain implementasyonuna gelelim. Retry mekanizması, caching ve hata yönetimi olan bir chain yapısı:
# src/chains/production_chain.py
from langchain_openai import ChatOpenAI
from langchain.chains import ConversationalRetrievalChain
from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.cache import RedisCache
from langchain.globals import set_llm_cache
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langchain.memory import ConversationBufferWindowMemory
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import openai
import logging
from typing import AsyncGenerator, Optional
from config.settings import settings
from src.utils.rate_limiter import RateLimiter, TokenBudgetManager
logger = logging.getLogger(__name__)
# Redis cache'i global olarak ayarla
import redis
redis_client = redis.from_url(settings.redis_url)
set_llm_cache(RedisCache(redis_client))
class ProductionChain:
def __init__(self, vectorstore):
self.vectorstore = vectorstore
self.rate_limiter = RateLimiter(
max_requests=settings.max_requests_per_minute
)
self.budget_manager = TokenBudgetManager()
self._setup_llm()
def _setup_llm(self):
self.llm = ChatOpenAI(
model=settings.openai_model,
temperature=settings.openai_temperature,
max_tokens=settings.openai_max_tokens,
request_timeout=settings.openai_request_timeout,
api_key=settings.openai_api_key,
# Otomatik retry için
max_retries=3,
)
def _get_memory(self, session_id: str) -> ConversationBufferWindowMemory:
"""Redis tabanlı conversation memory"""
message_history = RedisChatMessageHistory(
session_id=session_id,
url=settings.redis_url,
ttl=settings.redis_ttl
)
return ConversationBufferWindowMemory(
chat_memory=message_history,
memory_key="chat_history",
return_messages=True,
k=10 # Son 10 mesajı tut
)
@retry(
retry=retry_if_exception_type((
openai.RateLimitError,
openai.APITimeoutError,
openai.APIConnectionError
)),
wait=wait_exponential(multiplier=1, min=4, max=60),
stop=stop_after_attempt(3)
)
async def invoke_with_retry(self, chain, inputs: dict) -> dict:
return await chain.ainvoke(inputs)
async def chat(
self,
question: str,
session_id: str,
user_id: str
) -> dict:
# Rate limit kontrolü
if not await self.rate_limiter.is_allowed(user_id):
raise Exception("Rate limit aşıldı. Lütfen bir dakika bekleyin.")
# Token bütçe kontrolü (yaklaşık token sayısı)
estimated_tokens = len(question.split()) * 2 + settings.openai_max_tokens
if not await self.budget_manager.check_and_consume_budget(
user_id, estimated_tokens
):
raise Exception("Günlük token limitinizi aştınız.")
memory = self._get_memory(session_id)
chain = ConversationalRetrievalChain.from_llm(
llm=self.llm,
retriever=self.vectorstore.as_retriever(
search_kwargs={"k": 5}
),
memory=memory,
return_source_documents=True,
verbose=False # Production'da False olmalı
)
try:
result = await self.invoke_with_retry(
chain,
{"question": question}
)
logger.info(
f"Chat başarılı | user_id={user_id} | "
f"session_id={session_id} | "
f"tokens_used={estimated_tokens}"
)
return {
"answer": result["answer"],
"sources": [
doc.metadata.get("source", "unknown")
for doc in result.get("source_documents", [])
]
}
except openai.RateLimitError:
logger.error(f"OpenAI rate limit | user_id={user_id}")
raise Exception("Sistem şu anda yoğun. Lütfen tekrar deneyin.")
except openai.APITimeoutError:
logger.error(f"OpenAI timeout | user_id={user_id}")
raise Exception("İstek zaman aşımına uğradı. Lütfen tekrar deneyin.")
FastAPI ile Servis Katmanı
LangChain chain’ini bir API arkasına al. Streaming desteği ile:
# src/api/main.py
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from pydantic import BaseModel
import asyncio
import json
from prometheus_client import Counter, Histogram, generate_latest
from starlette.responses import Response
import time
app = FastAPI(title="LangChain Production API", docs_url=None, redoc_url=None)
# Production'da docs endpoint'ini kapat
# docs_url=None, redoc_url=None
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"], # Wildcard kullanma!
allow_methods=["POST", "GET"],
allow_headers=["Authorization", "Content-Type"],
)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["yourdomain.com", "api.yourdomain.com"]
)
# Prometheus metrikleri
REQUEST_COUNT = Counter(
"langchain_requests_total",
"Toplam istek sayısı",
["method", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"langchain_request_duration_seconds",
"İstek latency",
["endpoint"]
)
LLM_TOKEN_USAGE = Counter(
"langchain_token_usage_total",
"Toplam token kullanımı",
["model", "user_id"]
)
class ChatRequest(BaseModel):
question: str
session_id: str
user_id: str
class Config:
# Maksimum soru uzunluğu
max_length = 4000
class ChatResponse(BaseModel):
answer: str
sources: list[str]
session_id: str
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(
request: ChatRequest,
background_tasks: BackgroundTasks
):
start_time = time.time()
# Input validation
if len(request.question.strip()) < 3:
raise HTTPException(status_code=400, detail="Soru çok kısa")
if len(request.question) > 4000:
raise HTTPException(status_code=400, detail="Soru çok uzun (max 4000 karakter)")
try:
# Chain'i burada inject et (dependency injection kullan)
from src.chains.production_chain import ProductionChain
chain = app.state.chain
result = await chain.chat(
question=request.question,
session_id=request.session_id,
user_id=request.user_id
)
duration = time.time() - start_time
REQUEST_LATENCY.labels(endpoint="/chat").observe(duration)
REQUEST_COUNT.labels(
method="POST",
endpoint="/chat",
status="success"
).inc()
return ChatResponse(
answer=result["answer"],
sources=result["sources"],
session_id=request.session_id
)
except Exception as e:
REQUEST_COUNT.labels(
method="POST",
endpoint="/chat",
status="error"
).inc()
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def metrics():
"""Prometheus scraping için"""
return Response(generate_latest(), media_type="text/plain")
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": "1.0.0"}
Docker ve Production Deployment
Gunicorn + Uvicorn kombinasyonu ile production-grade bir Dockerfile:
# Dockerfile
FROM python:3.11-slim as builder
WORKDIR /app
# Sistem bağımlılıkları
RUN apt-get update && apt-get install -y
build-essential
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --user -r requirements.txt
FROM python:3.11-slim as runtime
WORKDIR /app
# Non-root user oluştur
RUN useradd --create-home --shell /bin/bash appuser
COPY --from=builder /root/.local /home/appuser/.local
COPY --chown=appuser:appuser . .
USER appuser
ENV PATH=/home/appuser/.local/bin:$PATH
ENV PYTHONPATH=/app
ENV PYTHONUNBUFFERED=1
# Gunicorn ile Uvicorn worker
CMD ["gunicorn", "src.api.main:app",
"-w", "4",
"-k", "uvicorn.workers.UvicornWorker",
"--bind", "0.0.0.0:8000",
"--timeout", "120",
"--keepalive", "5",
"--max-requests", "1000",
"--max-requests-jitter", "50",
"--log-level", "info"]
# docker-compose.yml ile tüm stack'i ayağa kaldır
cat > docker-compose.yml << 'EOF'
version: '3.9'
services:
app:
build: .
ports:
- "8000:8000"
env_file:
- .env
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
deploy:
resources:
limits:
memory: 2G
cpus: '2'
redis:
image: redis:7-alpine
command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
volumes:
- redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 3
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./nginx/ssl:/etc/nginx/ssl:ro
depends_on:
- app
volumes:
redis_data:
EOF
# Servisleri başlat
docker-compose up -d
# Logları takip et
docker-compose logs -f app
LangSmith ile Monitoring
Production’da ne olduğunu görmek için LangSmith entegrasyonu şart. Özellikle hangi prompt’ların iyi çalıştığını, nerede hata aldığını ve maliyet dağılımını takip etmek için:
# src/utils/monitoring.py
import os
from langsmith import Client
from langchain.callbacks.tracers import LangChainTracer
from langchain.callbacks.manager import CallbackManager
from config.settings import settings
def setup_langsmith():
"""LangSmith monitoring kurulumu"""
if not settings.langsmith_api_key:
print("LangSmith API key bulunamadı, monitoring devre dışı")
return None
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = settings.langsmith_api_key
os.environ["LANGCHAIN_PROJECT"] = settings.langsmith_project
tracer = LangChainTracer(project_name=settings.langsmith_project)
callback_manager = CallbackManager([tracer])
return callback_manager
def get_usage_report():
"""Günlük kullanım raporu"""
if not settings.langsmith_api_key:
return {}
client = Client()
# Son 24 saatin run'larını getir
from datetime import datetime, timedelta
runs = client.list_runs(
project_name=settings.langsmith_project,
start_time=datetime.now() - timedelta(hours=24),
run_type="chain"
)
total_tokens = 0
total_cost = 0
error_count = 0
for run in runs:
if run.total_tokens:
total_tokens += run.total_tokens
if run.total_cost:
total_cost += run.total_cost
if run.error:
error_count += 1
return {
"total_tokens_24h": total_tokens,
"estimated_cost_24h_usd": round(total_cost, 4),
"error_count_24h": error_count,
}
Güvenlik Önlemleri
Prompt injection saldırılarına karşı temel bir koruma katmanı eklemek gerekiyor:
# src/utils/security.py
import re
from typing import Optional
INJECTION_PATTERNS = [
r"ignore (previous|all) instructions",
r"you are now",
r"forget (everything|all|your instructions)",
r"act as (a|an)",
r"pretend (you are|to be)",
r"jailbreak",
r"<script>",
r"system prompt",
]
def sanitize_input(text: str, max_length: int = 4000) -> Optional[str]:
"""
Kullanıcı girdisini temizle ve güvenlik kontrolü yap.
"""
# Uzunluk kontrolü
if len(text) > max_length:
return None
# Tehlikeli pattern kontrolü
text_lower = text.lower()
for pattern in INJECTION_PATTERNS:
if re.search(pattern, text_lower):
return None
# Temel temizlik
text = text.strip()
# Çoklu boşlukları tek boşluğa indir
text = re.sub(r's+', ' ', text)
return text
def validate_session_id(session_id: str) -> bool:
"""Session ID format doğrulaması"""
# UUID formatı kontrolü
uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(uuid_pattern, session_id, re.IGNORECASE))
CI/CD Pipeline
GitHub Actions ile otomatik deployment:
# .github/workflows/deploy.yml içeriği
cat > .github/workflows/deploy.yml << 'EOF'
name: Production Deploy
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run tests
run: pytest tests/ -v --tb=short
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY_TEST }}
DATABASE_URL: sqlite:///test.db
REDIS_URL: redis://localhost:6379
deploy:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build and push Docker image
run: |
docker build -t your-registry/langchain-app:${{ github.sha }} .
docker push your-registry/langchain-app:${{ github.sha }}
- name: Deploy to production
run: |
ssh deploy@your-server "
docker pull your-registry/langchain-app:${{ github.sha }} &&
docker-compose up -d --no-deps app &&
docker system prune -f
"
EOF
Production Checklist
Deployment öncesinde şu maddeleri tek tek kontrol et:
- API anahtarları: Tüm secret’lar environment variable olarak ayarlandı mı, kod içinde hardcode yok mu?
- Rate limiting: Hem uygulama hem de API gateway seviyesinde rate limit var mı?
- Caching: Benzer sorular için Redis cache aktif mi?
- Retry mekanizması: Geçici API hataları için exponential backoff var mı?
- Token limitleri: Kullanıcı başına günlük token bütçesi tanımlandı mı?
- Monitoring: LangSmith veya alternatif bir tracing sistemi kurulu mu?
- Logging: Structured logging (JSON formatında) aktif mi?
- Health check:
/healthendpoint düzgün çalışıyor mu? - Güvenlik taraması: Prompt injection koruması var mı?
- Yük testi:
locustveyak6ile yük testi yapıldı mı? - Rollback planı: Bir şeyler ters giderse hızlıca eski versiyona dönebilir misin?
Sonuç
LangChain uygulamasını production’a taşımak bir sprint işi değil, ciddi bir mühendislik çalışması gerektiriyor. Development’ta işe yarayan her şey production’da farklı davranabilir: API limitleri, network gecikmeleri, eş zamanlı kullanıcılar, maliyet kontrolü… Bunların hepsini önceden planlamak zorunda olduğun için.
Bu yazıda anlattıklarımın özeti şu: Konfigürasyonu kod’dan ayır, rate limiting’i çok katmanlı uygula, Redis’i hem cache hem memory için kullan, retry mekanizmasını tenacity ile yönet, LangSmith ile her şeyi izle ve güvenlik kontrollerini asla atlatma.
Gerçek production deployment’larında en sık yapılan hata, LLM çağrılarının ne kadar yavaş ve pahalı olduğunu küçümsemek. Her bir API çağrısının 2-5 saniye sürdüğünü ve para harcadığını unutma. Caching ve akıllı rate limiting ile hem kullanıcı deneyimini iyileştirir hem de maliyeti kontrol altında tutarsın.
Son olarak: production’a çıkmadan önce gerçek yük altında test et. locust veya k6 ile 100 eş zamanlı kullanıcı simüle et ve sistemin nasıl davrandığını gör. Sorunları production’da değil, test ortamında bulmak çok daha az maliyetli.
