Gerçek Zamanlı Yanıt: OpenAI Streaming API Kullanımı

Bir kullanıcı ChatGPT’ye soru soruyor ve cevap kelime kelime ekrana geliyor. Bu deneyim sihirli görünse de aslında arkasında çok basit ama son derece etkili bir mekanizma var: streaming. OpenAI’nin streaming API’si, büyük dil modellerinin ürettiği tokenleri gerçek zamanlı olarak istemciye iletmesini sağlar. Bütün cevabın hazır olmasını beklemek yerine, her token üretildiği anda kullanıcıya ulaşır. Sysadmin olarak bu teknolojiyi script’lerinize, iç araçlarınıza veya monitoring sistemlerinize entegre ettiğinizde hem kullanıcı deneyimi hem de sistem verimliliği açısından ciddi kazanımlar elde edersiniz.

Streaming Neden Önemli?

Standart API çağrısında şu olur: model tüm cevabı üretir, sizin uygulamanız HTTP yanıtını bekler, cevap gelir, ekrana basarsınız. 500 token’lık bir cevap için bu bekleme süresi model kapasitesine ve sunucu yüküne göre 10-30 saniyeyi bulabilir. Kullanıcı bu süre boyunca boş ekrana bakar.

Streaming ile ise model ilk tokeni ürettiği anda (genellikle 1-2 saniye içinde) siz almaya başlarsınız. Kullanıcı “bir şeyler oluyor” hissini anında yaşar. Bu fark özellikle uzun analizler, log yorumlama veya dokümantasyon üretme gibi senaryolarda kritik önem taşır.

Teknik olarak konuşursak, OpenAI streaming Server-Sent Events (SSE) protokolünü kullanır. Sunucu bağlantıyı açık tutar ve data: prefiksi ile JSON chunk’ları gönderir. Her chunk bir veya birkaç token içerir. Bağlantı sonunda data: [DONE] mesajı gelir.

Ortam Hazırlığı

Örnekleri çalıştırmadan önce ortamı hazırlayalım:

# Python sanal ortamı oluştur
python3 -m venv openai-streaming-env
source openai-streaming-env/bin/activate

# Gerekli paketleri yükle
pip install openai>=1.0.0 httpx python-dotenv

# API anahtarını ortam değişkeni olarak ayarla
export OPENAI_API_KEY="sk-proj-xxxxxxxxxxxxxxxx"

# Kalıcı yapmak için .bashrc veya .zshrc'ye ekle
echo 'export OPENAI_API_KEY="sk-proj-xxxxxxxxxxxxxxxx"' >> ~/.bashrc

Python kütüphanesi versiyonuna dikkat edin. OpenAI’nin 1.0.0 sonrası API’si öncekiyle uyumsuz. Eski projeleriniz varsa openai==0.28 ile devam edebilirsiniz ama yeni projeler için mutlaka 1.x kullanın.

İlk Streaming Çağrısı

En basit streaming örneğiyle başlayalım:

from openai import OpenAI
import sys

client = OpenAI()

def stream_response(prompt: str, model: str = "gpt-4o"):
    """Temel streaming fonksiyonu"""
    print(f"Model: {model}n{'-'*40}")
    
    with client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Sen yardımcı bir asistansın."},
            {"role": "user", "content": prompt}
        ],
        stream=True,
        max_tokens=1000,
        temperature=0.7
    ) as stream:
        for chunk in stream:
            # Delta içeriğini al
            delta = chunk.choices[0].delta
            
            if delta.content:
                print(delta.content, end="", flush=True)
            
            # Stream bitti mi kontrol et
            if chunk.choices[0].finish_reason == "stop":
                print("n" + "-"*40)
                print("Stream tamamlandı.")
    
if __name__ == "__main__":
    prompt = sys.argv[1] if len(sys.argv) > 1 else "Linux inode nedir, kısaca açıkla."
    stream_response(prompt)

Buradaki kritik nokta flush=True parametresi. Python’da print fonksiyonu varsayılan olarak buffer’a yazar. flush=True olmadan karakterler birikiр ve sonunda toplu gelir, yani streaming’in amacını ortadan kaldırır.

Token Sayacı ve Maliyet Takibi

Production ortamında streaming kullanırken token tüketimini takip etmek zorundasınız. Streaming modunda usage bilgisi genellikle son chunk’ta gelir:

from openai import OpenAI
import time

client = OpenAI()

def stream_with_metrics(messages: list, model: str = "gpt-4o-mini"):
    """Token sayacı ve süre ölçümü ile streaming"""
    
    start_time = time.time()
    total_content = []
    token_usage = None
    
    # Fiyatlar (1000 token başına dolar)
    pricing = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03}
    }
    
    print("n[STREAMING BAŞLADI]n")
    
    with client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        stream_options={"include_usage": True}  # Usage bilgisini al
    ) as stream:
        for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                total_content.append(content)
                print(content, end="", flush=True)
            
            # Son chunk'ta usage bilgisi gelir
            if hasattr(chunk, 'usage') and chunk.usage:
                token_usage = chunk.usage
    
    elapsed = time.time() - start_time
    full_response = "".join(total_content)
    
    print("nn" + "="*50)
    print(f"Süre: {elapsed:.2f} saniye")
    print(f"Karakter sayısı: {len(full_response)}")
    
    if token_usage:
        input_tokens = token_usage.prompt_tokens
        output_tokens = token_usage.completion_tokens
        
        price = pricing.get(model, {"input": 0, "output": 0})
        cost = (input_tokens * price["input"] / 1000) + 
               (output_tokens * price["output"] / 1000)
        
        print(f"Input token: {input_tokens}")
        print(f"Output token: {output_tokens}")
        print(f"Tahmini maliyet: ${cost:.6f}")
        print(f"Token/saniye: {output_tokens/elapsed:.1f}")
    
    return full_response

# Kullanım
messages = [
    {"role": "system", "content": "Sen bir sysadmin uzmansın."},
    {"role": "user", "content": "Kubernetes pod'larında OOMKilled hatasının nedenlerini ve çözümlerini anlat."}
]

response = stream_with_metrics(messages, model="gpt-4o-mini")

stream_options={"include_usage": True} parametresi yeni API versiyonunda geldi. Bu olmadan streaming modunda usage bilgisi almak zordur ve token’ları manuel saymak gerekir.

Gerçek Dünya Senaryosu: Log Analiz Aracı

Bir sysadmin’in en çok ihtiyaç duyduğu şeylerden biri log analizi. Aşağıdaki script, sistem loglarını OpenAI’ye gönderip gerçek zamanlı analiz yapıyor:

#!/usr/bin/env python3
"""
log_analyzer.py - Sistem loglarını AI ile analiz et
Kullanım: python3 log_analyzer.py /var/log/syslog 100
"""

from openai import OpenAI
import sys
import subprocess
import argparse
from datetime import datetime

client = OpenAI()

def get_log_tail(log_file: str, lines: int = 100) -> str:
    """Log dosyasından son N satırı oku"""
    try:
        result = subprocess.run(
            ["tail", "-n", str(lines), log_file],
            capture_output=True,
            text=True,
            timeout=10
        )
        if result.returncode != 0:
            raise Exception(f"tail komutu başarısız: {result.stderr}")
        return result.stdout
    except FileNotFoundError:
        raise Exception(f"Log dosyası bulunamadı: {log_file}")

def analyze_logs_streaming(log_content: str, log_file: str):
    """Logları streaming modunda analiz et"""
    
    system_prompt = """Sen deneyimli bir Linux sistem yöneticisisin. 
    Sana verilen log içeriğini analiz edeceksin ve şunları yapacaksın:
    1. Kritik hataları ve uyarıları tespit et
    2. Olası nedenleri açıkla
    3. Çözüm önerileri sun
    4. Acil müdahale gerektiren durumları öne çıkar
    
    Yanıtını Türkçe ver ve pratik, uygulanabilir öneriler sun."""
    
    user_message = f"""
    Log dosyası: {log_file}
    Analiz zamanı: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    
    LOG İÇERİĞİ:
    {log_content[:8000]}  # Token limitini aşmamak için kırp
    
    Bu logları analiz et ve önemli bulguları raporla.
    """
    
    print(f"n[LOG ANALİZİ BAŞLIYOR]")
    print(f"Dosya: {log_file}")
    print(f"Tarih: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*60 + "n")
    
    with client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message}
        ],
        stream=True,
        max_tokens=2000,
        temperature=0.3  # Analiz için düşük temperature
    ) as stream:
        for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    
    print("nn[ANALİZ TAMAMLANDI]")

def main():
    parser = argparse.ArgumentParser(description="AI destekli log analizi")
    parser.add_argument("log_file", help="Analiz edilecek log dosyası")
    parser.add_argument("lines", nargs="?", type=int, default=100,
                       help="Analiz edilecek son satır sayısı (varsayılan: 100)")
    
    args = parser.parse_args()
    
    log_content = get_log_tail(args.log_file, args.lines)
    
    if not log_content.strip():
        print("Log dosyası boş veya okunamadı.")
        sys.exit(1)
    
    analyze_logs_streaming(log_content, args.log_file)

if __name__ == "__main__":
    main()

Bu script’i cron ile veya bir monitoring alert tetikleyicisiyle çalıştırabilirsiniz. Mesela disk dolduğunda, servis çöktüğünde veya belirli bir hata pattern’i yakalandığında otomatik analiz başlatabilirsiniz.

Async Streaming ile Paralel İstekler

Birden fazla kaynaktan eş zamanlı analiz yapmanız gerektiğinde async kullanımı kaçınılmaz:

import asyncio
from openai import AsyncOpenAI
import aiofiles

async_client = AsyncOpenAI()

async def async_stream_analysis(task_id: str, content: str, task_type: str):
    """Async streaming ile paralel analiz"""
    
    prompts = {
        "security": "Bu log içeriğinde güvenlik açığı veya saldırı belirtisi var mı?",
        "performance": "Bu metriklerde performans sorunu veya darboğaz görüyor musun?",
        "error": "Bu hataları kategorize et ve öncelik sırasına koy."
    }
    
    prompt = prompts.get(task_type, "Bu içeriği analiz et.")
    
    print(f"n[Task {task_id}] {task_type.upper()} analizi başladı...n")
    
    result_chunks = []
    
    async with async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "user", "content": f"{prompt}nnİçerik:n{content[:3000]}"}
        ],
        stream=True
    ) as stream:
        async for chunk in stream:
            if chunk.choices[0].delta.content:
                content_chunk = chunk.choices[0].delta.content
                result_chunks.append(content_chunk)
                # Her task için prefix ile yazdır
                print(f"[{task_id}] {content_chunk}", end="", flush=True)
    
    result = "".join(result_chunks)
    print(f"n[Task {task_id}] TAMAMLANDIn")
    return {"task_id": task_id, "type": task_type, "result": result}

async def parallel_analysis(analyses: list):
    """Birden fazla analizi paralel çalıştır"""
    tasks = [
        async_stream_analysis(
            task["id"],
            task["content"],
            task["type"]
        )
        for task in analyses
    ]
    
    results = await asyncio.gather(*tasks)
    return results

# Kullanım örneği
async def main():
    analyses = [
        {
            "id": "LOG-001",
            "content": "Apr 15 10:23:11 server sshd[1234]: Failed password for root from 192.168.1.100",
            "type": "security"
        },
        {
            "id": "METRIC-001", 
            "content": "CPU: 95%, Memory: 87%, Disk I/O wait: 45%",
            "type": "performance"
        }
    ]
    
    results = await parallel_analysis(analyses)
    
    print("n" + "="*60)
    print("TÜM ANALİZLER TAMAMLANDI")
    print(f"Toplam {len(results)} analiz yapıldı.")

if __name__ == "__main__":
    asyncio.run(main())

Hata Yönetimi ve Retry Mekanizması

Production’da her şey yolunda gitmez. Rate limit, network timeout veya API hatalarına karşı robust bir yapı kurmak gerekir:

from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError
import time
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

client = OpenAI(
    timeout=30.0,       # 30 saniye timeout
    max_retries=0       # Manuel retry yöneteceğiz
)

def robust_stream(messages: list, max_retries: int = 3, model: str = "gpt-4o-mini"):
    """Hata yönetimi ve retry ile güvenilir streaming"""
    
    retry_delays = [1, 5, 15]  # Exponential backoff (saniye)
    
    for attempt in range(max_retries):
        try:
            logger.info(f"Streaming başlatılıyor (deneme {attempt + 1}/{max_retries})")
            
            collected_response = []
            
            with client.chat.completions.create(
                model=model,
                messages=messages,
                stream=True,
                max_tokens=1500
            ) as stream:
                for chunk in stream:
                    if chunk.choices[0].delta.content:
                        content = chunk.choices[0].delta.content
                        collected_response.append(content)
                        print(content, end="", flush=True)
                    
                    finish_reason = chunk.choices[0].finish_reason
                    if finish_reason and finish_reason != "stop":
                        logger.warning(f"Beklenmedik bitiş: {finish_reason}")
            
            print()  # Satır sonu
            logger.info("Streaming başarıyla tamamlandı.")
            return "".join(collected_response)
        
        except RateLimitError as e:
            logger.warning(f"Rate limit aşıldı: {e}")
            if attempt < max_retries - 1:
                delay = retry_delays[attempt]
                logger.info(f"{delay} saniye bekleniyor...")
                time.sleep(delay)
            else:
                raise
        
        except APITimeoutError as e:
            logger.error(f"Timeout hatası: {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delays[attempt])
            else:
                raise
        
        except APIConnectionError as e:
            logger.error(f"Bağlantı hatası: {e}")
            if attempt < max_retries - 1:
                logger.info("Bağlantı yeniden deneniyor...")
                time.sleep(retry_delays[attempt])
            else:
                raise
        
        except KeyboardInterrupt:
            print("nn[Kullanıcı tarafından durduruldu]")
            return "".join(collected_response) if collected_response else ""
        
        except Exception as e:
            logger.error(f"Beklenmedik hata: {type(e).__name__}: {e}")
            raise
    
    return None

# Test
messages = [
    {"role": "user", "content": "Nginx'te 502 Bad Gateway hatasının yaygın nedenlerini listele."}
]

response = robust_stream(messages)

Canlı Monitoring Dashboard Entegrasyonu

Son olarak, streaming’i bir monitoring senaryosuna entegre edelim. Bu örnek, sistem metriklerini toplayıp AI ile yorumluyor ve sonucu terminal’de canlı gösteriyor:

#!/usr/bin/env python3
"""
system_health_ai.py - Sistem sağlığını AI ile yorumla
"""

import subprocess
import psutil
from openai import OpenAI
from datetime import datetime

client = OpenAI()

def collect_system_metrics() -> dict:
    """Anlık sistem metriklerini topla"""
    metrics = {}
    
    # CPU
    metrics["cpu_percent"] = psutil.cpu_percent(interval=1)
    metrics["cpu_count"] = psutil.cpu_count()
    metrics["load_avg"] = psutil.getloadavg()
    
    # Memory
    mem = psutil.virtual_memory()
    metrics["mem_total_gb"] = round(mem.total / (1024**3), 2)
    metrics["mem_used_gb"] = round(mem.used / (1024**3), 2)
    metrics["mem_percent"] = mem.percent
    
    # Disk
    disk = psutil.disk_usage('/')
    metrics["disk_total_gb"] = round(disk.total / (1024**3), 2)
    metrics["disk_used_gb"] = round(disk.used / (1024**3), 2)
    metrics["disk_percent"] = round((disk.used / disk.total) * 100, 1)
    
    # Network
    net = psutil.net_io_counters()
    metrics["net_bytes_sent_mb"] = round(net.bytes_sent / (1024**2), 2)
    metrics["net_bytes_recv_mb"] = round(net.bytes_recv / (1024**2), 2)
    
    # Top 5 CPU process
    processes = []
    for proc in sorted(psutil.process_iter(['name', 'cpu_percent', 'memory_percent']),
                      key=lambda p: p.info['cpu_percent'] or 0, reverse=True)[:5]:
        processes.append({
            "name": proc.info['name'],
            "cpu": proc.info['cpu_percent'],
            "mem": proc.info['memory_percent']
        })
    metrics["top_processes"] = processes
    
    return metrics

def format_metrics_for_prompt(metrics: dict) -> str:
    """Metrikleri prompt için formatla"""
    proc_list = "n".join([
        f"  - {p['name']}: CPU %{p['cpu']:.1f}, MEM %{p['mem']:.1f}"
        for p in metrics['top_processes']
    ])
    
    return f"""
    SİSTEM METRİKLERİ ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})
    
    CPU Kullanımı: %{metrics['cpu_percent']}
    CPU Sayısı: {metrics['cpu_count']} core
    Load Average (1/5/15 dk): {metrics['load_avg'][0]:.2f} / {metrics['load_avg'][1]:.2f} / {metrics['load_avg'][2]:.2f}
    
    RAM: {metrics['mem_used_gb']}GB / {metrics['mem_total_gb']}GB (%{metrics['mem_percent']})
    
    Disk (/): {metrics['disk_used_gb']}GB / {metrics['disk_total_gb']}GB (%{metrics['disk_percent']})
    
    Network: Gönderilen {metrics['net_bytes_sent_mb']}MB / Alınan {metrics['net_bytes_recv_mb']}MB
    
    En Yoğun Prosesler:
    {proc_list}
    """

def ai_health_check():
    """Sistem sağlık kontrolünü AI ile yap"""
    print("n" + "="*60)
    print("YAPAY ZEKA DESTEKLİ SİSTEM SAĞLIK KONTROLÜ")
    print("="*60)
    
    print("nMetrikler toplanıyor...")
    metrics = collect_system_metrics()
    metrics_text = format_metrics_for_prompt(metrics)
    
    print(metrics_text)
    print("n" + "-"*60)
    print("AI ANALİZİ:n")
    
    with client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "Sen Linux sistem yöneticisisin. Verilen metrikleri analiz ederek durum değerlendirmesi yap. Eğer kritik bir durum varsa acil uyarı ver. Kısa ve öz ol."
            },
            {
                "role": "user",
                "content": f"Bu sistem metriklerini değerlendir ve gerekirse öneride bulun:n{metrics_text}"
            }
        ],
        stream=True,
        max_tokens=600,
        temperature=0.2
    ) as stream:
        for chunk in stream:
            if chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="", flush=True)
    
    print("nn" + "="*60)

if __name__ == "__main__":
    ai_health_check()

Performans İpuçları

Streaming API’yi verimli kullanmak için dikkat etmeniz gereken birkaç nokta:

  • Model seçimi: gpt-4o-mini streaming için ideal başlangıç noktası. Hem hızlı hem ucuz. Karmaşık analizler için gpt-4o tercih edin.
  • max_tokens: Her zaman belirtin. Sınırsız bırakmak hem maliyeti hem bekleme süresini artırır.
  • temperature: Analiz ve teknik içerik için 0.1-0.3, yaratıcı içerik için 0.7-0.9 kullanın.
  • Buffer yönetimi: Uzun stream’lerde topladığınız chunk’ları belirli aralıklarla flush edin veya dosyaya yazın.
  • Connection pooling: Çok sayıda istek gönderiyorsanız httpx.Client ile bağlantı havuzu kullanın.
  • Stream timeout: client = OpenAI(timeout=httpx.Timeout(connect=5.0, read=60.0)) şeklinde granüler timeout tanımlayın.

Şunu da unutmayın: streaming bağlantısı açıkken sunucu kaynağı tüketir. Çok sayıda eş zamanlı stream açmak rate limit’e çarpmana neden olabilir. OpenAI’nin tier’ınıza göre belirlediği RPM (requests per minute) ve TPM (tokens per minute) limitlerine dikkat edin.

Sonuç

OpenAI Streaming API, sysadmin araç kutusuna güçlü bir eklenti. Uzun süren model yanıtlarını gerçek zamanlı göstermek, kullanıcı deneyimini doğrudan iyileştiriyor. Log analiz araçlarından monitoring dashboard’larına, incident response script’lerinden otomatik raporlama sistemlerine kadar geniş bir yelpazede kullanabilirsiniz.

Temel prensipleri özetlersek: stream=True parametresiyle başlarsınız, chunk’ları iteratif işlersiniz, flush=True ile anında gösterirsiniz, hata yönetimini ihmal etmezsiniz. Async yapı ile paralel analizler yapabilir, retry mekanizmaları ile production dayanıklılığı sağlayabilirsiniz.

Bir sonraki adım olarak bu yapıları Slack bot’larına, Grafana webhook’larına veya internal ticketing sistemlerinize entegre etmeyi düşünebilirsiniz. Streaming sayesinde bot’unuz “Analiz ediyorum…” mesajı yerine doğrudan analiz sonuçlarını kelime kelime iletebilir. Bu, özellikle incident sırasında ekibinizin çok daha hızlı karar vermesini sağlar.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir