LangChain LCEL ile Modern Chain Yazımı
Uzun süredir LangChain kullananlar için LCEL (LangChain Expression Language) gerçekten bir paradigma değişikliği oldu. İlk gördüğümde “bir pipe operatörü ne işe yarayacak ki” diye düşündüğümü itiraf etmeliyim. Ama production’da birkaç ciddi proje geliştirdikten sonra, LCEL’in sadece sözdizimsel şeker olmadığını, arkasında çok daha derin bir tasarım felsefesi yattığını anladım. Bu yazıda LCEL’i sıfırdan ele alacağız, gerçek dünya senaryolarıyla ne zaman nasıl kullanılması gerektiğini konuşacağız.
LCEL Nedir ve Neden Önemlidir
LCEL, LangChain bileşenlerini | operatörüyle birbirine zincirleyen bir yapıdır. Ama bu tanım işin yüzeyinde kalıyor. Asıl önemli olan şey şu: LCEL ile yazdığınız her zincir otomatik olarak async desteği, streaming desteği, paralel çalışma ve retry mekanizması kazanıyor. Bunları tek tek implement etmek zorunda kalmıyorsunuz.
Eski yöntemle basit bir RAG zinciri yazmak istediğinizde onlarca satır callback, async wrapper ve error handling yazıyordunuz. LCEL ile aynı şeyi çok daha az kodla ve üstüne dahası daha sağlam bir şekilde yapabiliyorsunuz.
Kurulum için önce ortamı hazırlayalım:
pip install langchain langchain-openai langchain-community python-dotenv
pip install faiss-cpu # vector store için
Temel Zincir Yapısı
LCEL’in kalbi Runnable protokolüdür. LangChain’deki neredeyse her bileşen bu protokolü implement eder: promptlar, modeller, output parser’lar, retriever’lar. Bunların hepsi | ile birbirine bağlanabilir.
En basit zinciri görelim:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv
load_dotenv()
# Temel zincir: prompt | model | parser
prompt = ChatPromptTemplate.from_template(
"Sen deneyimli bir Linux sistem yöneticisisin. "
"Şu soruyu Türkçe olarak yanıtla: {soru}"
)
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# LCEL zinciri
zincir = prompt | model | parser
# Çalıştır
sonuc = zincir.invoke({"soru": "Zombie process nedir ve nasıl temizlenir?"})
print(sonuc)
Bu kadar basit. Ama şimdi bu basitliğin arkasında ne olduğuna bakalım. invoke çağrısında LangChain her adımı sırayla çalıştırıyor, hata durumunda uygun exception fırlatıyor ve her adımın çıktısını bir sonrakine geçiriyor. Bunu elle yazsaydınız en az 30-40 satır sürerdi.
RunnablePassthrough ve RunnableLambda
Gerçek dünya senaryolarında veriyi zincir boyunca taşımanız ve dönüştürmeniz gerekir. İşte burada RunnablePassthrough ve RunnableLambda devreye girer.
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
# Log ve metrik toplama için lambda
def log_girdi(girdi):
print(f"[LOG] Zincire gelen girdi: {girdi}")
return girdi
def kelime_sayisi_ekle(metin):
return {
"yanit": metin,
"kelime_sayisi": len(metin.split()),
"karakter_sayisi": len(metin)
}
prompt = ChatPromptTemplate.from_template(
"Şu Linux komutunu açıkla: {komut}"
)
# Zincire lambda'ları entegre et
zincir = (
RunnableLambda(log_girdi)
| prompt
| model
| parser
| RunnableLambda(kelime_sayisi_ekle)
)
sonuc = zincir.invoke({"komut": "awk '{print $1}' /var/log/syslog | sort | uniq -c"})
print(f"Yanıt: {sonuc['yanit'][:100]}...")
print(f"Kelime sayısı: {sonuc['kelime_sayisi']}")
print(f"Karakter sayısı: {sonuc['karakter_sayisi']}")
RunnablePassthrough ise veriyi değiştirmeden geçirir ama aynı zamanda yeni anahtarlar eklemenizi sağlar. RAG uygulamalarında context ile birlikte orijinal soruyu da tutmak için çok kullanışlıdır:
from langchain_core.runnables import RunnablePassthrough
# Context eklerken orijinal soruyu da koru
zincir = RunnablePassthrough.assign(
context=lambda x: "Buraya retriever'dan gelen context gelecek"
) | prompt | model | parser
Paralel Çalışma ile RunnableParallel
Sistem yönetimi senaryolarında bazen birden fazla şeyi aynı anda analiz etmek istersiniz. Örneğin bir log dosyasını hem güvenlik açısından hem de performans açısından değerlendirmek gibi. RunnableParallel tam bu iş için var.
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# İki farklı analiz prompt'u
guvenlik_prompt = ChatPromptTemplate.from_template(
"Şu sistem logunu güvenlik açısından analiz et ve potansiyel tehditleri listele:n{log}"
)
performans_prompt = ChatPromptTemplate.from_template(
"Şu sistem logunu performans açısından analiz et, yavaşlık veya kaynak sorunlarını belirle:n{log}"
)
ozet_prompt = ChatPromptTemplate.from_template(
"Şu sistem logunu tek cümleyle özetle:n{log}"
)
# Paralel zincirler
paralel_analiz = RunnableParallel(
guvenlik=guvenlik_prompt | model | parser,
performans=performans_prompt | model | parser,
ozet=ozet_prompt | model | parser
)
ornek_log = """
2024-01-15 03:42:11 WARN sshd[1234]: Failed password for root from 192.168.1.105
2024-01-15 03:42:15 WARN sshd[1235]: Failed password for root from 192.168.1.105
2024-01-15 03:42:19 WARN sshd[1236]: Failed password for root from 192.168.1.105
2024-01-15 03:45:00 INFO kernel: CPU usage spike: 98% for 30 seconds
2024-01-15 03:45:30 WARN OOM: process nginx killed, out of memory
"""
sonuc = paralel_analiz.invoke({"log": ornek_log})
print("=== GÜVENLİK ANALİZİ ===")
print(sonuc["guvenlik"])
print("n=== PERFORMANS ANALİZİ ===")
print(sonuc["performans"])
print("n=== ÖZET ===")
print(sonuc["ozet"])
Bu kodu eski yöntemle yazsaydınız ya sıralı çalıştırıp 3 kat daha uzun süre beklerdiniz ya da thread/asyncio ile uğraşıp saatler harcardınız. LCEL ile paralel çalışma ücretsiz geliyor.
Streaming ile Gerçek Zamanlı Çıktı
Production’da kullanıcı deneyimi için streaming şart. Özellikle uzun yanıtlar üretirken kullanıcının ekranda bir şeyler görmesi gerekiyor. LCEL’de her zincir otomatik olarak streaming destekler.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
import sys
model = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
parser = StrOutputParser()
prompt = ChatPromptTemplate.from_template(
"Kubernetes cluster'ında pod scheduling sorunlarını gidermek için "
"adım adım bir troubleshooting rehberi yaz. Türkçe olarak detaylı açıkla: {senaryo}"
)
zincir = prompt | model | parser
senaryo = "Pod'lar Pending durumunda kalıyor ve describe çıktısında 'Insufficient memory' hatası görünüyor"
print("Yanıt akıyor:n")
for chunk in zincir.stream({"senaryo": senaryo}):
print(chunk, end="", flush=True)
sys.stdout.flush()
print("nnStreaming tamamlandı.")
Async streaming için de aynı zinciri kullanabilirsiniz:
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini")
parser = StrOutputParser()
prompt = ChatPromptTemplate.from_template("Şunu açıkla: {konu}")
zincir = prompt | model | parser
async def streaming_calistir():
async for chunk in zincir.astream({"konu": "Linux inode nedir"}):
print(chunk, end="", flush=True)
asyncio.run(streaming_calistir())
Koşullu Dallanma ile Dinamik Zincirler
Gerçek uygulamalarda her sorgu aynı yolu izlemez. Bazı sorgular basit, bazıları karmaşık; bazıları teknik, bazıları genel bilgi içerir. RunnableBranch ile bu dallanmayı yönetebilirsiniz.
from langchain_core.runnables import RunnableBranch, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
# Farklı kategoriler için prompt'lar
linux_prompt = ChatPromptTemplate.from_template(
"Linux/Unix sistem yöneticisi olarak yanıtla. Komut örnekleri ver: {soru}"
)
windows_prompt = ChatPromptTemplate.from_template(
"Windows Server yöneticisi olarak yanıtla. PowerShell veya CMD örnekleri ver: {soru}"
)
genel_prompt = ChatPromptTemplate.from_template(
"Sistem yönetimi uzmanı olarak genel bir yanıt ver: {soru}"
)
# Kategori belirleyici
def kategori_belirle(girdi):
soru = girdi["soru"].lower()
linux_anahtar = ["linux", "ubuntu", "centos", "bash", "shell", "apt", "yum", "systemd", "grep", "awk"]
windows_anahtar = ["windows", "powershell", "iis", "active directory", "registry", "cmd", "hyper-v"]
if any(k in soru for k in linux_anahtar):
return "linux"
elif any(k in soru for k in windows_anahtar):
return "windows"
else:
return "genel"
def route_ekle(girdi):
return {**girdi, "kategori": kategori_belirle(girdi)}
# Dallanma zinciri
dal = RunnableBranch(
(lambda x: x["kategori"] == "linux", linux_prompt | model | parser),
(lambda x: x["kategori"] == "windows", windows_prompt | model | parser),
genel_prompt | model | parser # default
)
tam_zincir = RunnableLambda(route_ekle) | dal
# Test
sorular = [
{"soru": "Ubuntu'da nginx kurulumu nasıl yapılır?"},
{"soru": "Windows Server'da IIS nasıl yapılandırılır?"},
{"soru": "Load balancer nedir ve ne işe yarar?"}
]
for soru in sorular:
print(f"nSoru: {soru['soru']}")
print(f"Yanıt: {tam_zincir.invoke(soru)[:150]}...")
print("-" * 50)
Gerçek Dünya Senaryosu: RAG Tabanlı Dokümantasyon Asistanı
Şimdi bunların hepsini bir araya getirelim. DevOps ekiplerinin en çok ihtiyaç duyduğu şeylerden biri: kendi iç dokümantasyonlarına dayalı bir soru-cevap sistemi.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Örnek iç dokümantasyon (gerçekte dosyadan okunur)
dahili_dokumanlar = [
"Deployment prosedürü: Her deployment öncesi staging ortamında test edilmelidir. "
"Blue-green deployment stratejisi kullanılır. Rollback için önceki image tag saklanır.",
"Monitoring alarmları: CPU %85 üzerinde 5 dakika boyunca kalırsa PagerDuty alarmı tetiklenir. "
"Memory %90 üzerinde olduğunda otomatik scale-out başlar. Disk doluluk %80'de uyarı verilir.",
"Database backup politikası: PostgreSQL veritabanları her gece 02:00'de full backup alır. "
"Incrementel backup 4 saatte bir çalışır. Backuplar 30 gün saklanır, S3'te şifreli olarak tutulur.",
"Incident response: P1 incident'larda maksimum 15 dakika içinde response verilmelidir. "
"On-call mühendis Slack #incidents kanalında bilgilendirilir. Post-mortem 24 saat içinde yazılmalıdır."
]
# Vector store oluştur
embeddings = OpenAIEmbeddings()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
splits = text_splitter.create_documents(dahili_dokumanlar)
vectorstore = FAISS.from_documents(splits, embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
# RAG prompt
rag_prompt = ChatPromptTemplate.from_template(
"""Sen bu şirketin DevOps asistanısın. Sadece aşağıdaki şirket dokümantasyonuna dayanarak yanıt ver.
Eğer bilgi dokümantasyonda yoksa 'Bu konuda dokümantasyonumuzda bilgi bulunmuyor' de.
Şirket Dokümantasyonu:
{context}
Soru: {soru}
Yanıt:"""
)
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
def context_formatla(docs):
return "nn".join(doc.page_content for doc in docs)
# RAG zinciri
rag_zinciri = (
RunnablePassthrough.assign(
context=lambda x: context_formatla(retriever.invoke(x["soru"]))
)
| rag_prompt
| model
| parser
)
# Test soruları
test_sorulari = [
"Deployment sırasında bir sorun çıkarsa nasıl rollback yaparız?",
"Database backupları ne zaman alınıyor ve ne kadar süre saklanıyor?",
"CPU alarmı ne zaman tetikleniyor?"
]
for soru in test_sorulari:
print(f"nSoru: {soru}")
yanit = rag_zinciri.invoke({"soru": soru})
print(f"Yanıt: {yanit}")
print("-" * 60)
Hata Yönetimi ve Retry Mekanizması
Production’da her şey yolunda gitmez. API timeout’ları, rate limit’ler, geçici ağ sorunları. LCEL’de bununla başa çıkmak için with_retry ve with_fallbacks var.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableLambda
import time
# Ana model - pahalı ama kaliteli
gpt4_model = ChatOpenAI(model="gpt-4o", temperature=0)
# Fallback model - daha ucuz
gpt35_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()
prompt = ChatPromptTemplate.from_template(
"Şu Kubernetes hatasını analiz et ve çözüm öner:n{hata}"
)
# Retry ile güvenilir model
guvenilir_model = gpt4_model.with_retry(
stop_after_attempt=3,
wait_exponential_jitter=True
)
# Fallback zinciri: önce gpt4 dener, başarısız olursa mini'ye geçer
zincir = prompt | guvenilir_model.with_fallbacks([gpt35_model]) | parser
# Gecikme ve başarı loglaması için wrapper
def performans_izle(fn):
def wrapper(girdi):
baslangic = time.time()
try:
sonuc = fn(girdi)
sure = time.time() - baslangic
print(f"[BAŞARILI] Süre: {sure:.2f}s")
return sonuc
except Exception as e:
sure = time.time() - baslangic
print(f"[HATA] Süre: {sure:.2f}s, Hata: {str(e)}")
raise
return wrapper
ornek_hata = """
Error from server (Forbidden): pods is forbidden:
User "system:serviceaccount:default:myapp" cannot list resource "pods"
in API group "" in the namespace "production"
"""
yanit = zincir.invoke({"hata": ornek_hata})
print(yanit)
Zincir Bileşimini Yeniden Kullanılabilir Hale Getirme
Büyük projelerde zincirleri modüler tutmak hayat kurtarır. Her zinciri bağımsız test edebilir ve farklı kombinasyonlarda kullanabilirsiniz.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough
from pydantic import BaseModel
from typing import List
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Yapılandırılmış çıktı için model
class LogAnalizi(BaseModel):
onem_seviyesi: str
tespit_edilen_sorunlar: List[str]
onerim: str
acil_mi: bool
# Modüler zincirler
log_temizle_zinciri = (
RunnablePassthrough()
| (lambda x: {"log": x["log"].strip().replace("t", " ")})
)
log_analiz_prompt = ChatPromptTemplate.from_template(
"""Şu sistem logunu analiz et ve JSON formatında yanıt ver.
Log:
{log}
Şu formatta JSON döndür:
{{
"onem_seviyesi": "KRITIK/YUKSEK/ORTA/DUSUK",
"tespit_edilen_sorunlar": ["sorun1", "sorun2"],
"onerim": "yapılması gereken işlem",
"acil_mi": true/false
}}"""
)
json_parser = JsonOutputParser(pydantic_object=LogAnalizi)
# Birleşik analiz zinciri
analiz_zinciri = (
log_temizle_zinciri
| log_analiz_prompt
| model
| json_parser
)
# Kullanım
test_log = """
Jan 15 04:23:01 webserver01 kernel: [Hardware Error]: Machine check events logged
Jan 15 04:23:01 webserver01 kernel: EDAC MC0: 1 CE memory read error on CPU_SrcID#0
Jan 15 04:23:05 webserver01 systemd: nginx.service: Main process exited with error
"""
sonuc = analiz_zinciri.invoke({"log": test_log})
print(f"Önem Seviyesi: {sonuc['onem_seviyesi']}")
print(f"Tespit Edilen Sorunlar: {', '.join(sonuc['tespit_edilen_sorunlar'])}")
print(f"Öneri: {sonuc['onerim']}")
print(f"Acil Mi: {'Evet' if sonuc['acil_mi'] else 'Hayır'}")
LCEL’de Dikkat Edilmesi Gereken Noktalar
Sahadan gelen notlar olarak şunları paylaşmak istiyorum:
- Tip uyumsuzlukları: Zincirdeki her adım bir öncekinin çıktısını bekler.
StrOutputParsersonrasına dict bekleyen bir prompt koyarsanız hata alırsınız. Zincirleri küçük parçalar halinde test edin. - Streaming ve JSON parser:
JsonOutputParserilestream()kullanırsanız tüm JSON gelene kadar parse edemezsiniz. Streaming’e ihtiyacınız varsaStrOutputParserkullanıp elle parse edin. - Memory kullanımı: Büyük document koleksiyonlarıyla çalışırken
RunnableParallelçok sayıda eş zamanlı API isteği açabilir. Rate limit’e dikkat edin, gerekirse semaphore ekleyin. - Debug için invoke vs stream: Geliştirme sırasında
invokekullanmak daha kolay debug sağlar. Production’a geçerken streaming’e alabilirsiniz. - Zincir uzunluğu: Çok uzun zincirler okunabilirliği düşürür. 5-6 adımdan uzun zincirleri alt fonksiyonlara bölün.
Sonuç
LCEL ilk başta “neden | kullanalım ki, fonksiyon çağrısı yazmak daha açık değil mi” dedirtebilir. Ben de öyle düşündüm. Ama bir süre sonra şunu fark ettim: LCEL sayesinde orchestration koduna değil, asıl business logic’e odaklanıyorsunuz.
Paralel çalışma, streaming, retry, fallback, async destek; bunların hepsi neredeyse bedava geliyor. Özellikle production’da birden fazla LLM çağrısı olan karmaşık iş akışları yazarken bu fark çok belirginleşiyor. Bir DevOps ekibi için log analizi, dokümantasyon asistanı veya incident response otomasyonu geliştiriyorsanız, LCEL bu işleri çok daha hızlı ve güvenilir hale getiriyor.
Başlangıç için tavsiyem şu: önce en basit prompt | model | parser zinciriyle başlayın. Ardından RunnablePassthrough.assign ile context eklemeyi öğrenin. Oradan RunnableParallel ve RunnableBranch ile kompleks senaryolara geçin. Her adımı ayrı ayrı test edin, sonra birleştirin. Bu metodoloji hem öğrenmeyi hızlandırır hem de debugging’i kolaylaştırır.
