Kendi Veritabanını Yapay Zekaya Bağlama: LangChain ile RAG Uygulaması

Üretim ortamındaki bir veritabanını yapay zekaya bağlamak kulağa karmaşık gelir, ama LangChain’in RAG (Retrieval-Augmented Generation) mimarisiyle bu iş düşündüğünden çok daha yönetilebilir bir hale geliyor. Bu yazıda PostgreSQL ve MySQL veritabanlarını LangChain üzerinden bir dil modeline bağlayacağız, gerçek sysadmin senaryolarıyla konuyu somutlaştıracağız.

RAG Nedir ve Neden Veritabanına İhtiyaç Duyar?

RAG, bir dil modelinin yalnızca eğitim verisine dayanmak yerine, sorgu anında harici kaynaklardan bilgi çekip cevap üretmesini sağlayan bir mimaridir. GPT-4 veya Llama’ya “Geçen ay hangi sunucularımız alarm verdi?” diye sorduğunda, model bunu bilemez. Ama senin log veritabanına erişimi olsaydı ve oradan ilgili kayıtları çekip analiz etseydi, anlamlı bir cevap üretebilirdi.

İşte tam olarak bunu yapacağız. Klasik RAG akışı şöyle işler:

  • Kullanıcı bir soru sorar
  • Sistem, soruyu vektöre dönüştürür
  • Benzer vektörleri veritabanında arar
  • Bulunan belgeleri LLM’e bağlam olarak verir
  • LLM, bu bağlamla birlikte cevap üretir

Burada iki farklı yaklaşım var: Birincisi SQL veritabanlarını doğrudan sorgulamak (SQLDatabaseChain), ikincisi ise vektör veritabanı kullanarak semantik arama yapmak. İkisini de ele alacağız.

Ortamı Hazırlamak

Python 3.10+ ve pip kurulu bir ortam varsayıyorum. Sanal ortam oluşturmakla başlayalım:

python3 -m venv langchain-rag-env
source langchain-rag-env/bin/activate
pip install langchain langchain-community langchain-openai
pip install psycopg2-binary pymysql sqlalchemy
pip install chromadb tiktoken sentence-transformers
pip install python-dotenv

Ortam değişkenlerini .env dosyasına koyacağız. OpenAI API anahtarını ve veritabanı bağlantı bilgilerini buraya yaz:

cat > .env << 'EOF'
OPENAI_API_KEY=sk-your-key-here
DB_HOST=localhost
DB_PORT=5432
DB_NAME=production_logs
DB_USER=readonly_user
DB_PASS=supersecretpassword
EOF

Bir not: Veritabanına bağlarken mutlaka salt okunur bir kullanıcı kullan. LangChain’in SQL aracına yazma yetkisi verme, model hata yapabilir ve beklenmedik sorgular çalıştırabilir.

PostgreSQL’i Doğrudan LangChain’e Bağlamak

En hızlı yol SQLDatabase sınıfını kullanmak. Bu yaklaşımda LangChain, veritabanı şemasını okur ve doğal dil sorgularını SQL’e çevirir.

from langchain_community.utilities import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os

load_dotenv()

# Veritabanı bağlantısı
db = SQLDatabase.from_uri(
    f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASS')}"
    f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}",
    include_tables=['server_logs', 'alert_history', 'server_inventory'],
    sample_rows_in_table_info=3
)

# LLM tanımla
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    api_key=os.getenv('OPENAI_API_KEY')
)

# Chain oluştur
db_chain = SQLDatabaseChain.from_llm(
    llm=llm,
    db=db,
    verbose=True,
    return_intermediate_steps=True
)

# Sorgu çalıştır
response = db_chain.invoke({
    "query": "Geçen hafta en çok CPU alarmı veren 5 sunucuyu listele"
})

print(response['result'])

verbose=True ile çalıştırdığında, modelin ürettiği SQL sorgusunu da terminalde görebilirsin. Bu hem debug için hem de ne yaptığını anlamak için kritik.

Tablo Bazlı Kısıtlamalar

Production ortamında tüm tabloları açmak istemeyebilirsin. include_tables parametresi tam olarak bunun için var. Ama bazen şema bilgisini de özelleştirmen gerekir:

# Belirli tabloları hariç tutmak için
db = SQLDatabase.from_uri(
    connection_string,
    ignore_tables=['user_passwords', 'billing_info', 'audit_raw'],
    sample_rows_in_table_info=2,
    max_string_length=300
)

# Şemayı kontrol et
print(db.get_table_info())

sample_rows_in_table_info parametresi önemli: Model, tablonun yapısını anlamak için birkaç örnek satıra bakıyor. Hassas veriler içeren tablolarda bunu 0 yapabilirsin, ama bu modelin doğru SQL üretme başarısını düşürür.

Vektör Veritabanı ile Semantik Arama

SQL sorgusu yerine doküman bazlı arama yapmak istiyorsan, vektör veritabanı yaklaşımı daha uygun. Örnek senaryo: Runbook’larını, incident raporlarını ve konfigürasyon dokümanlarını yapay zekaya öğretmek istiyorsun.

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, TextLoader

# Dokümanları yükle (runbook dizininden)
loader = DirectoryLoader(
    '/opt/runbooks/',
    glob='**/*.md',
    loader_cls=TextLoader,
    loader_kwargs={'encoding': 'utf-8'}
)
documents = loader.load()

print(f"Yüklenen doküman sayısı: {len(documents)}")

# Metni parçalara böl
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["nn", "n", ".", " "]
)
splits = text_splitter.split_documents(documents)

# Embedding modeli
embeddings = OpenAIEmbeddings(
    model="text-embedding-3-small",
    api_key=os.getenv('OPENAI_API_KEY')
)

# Vektör deposu oluştur ve kaydet
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory='/opt/chroma-db/runbooks'
)

print(f"Toplam {len(splits)} chunk vektöre dönüştürüldü")

Bu işlemi bir kez çalıştırıp vektörleri diske kaydediyorsun. Sonraki sorgularda yeniden embedding üretmene gerek kalmaz.

RAG Chain Kurulumu

Vektör deposunu bir retriever’a dönüştürüp chain’e bağlamak:

from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate

# Var olan vektör deposunu yükle
vectorstore = Chroma(
    persist_directory='/opt/chroma-db/runbooks',
    embedding_function=embeddings
)

# Retriever ayarları
retriever = vectorstore.as_retriever(
    search_type="mmr",  # Maximum Marginal Relevance - çeşitli sonuçlar getirir
    search_kwargs={
        "k": 5,
        "fetch_k": 20
    }
)

# Özel prompt tanımla
prompt_template = """Sen bir Linux sistem yöneticisi asistanısın.
Aşağıdaki bağlam bilgilerini kullanarak soruyu yanıtla.
Bağlamda yoksa "Bu bilgi runbook'larda yer almıyor" de.

Bağlam:
{context}

Soru: {question}

Cevap (Türkçe, teknik ve pratik ol):"""

PROMPT = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"]
)

# QA Chain
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    chain_type_kwargs={"prompt": PROMPT},
    return_source_documents=True
)

# Kullan
result = qa_chain.invoke({"query": "Nginx 502 hatası aldığımda ne yapmalıyım?"})
print(result['result'])
print("nKaynak dokümanlar:")
for doc in result['source_documents']:
    print(f"  - {doc.metadata.get('source', 'Bilinmiyor')}")

Gerçek Dünya Senaryosu: Log Analiz Sistemi

Bir şirketin production ortamında her gün binlerce satır log biriktiğini düşün. Bunları server_logs tablosuna yazıyorsun. DBA olmayan bir ekip arkadaşın “Dün gece 02:00-04:00 arası ne oldu?” diye soruyor. İşte tam bu senaryoyu çözelim:

from langchain.agents import create_sql_agent
from langchain_community.agent_toolkits import SQLDatabaseToolkit
from langchain.agents.agent_types import AgentType

# Daha gelişmiş SQL agent
toolkit = SQLDatabaseToolkit(db=db, llm=llm)

agent_executor = create_sql_agent(
    llm=llm,
    toolkit=toolkit,
    agent_type=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    max_iterations=10,
    handle_parsing_errors=True,
    extra_tools=[]
)

# Karmaşık sorular sorabilirsin
sorular = [
    "Dün gece 02:00 ile 04:00 arasında ERROR seviyesinde kaç log kaydı var?",
    "En çok hata üreten servis hangisi ve hata mesajları neler?",
    "Bu haftaki uptime ortalaması en düşük 3 sunucuyu bul"
]

for soru in sorular:
    print(f"nSoru: {soru}")
    print("-" * 50)
    try:
        cevap = agent_executor.invoke({"input": soru})
        print(f"Cevap: {cevap['output']}")
    except Exception as e:
        print(f"Hata: {e}")

Bu agent yaklaşımı, chain yaklaşımından daha güçlü: Birden fazla sorgu çalıştırabilir, sonuçları birleştirebilir ve hata durumunda farklı bir yaklaşım deneyebilir.

Hibrit Yaklaşım: SQL + Vektör Arama

En güçlü senaryo ikisini birleştirmek. Yapısal veriyi SQL’den, dokümanter bilgiyi vektör veritabanından çek:

from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType

# SQL aracı
def sql_query_tool(query: str) -> str:
    """Veritabanında SQL sorgusu çalıştırır"""
    try:
        result = db_chain.invoke({"query": query})
        return result['result']
    except Exception as e:
        return f"SQL hatası: {str(e)}"

# Runbook arama aracı
def search_runbooks(query: str) -> str:
    """Runbook'larda ilgili prosedürü arar"""
    result = qa_chain.invoke({"query": query})
    return result['result']

# Tool listesi
tools = [
    Tool(
        name="DatabaseQuery",
        func=sql_query_tool,
        description="Sunucu logları, alarm geçmişi ve envanter veritabanını sorgulamak için kullan"
    ),
    Tool(
        name="RunbookSearch",
        func=search_runbooks,
        description="Operasyonel prosedürler, troubleshooting adımları ve konfigürasyon rehberleri için kullan"
    )
]

# Hibrit agent
hybrid_agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    max_iterations=15
)

# Artık hem veri hem de prosedür sorabilirsin
response = hybrid_agent.invoke({
    "input": "web-prod-03 sunucusu dün 3 kez disk alarmı vermiş. "
             "Bu sunucuda hangi servislerin çalıştığını bul ve "
             "disk doluluk sorununu çözmek için ne yapmalıyım?"
})
print(response['output'])

Güvenlik ve Production Hazırlığı

Bunu gerçek ortama almadan önce birkaç kritik noktayı atlatmak gerekiyor.

Rate limiting ve maliyet kontrolü: Her sorgu OpenAI API’sine gidiyor, bu maliyet demek. Token sayısını sınırla:

from langchain.callbacks import get_openai_callback

with get_openai_callback() as cb:
    result = qa_chain.invoke({"query": "Sorgun"})
    print(f"Kullanılan token: {cb.total_tokens}")
    print(f"Tahmini maliyet: ${cb.total_cost:.4f}")

# LLM seviyesinde token limiti
llm = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0,
    max_tokens=500,  # Cevap uzunluğunu sınırla
    request_timeout=30
)

SQL injection koruması: LangChain’in ürettiği SQL’i whitelist ile doğrula:

import re

def validate_sql(sql: str) -> bool:
    """Sadece SELECT sorgularına izin ver"""
    sql_clean = sql.strip().upper()
    
    # Tehlikeli komutları engelle
    forbidden = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'TRUNCATE', 
                 'ALTER', 'CREATE', 'GRANT', 'REVOKE', 'EXEC']
    
    for keyword in forbidden:
        if keyword in sql_clean:
            return False
    
    # SELECT ile başlamalı
    if not sql_clean.startswith('SELECT'):
        return False
    
    return True

Önbellekleme: Aynı soruları tekrar tekrar API’ye gönderme:

from langchain.cache import SQLiteCache
from langchain.globals import set_llm_cache

# LLM cevaplarını SQLite'a önbellekle
set_llm_cache(SQLiteCache(database_path="/opt/langchain-cache/llm_cache.db"))

Sistemi Servise Almak

FastAPI ile basit bir REST endpoint’i ekle, böylece başka araçlar da bu sistemi kullanabilir:

pip install fastapi uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Sysadmin AI Assistant")

class QueryRequest(BaseModel):
    question: str
    mode: str = "hybrid"  # "sql", "docs", "hybrid"

class QueryResponse(BaseModel):
    answer: str
    sources: list = []
    tokens_used: int = 0

@app.post("/query", response_model=QueryResponse)
async def query_system(request: QueryRequest):
    logger.info(f"Gelen sorgu: {request.question[:100]}")
    
    try:
        with get_openai_callback() as cb:
            if request.mode == "sql":
                result = db_chain.invoke({"query": request.question})
                answer = result['result']
                sources = []
            elif request.mode == "docs":
                result = qa_chain.invoke({"query": request.question})
                answer = result['result']
                sources = [d.metadata.get('source', '') 
                          for d in result.get('source_documents', [])]
            else:
                result = hybrid_agent.invoke({"input": request.question})
                answer = result['output']
                sources = []
        
        return QueryResponse(
            answer=answer,
            sources=sources,
            tokens_used=cb.total_tokens
        )
    
    except Exception as e:
        logger.error(f"Sorgu hatası: {e}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Bunu systemd servisi olarak çalıştır:

cat > /etc/systemd/system/sysadmin-ai.service << 'EOF'
[Unit]
Description=Sysadmin AI Assistant
After=network.target

[Service]
Type=simple
User=ai-service
WorkingDirectory=/opt/sysadmin-ai
EnvironmentFile=/opt/sysadmin-ai/.env
ExecStart=/opt/sysadmin-ai/venv/bin/python main.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable --now sysadmin-ai

Embedding Güncelleme Stratejisi

Runbook’lar güncellendikçe vektör deposunu da güncellemen gerekir. Bunu bir cron job ile otomatize et:

cat > /opt/sysadmin-ai/update_vectorstore.py << 'EOF'
#!/usr/bin/env python3
"""Runbook değişikliklerinde vektör deposunu güncelle"""

import hashlib
import json
import os
from pathlib import Path

HASH_FILE = '/opt/sysadmin-ai/doc_hashes.json'
RUNBOOK_DIR = '/opt/runbooks'

def get_file_hash(filepath):
    with open(filepath, 'rb') as f:
        return hashlib.md5(f.read()).hexdigest()

def find_changed_files():
    current_hashes = {}
    changed = []
    
    for path in Path(RUNBOOK_DIR).rglob('*.md'):
        file_hash = get_file_hash(path)
        current_hashes[str(path)] = file_hash
    
    # Önceki hash'lerle karşılaştır
    if os.path.exists(HASH_FILE):
        with open(HASH_FILE) as f:
            old_hashes = json.load(f)
        
        for filepath, hash_val in current_hashes.items():
            if old_hashes.get(filepath) != hash_val:
                changed.append(filepath)
    else:
        changed = list(current_hashes.keys())
    
    # Hash dosyasını güncelle
    with open(HASH_FILE, 'w') as f:
        json.dump(current_hashes, f)
    
    return changed

if __name__ == '__main__':
    changed = find_changed_files()
    if changed:
        print(f"{len(changed)} dosya değişmiş, vektör deposu güncelleniyor...")
        # Burada embedding güncelleme kodunu çağır
    else:
        print("Değişiklik yok")
EOF

# Her gece 02:00'de çalıştır
echo "0 2 * * * ai-service /opt/sysadmin-ai/venv/bin/python /opt/sysadmin-ai/update_vectorstore.py >> /var/log/vectorstore-update.log 2>&1" | crontab -

Sonuç

Bu yazıda veritabanını LangChain RAG mimarisine bağlamanın birden fazla yolunu gördük. SQL veritabanları için SQLDatabaseChain ve SQL Agent yaklaşımı, dokümanter bilgi için Chroma tabanlı vektör arama, ikisini birleştiren hibrit agent mimarisi. Production’a almadan önce salt okunur veritabanı kullanıcısı, token limitleri, önbellekleme ve SQL doğrulama gibi güvenlik katmanlarını mutlaka ekle.

En büyük pratik kazanım şu: Artık ekibindeki her kişinin SQL bilmesine gerek kalmıyor. “Geçen ay en fazla kapasiteye ulaşan diskler hangileri?” sorusu, standart bir doğal dil sorusu olarak sisteme gidebilir ve anlamlı bir cevap alabilir. Runbook’lar için de aynısı geçerli: Yeni başlayan bir ekip arkadaşı, saatlerce doküman okumak yerine direkt sorabilir.

Tabii ki bu sistemi kör bir şekilde güvenmek yerine, ürettiği SQL sorgularını düzenli olarak gözden geçir. Model bazen beklenmedik sorgular üretebilir ve şema değişikliklerinde davranışı farklılaşabilir. İzleme ve loglama mekanizmalarını baştan kur, sonradan eklemek çok daha zahmetli oluyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir