LangChain Chain Mimarisi: Adım Adım İş Akışı Tasarlama

Sistem yöneticisi olarak yıllar içinde öğrendiğim en önemli şeylerden biri şu: Karmaşık bir işi otomatize etmek istiyorsan, önce o işin adımlarını kağıda dök. LangChain’in Chain mimarisi de tam olarak bu mantıkla çalışıyor. Bir LLM’e tek seferlik soru sormanın ötesine geçip, birbirine bağlı iş akışları oluşturmak istiyorsan, Chain kavramını içselleştirmek zorundasın.

Bu yazıda LangChain’in Chain mimarisini gerçek dünya senaryolarıyla ele alacağım. Sadece teorik değil, production ortamında karşılaşacağın durumları da işleyeceğiz.

Chain Nedir ve Neden İhtiyaç Duyarız

LangChain’de bir “Chain”, birden fazla bileşeni sıralı veya paralel olarak birbirine bağlayan yapıdır. Bunu bir shell pipeline’ı gibi düşünebilirsin. cat log.txt | grep ERROR | awk '{print $1}' | sort | uniq -c komutunda her araç bir öncekinin çıktısını alıp işliyor. Chain’ler de aynı mantıkla çalışıyor, sadece bu araçlardan biri veya birkaçı LLM oluyor.

Tek bir prompt ile halledilemeyen durumlar Chain kullanımını zorunlu kılar:

  • Çok adımlı düşünme gerektiren problemler: Önce veriyi analiz et, sonra önerileri listele, son olarak özet çıkar
  • Harici veri kaynakları: Veritabanından çek, LLM ile işle, sonucu formatla
  • Validasyon döngüleri: LLM çıktısını başka bir LLM veya fonksiyonla doğrula
  • Dinamik yönlendirme: Girdi tipine göre farklı Chain’lere yönlendir

Ortamı Hazırlayalım

Başlamadan önce gerekli paketleri kuralım:

pip install langchain langchain-openai langchain-community python-dotenv

# Ortam değişkenlerini ayarla
cat > .env << 'EOF'
OPENAI_API_KEY=sk-your-api-key-here
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your-langsmith-key
EOF

# Python ortamını test et
python3 -c "import langchain; print(f'LangChain version: {langchain.__version__}')"

LangSmith key opsiyonel ama şiddetle tavsiye ediyorum. Chain’lerin her adımını görselleştirip debug etmeni sağlıyor. Production’a almadan önce mutlaka kullan.

En Temel Yapı: LLMChain

Modern LangChain’de LCEL (LangChain Expression Language) kullanımı standart hale geldi. Eski LLMChain sınıfı hala çalışıyor ama yeni projelerde LCEL’i tercih et.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from dotenv import load_dotenv

load_dotenv()

# Model ve prompt tanımla
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

prompt = ChatPromptTemplate.from_template(
    "Aşağıdaki sistem logunu analiz et ve kritik hataları listele:nn{log_content}"
)

# LCEL ile chain oluştur - pipe operatörü kullanıyoruz
chain = prompt | model | StrOutputParser()

# Test edelim
log_sample = """
2024-01-15 09:23:11 ERROR Database connection timeout after 30s
2024-01-15 09:23:12 INFO Retry attempt 1/3
2024-01-15 09:23:42 ERROR Database connection timeout after 30s
2024-01-15 09:23:43 WARN Memory usage at 87%
2024-01-15 09:24:01 CRITICAL Service unavailable - all retries exhausted
"""

result = chain.invoke({"log_content": log_sample})
print(result)

Pipe operatörü (|) burada Unix pipe’ı gibi çalışıyor. Prompt’un çıktısı model’e, model’in çıktısı parser’a gidiyor. Basit ama güçlü.

Sequential Chain: Adım Adım İş Akışı

Gerçek dünyada tek bir LLM çağrısı nadiren yeterli olur. Bir log analiz sisteminde şu adımları düşünelim: Ham logu parse et, hataları kategorize et, çözüm önerileri üret, yönetici için özet hazırla.

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# Adım 1: Log parsing ve hata tespiti
parse_prompt = ChatPromptTemplate.from_template("""
Sen bir sistem yöneticisi asistanısın.
Aşağıdaki ham log verisini analiz et.
Sadece ERROR ve CRITICAL seviyesindeki olayları JSON formatında listele.

Log:
{raw_log}

Format: {{"errors": [{{"timestamp": "", "level": "", "message": ""}}]}}
""")

# Adım 2: Hataları kategorize et
categorize_prompt = ChatPromptTemplate.from_template("""
Aşağıdaki hata listesini kategorize et.
Her hatayı şu kategorilerden birine koy: DATABASE, NETWORK, MEMORY, APPLICATION, SECURITY

Hatalar:
{parsed_errors}

Her kategori için etkilenen hata sayısını ve kritiklik seviyesini belirt.
""")

# Adım 3: Çözüm önerileri
solution_prompt = ChatPromptTemplate.from_template("""
Aşağıdaki kategorize edilmiş hataları için adım adım çözüm önerileri sun.
Önerileri öncelik sırasına göre listele (kritik önce).

Kategorize hatalar:
{categorized_errors}
""")

# Chain'leri birbirine bağla
parse_chain = parse_prompt | model | parser
categorize_chain = categorize_prompt | model | parser
solution_chain = solution_prompt | model | parser

# Tüm pipeline'ı oluştur
full_pipeline = (
    {"parsed_errors": parse_chain}
    | categorize_chain
)

# Ya da daha açık yazalım
def run_log_analysis(raw_log: str) -> dict:
    step1 = parse_chain.invoke({"raw_log": raw_log})
    print(f"[Adım 1 tamamlandı] Parsing: {len(step1)} karakter çıktı")
    
    step2 = categorize_chain.invoke({"parsed_errors": step1})
    print(f"[Adım 2 tamamlandı] Kategorizasyon: {len(step2)} karakter çıktı")
    
    step3 = solution_chain.invoke({"categorized_errors": step2})
    print(f"[Adım 3 tamamlandı] Öneriler hazır")
    
    return {
        "parsed": step1,
        "categorized": step2,
        "solutions": step3
    }

test_log = """
2024-01-15 10:00:01 ERROR PostgreSQL: max_connections reached (100/100)
2024-01-15 10:00:05 CRITICAL Application server OOM killed
2024-01-15 10:00:07 ERROR Redis connection refused on port 6379
2024-01-15 10:00:09 ERROR SSL certificate expired for api.example.com
"""

results = run_log_analysis(test_log)
print("n=== ÇÖZÜM ÖNERİLERİ ===")
print(results["solutions"])

Bu yapıda her adımın çıktısını logluyoruz. Production’da bu loglar sana neyin nerede takıldığını gösterecek.

RunnableParallel: Eş Zamanlı İşlemler

Bazen adımların birbirini beklemesine gerek yok. LangChain’de paralel çalıştırma için RunnableParallel kullanıyoruz:

from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = StrOutputParser()

# İki farklı analiz paralel çalışsın
security_prompt = ChatPromptTemplate.from_template(
    "Bu log verisinde güvenlik açıklarını ve şüpheli aktiviteleri tespit et:n{log}"
)

performance_prompt = ChatPromptTemplate.from_template(
    "Bu log verisinde performans sorunlarını ve darboğazları analiz et:n{log}"
)

# Paralel chain
parallel_analysis = RunnableParallel(
    security=security_prompt | model | parser,
    performance=performance_prompt | model | parser,
    original_log=RunnablePassthrough()
)

# Paralel sonuçları birleştiren son adım
combine_prompt = ChatPromptTemplate.from_template("""
Güvenlik analizi:
{security}

Performans analizi:
{performance}

Yukarıdaki iki analizi birleştirerek yönetici için kapsamlı bir özet rapor hazırla.
Acil aksiyon gerektiren maddeleri başa al.
""")

final_chain = parallel_analysis | combine_prompt | model | parser

result = final_chain.invoke({"log": test_log})
print(result)

RunnablePassthrough() burada önemli. Girdi verisini olduğu gibi bir sonraki adıma taşıyor, bu da combine adımında orijinal veriyi de kullanabilmeni sağlıyor.

Router Chain: Dinamik Yönlendirme

Farklı girdi tiplerini farklı chain’lere yönlendirmek isteyebilirsin. Örneğin: Nginx logu mu, PostgreSQL logu mu, uygulama logu mu?

from langchain_core.output_parsers import JsonOutputParser
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# Classifier chain - log tipini belirle
classifier_prompt = ChatPromptTemplate.from_template("""
Aşağıdaki log içeriğini analiz et ve tipini belirle.
Sadece şu değerlerden birini döndür: nginx, postgresql, application, unknown

Log:
{log_content}

Yanıt olarak sadece tip adını yaz, başka hiçbir şey ekleme.
""")

classifier_chain = classifier_prompt | model | StrOutputParser()

# Her log tipi için özelleşmiş chain'ler
nginx_prompt = ChatPromptTemplate.from_template(
    "Bu Nginx logunu analiz et. HTTP status kodlarını, yavaş istekleri ve 5xx hatalarını raporla:n{log_content}"
)

postgres_prompt = ChatPromptTemplate.from_template(
    "Bu PostgreSQL logunu analiz et. Slow query'leri, connection sorunlarını ve lock'ları raporla:n{log_content}"
)

app_prompt = ChatPromptTemplate.from_template(
    "Bu uygulama logunu analiz et. Exception'ları, stack trace'leri ve hata paternlerini raporla:n{log_content}"
)

nginx_chain = nginx_prompt | model | StrOutputParser()
postgres_chain = postgres_prompt | model | StrOutputParser()
app_chain = app_prompt | model | StrOutputParser()

def route_to_chain(inputs: dict) -> str:
    log_content = inputs["log_content"]
    log_type = classifier_chain.invoke({"log_content": log_content}).strip().lower()
    
    print(f"[Router] Tespit edilen log tipi: {log_type}")
    
    if log_type == "nginx":
        return nginx_chain.invoke({"log_content": log_content})
    elif log_type == "postgresql":
        return postgres_chain.invoke({"log_content": log_content})
    elif log_type == "application":
        return app_chain.invoke({"log_content": log_content})
    else:
        return f"Bilinmeyen log tipi tespit edildi: {log_type}. Manuel inceleme gerekiyor."

router_chain = RunnableLambda(route_to_chain)

# Test
nginx_log = """
192.168.1.100 - - [15/Jan/2024:10:23:45 +0300] "GET /api/users HTTP/1.1" 504 1234 "23.456"
192.168.1.101 - - [15/Jan/2024:10:23:46 +0300] "POST /api/data HTTP/1.1" 200 567 "0.123"
192.168.1.102 - - [15/Jan/2024:10:23:47 +0300] "GET /health HTTP/1.1" 500 89 "45.678"
"""

print(router_chain.invoke({"log_content": nginx_log}))

Memory ile Stateful Chain’ler

Conversation geçmişini tutman gereken durumlarda memory kullanmak zorundasın. Bir IT helpdesk botu düşün:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import HumanMessage, AIMessage

model = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)

# Conversation geçmişini destekleyen prompt
helpdesk_prompt = ChatPromptTemplate.from_messages([
    ("system", """Sen deneyimli bir IT helpdesk uzmanısın.
    Kullanıcılara sistem sorunlarını çözmekte yardım ediyorsun.
    Her zaman adım adım açıklama yap.
    Türkçe yanıt ver."""),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}")
])

helpdesk_chain = helpdesk_prompt | model | StrOutputParser()

# Basit in-memory conversation yönetimi
class ConversationManager:
    def __init__(self, chain):
        self.chain = chain
        self.history = []
        self.session_id = None
    
    def chat(self, user_input: str) -> str:
        response = self.chain.invoke({
            "input": user_input,
            "history": self.history
        })
        
        # Geçmişi güncelle
        self.history.append(HumanMessage(content=user_input))
        self.history.append(AIMessage(content=response))
        
        # Geçmiş çok uzarsa eski mesajları sil (token limiti)
        if len(self.history) > 20:
            self.history = self.history[-20:]
        
        return response

# Kullanım
bot = ConversationManager(helpdesk_chain)

# Simüle edilmiş konuşma
questions = [
    "Sunucuma SSH ile bağlanamıyorum, connection refused hatası alıyorum",
    "SSH servisi çalışıyor mu nasıl kontrol ederim?",
    "systemctl status ssh yazdım, active (running) görünüyor ama hala bağlanamıyorum"
]

for q in questions:
    print(f"nKullanıcı: {q}")
    response = bot.chat(q)
    print(f"Bot: {response}")

Output Parser ile Yapılandırılmış Çıktı

LLM çıktısını doğrudan parse etmek yerine, Pydantic modelleri ile yapılandırılmış veri almak production kodunda çok daha güvenli:

from pydantic import BaseModel, Field
from typing import List
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

class SystemIssue(BaseModel):
    severity: str = Field(description="Kritiklik seviyesi: critical, high, medium, low")
    category: str = Field(description="Sorun kategorisi: database, network, memory, application")
    description: str = Field(description="Sorunun kısa açıklaması")
    affected_service: str = Field(description="Etkilenen servis veya bileşen")
    recommended_action: str = Field(description="Önerilen aksiyon")
    estimated_fix_time: str = Field(description="Tahmini çözüm süresi")

class LogAnalysisReport(BaseModel):
    total_errors: int = Field(description="Toplam hata sayısı")
    critical_count: int = Field(description="Kritik hata sayısı")
    issues: List[SystemIssue] = Field(description="Tespit edilen sorunlar listesi")
    overall_health: str = Field(description="Genel sistem sağlığı: healthy, degraded, critical")
    summary: str = Field(description="Yönetici özeti")

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
parser = JsonOutputParser(pydantic_object=LogAnalysisReport)

analysis_prompt = ChatPromptTemplate.from_template("""
Aşağıdaki sistem logunu analiz et ve istenen formatta rapor oluştur.

{format_instructions}

Log verisi:
{log_content}
""")

structured_chain = (
    analysis_prompt.partial(format_instructions=parser.get_format_instructions())
    | model
    | parser
)

sample_log = """
2024-01-15 14:00:01 CRITICAL MySQL: Disk space critical - 98% full on /var/lib/mysql
2024-01-15 14:00:03 ERROR Apache: Worker process killed - max processes reached
2024-01-15 14:00:05 ERROR Redis: AOF rewrite failed - insufficient memory
2024-01-15 14:00:08 WARN CPU load average: 18.5 (threshold: 8.0)
2024-01-15 14:00:10 ERROR PHP-FPM: Pool www reached max_children limit
"""

report = structured_chain.invoke({"log_content": sample_log})
print(f"Genel Sistem Sağlığı: {report['overall_health']}")
print(f"Toplam Hata: {report['total_errors']}, Kritik: {report['critical_count']}")
print(f"nÖzet: {report['summary']}")
print("nSorunlar:")
for issue in report['issues']:
    print(f"  [{issue['severity'].upper()}] {issue['category']}: {issue['description']}")

Gerçek Dünya Senaryosu: Otomatik Incident Yönetimi

Tüm öğrendiklerimizi birleştirerek gerçekçi bir incident yönetim sistemi kuralım:

# Önce gerekli klasör yapısını oluştur
mkdir -p /opt/incident-manager/{logs,reports,config}
cat > /opt/incident-manager/config/settings.yaml << 'EOF'
model: gpt-4o-mini
temperature: 0
max_tokens: 2000
report_output_dir: /opt/incident-manager/reports
log_watch_dir: /var/log
severity_threshold: high
notification_webhook: https://hooks.slack.com/your-webhook
EOF
import os
import json
from datetime import datetime
from pathlib import Path
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnableParallel, RunnableLambda
from dotenv import load_dotenv

load_dotenv()

class IncidentManager:
    def __init__(self, output_dir: str = "/opt/incident-manager/reports"):
        self.model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
        self.parser = StrOutputParser()
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._build_chains()
    
    def _build_chains(self):
        # Hızlı triage chain - ilk değerlendirme
        triage_prompt = ChatPromptTemplate.from_template("""
        Bu incident için hızlı triage yap. 30 saniye içinde yanıt gerekiyor.
        
        Şunu belirle:
        1. Kritiklik seviyesi (P1/P2/P3/P4)
        2. Etkilenen servisler
        3. Tahmini etki alanı (kullanıcı sayısı, servisler)
        4. İlk 5 dakikada yapılması gerekenler
        
        Incident detayı: {incident_data}
        """)
        
        # Kök neden analizi chain
        rca_prompt = ChatPromptTemplate.from_template("""
        Aşağıdaki incident için kök neden analizi yap.
        5 Why metodolojisini kullan.
        
        Incident: {incident_data}
        İlk bulgular: {triage_result}
        """)
        
        # Runbook oluşturma chain
        runbook_prompt = ChatPromptTemplate.from_template("""
        Bu incident için adım adım çözüm runbook'u oluştur.
        Her adım için:
        - Komutu yaz
        - Beklenen çıktıyı belirt
        - Başarısız olursa ne yapılacağını açıkla
        
        Incident: {incident_data}
        Kök neden: {rca_result}
        """)
        
        self.triage_chain = triage_prompt | self.model | self.parser
        self.rca_chain = rca_prompt | self.model | self.parser
        self.runbook_chain = runbook_prompt | self.model | self.parser
    
    def process_incident(self, incident_data: str) -> dict:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        print(f"[{timestamp}] Incident işleniyor...")
        
        # Adım 1: Triage
        print("[1/3] Triage yapılıyor...")
        triage = self.triage_chain.invoke({"incident_data": incident_data})
        
        # Adım 2: Kök neden analizi
        print("[2/3] Kök neden analizi yapılıyor...")
        rca = self.rca_chain.invoke({
            "incident_data": incident_data,
            "triage_result": triage
        })
        
        # Adım 3: Runbook
        print("[3/3] Runbook oluşturuluyor...")
        runbook = self.runbook_chain.invoke({
            "incident_data": incident_data,
            "rca_result": rca
        })
        
        report = {
            "timestamp": timestamp,
            "incident_summary": incident_data[:200],
            "triage": triage,
            "root_cause_analysis": rca,
            "runbook": runbook
        }
        
        # Raporu kaydet
        report_file = self.output_dir / f"incident_{timestamp}.json"
        with open(report_file, "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        print(f"[OK] Rapor kaydedildi: {report_file}")
        return report

# Kullanım
manager = IncidentManager()

incident_description = """
Saat 15:30'da production veritabanı sunucusu (db-prod-01) yanıt vermemeye başladı.
- MySQL servis yanıt süresi 30 saniyeyi aştı
- Uygulama sunucuları connection pool exhausted hatası veriyor
- 500 aktif kullanıcı etkileniyor
- E-ticaret sitenin checkout işlemleri çalışmıyor
- Son deployment 2 saat önce yapıldı
- db-prod-01 CPU: %95, RAM: %89, Disk I/O: %98
"""

report = manager.process_incident(incident_description)
print("n=== TRİAJ SONUCU ===")
print(report["triage"])

Chain’leri Production’a Alma

Chain’lerini production ortamına almadan önce dikkat etmen gereken noktalar:

  • Retry mekanizması: LLM API’ları zaman zaman hata verir, otomatik retry şart
  • Timeout ayarları: Her chain adımı için makul timeout değerleri belirle
  • Token takibi: Her chain çağrısı token tüketir, maliyeti izle
  • Caching: Aynı inputlar için LLM’i tekrar çağırmak yerine cache kullan
  • Async kullanımı: Yüksek trafikli uygulamalarda ainvoke() ile async chain’ler kullan
# LangSmith ile chain takibi için environment ayarları
export LANGCHAIN_TRACING_V2="true"
export LANGCHAIN_ENDPOINT="https://api.smith.langchain.com"
export LANGCHAIN_API_KEY="your-langsmith-api-key"
export LANGCHAIN_PROJECT="production-log-analyzer"

# Retry ve timeout için
python3 -c "
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig

model = ChatOpenAI(
    model='gpt-4o-mini',
    max_retries=3,
    timeout=30,
    temperature=0
)
print('Model yapılandırması OK')
"

Sonuç

LangChain Chain mimarisi, LLM uygulamalarını gerçek iş problemlerine uygulamak için çok güçlü bir altyapı sunuyor. Temel fikirleri özetlersek:

  • LCEL pipe operatörü ile basit ve okunabilir chain’ler oluşturabilirsin
  • RunnableParallel ile birbirinden bağımsız adımları aynı anda çalıştırabilirsin
  • Router pattern ile girdi tipine göre dinamik yönlendirme yapabilirsin
  • Structured output ile LLM çıktısını güvenli şekilde parse edebilirsin
  • Memory ile stateful conversation’lar yönetebilirsin

Sysadmin perspektifinden bakınca, Chain mimarisi aslında iyi eski shell scripting mantığının LLM dünyasına taşınmış hali. Her adımın ne yaptığını biliyorsun, her adımın çıktısını logluyorsun, her adım için hata yönetimi yapıyorsun.

Başlamak için büyük projeler bekleme. Şu an manuel yaptığın log analizi, incident raporlama ya da dokümantasyon güncelleme işlerinden birini al ve Chain ile otomatize etmeye çalış. Birkaç iterasyondan sonra hangi pattern’in sana uyduğunu anlayacaksın.

Sonraki yazıda RAG (Retrieval-Augmented Generation) ile kendi dokümantasyonunu LLM’e besleyip akıllı bir iç knowledge base nasıl kurarsın, onu işleyeceğiz.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir