OpenAI Assistants API ile Durum Bilen Yapay Zeka Geliştirme
Bir müşteri destek botu yazıyorsunuz, basit bir chat completion çağrısıyla başladınız, her şey güzel gidiyor. Sonra fark ediyorsunuz: kullanıcı “az önce söylediğim şeyi hatırlıyor musun?” diye soruyor, bot ise “Üzgünüm, hangi şeyi kastediyorsunuz?” diye yanıtlıyor. Konuşma geçmişini kendiniz yönetmek zorunda kalıyorsunuz, her istekte tüm mesaj dizisini tekrar gönderiyorsunuz, token maliyetleri tırmanıyor. OpenAI’nin Assistants API’si tam bu noktada devreye giriyor ve “thread” kavramıyla durumu sizin yerinize yönetiyor.
Assistants API Nedir ve Neden Önemlidir
Klasik Chat Completions API’si stateless çalışır. Her çağrı bağımsızdır, geçmişi siz taşırsınız. Bu büyük uygulamalar için ciddi bir yük oluşturur. Assistants API ise farklı bir paradigma sunar: thread adını verdiği kalıcı konuşma oturumları, assistant nesnesi üzerinden tanımlanan sistem konfigürasyonları ve run mekanizmasıyla asenkron işlem yönetimi.
Pratikte bu şu anlama geliyor: bir kez assistant oluşturuyorsunuz, her kullanıcı için bir thread açıyorsunuz, mesajları thread’e ekliyorsunuz ve run başlatıyorsunuz. Geçmişi kendiniz takip etmenize gerek yok, token limitini aşmamak için mesajları budamanıza gerek yok (API bunu otomatik yönetiyor), ve thread’i veritabanınızda saklarsanız kullanıcı yarım kalan konuşmaya günler sonra bile devam edebiliyor.
Sistem yöneticisi perspektifinden bakarsak: bu bir durum makinesi. Konuşma durumu OpenAI tarafında barındırılıyor, siz sadece thread ID’sini ve assistant ID’sini saklıyorsunuz.
Temel Kurulum ve İlk Assistant Oluşturma
Başlamadan önce ortamı kuralım:
pip install openai python-dotenv
export OPENAI_API_KEY="sk-..."
İlk assistant’ımızı oluşturalım. Bu işlemi bir kez yapmanız yeterli, assistant ID’sini saklayıp tekrar kullanırsınız:
from openai import OpenAI
import json
client = OpenAI()
assistant = client.beta.assistants.create(
name="Sistem Yönetici Asistanı",
instructions="""Sen deneyimli bir Linux sistem yöneticisisin.
Kullanıcıların altyapı sorunlarını çözmelerine yardım ediyorsun.
Komutları her zaman güvenli versiyonlarıyla öneriyorsun.
Tehlikeli operasyonlar için önce onay istiyorsun.
Türkçe yanıt veriyorsun.""",
model="gpt-4o",
tools=[{"type": "code_interpreter"}]
)
print(f"Assistant ID: {assistant.id}")
# Bu ID'yi .env dosyanıza kaydedin: ASSISTANT_ID=asst_xxxxx
Çıktı olarak bir asst_ ile başlayan ID alırsınız. Bunu .env dosyanıza yazın, çünkü assistant’ı her seferinde yeniden oluşturmanıza gerek yok.
Thread Yönetimi: Konuşma Durumunu Koruma
Thread, konuşmanın yaşadığı yerdir. Kullanıcı başına bir thread oluşturup ID’sini veritabanınızda saklıyorsunuz:
import os
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
client = OpenAI()
def get_or_create_thread(user_id: str, db_connection) -> str:
"""
Kullanıcının mevcut thread'ini getirir ya da yeni oluşturur.
db_connection: SQLite, PostgreSQL vb. bağlantı nesnesi
"""
# Önce veritabanında ara
cursor = db_connection.cursor()
cursor.execute(
"SELECT thread_id FROM user_threads WHERE user_id = ?",
(user_id,)
)
row = cursor.fetchone()
if row:
return row[0]
# Yoksa yeni thread oluştur
thread = client.beta.threads.create()
cursor.execute(
"INSERT INTO user_threads (user_id, thread_id, created_at) VALUES (?, ?, datetime('now'))",
(user_id, thread.id)
)
db_connection.commit()
return thread.id
# Kullanım örneği
# thread_id = get_or_create_thread("user_123", conn)
Burada kritik nokta şu: thread ID’si kullanıcı oturumuna değil, kullanıcının kimliğine bağlı. Yani kullanıcı tarayıcıyı kapatıp açsa da, uygulamayı restart etseniz de, konuşma kaldığı yerden devam ediyor.
Mesaj Gönderme ve Run Yönetimi
Şimdi asıl işin yürütme mekanizmasına bakalım. Assistants API asenkron çalışır, yani mesajı gönderip hemen yanıt alamazsınız, run’ın tamamlanmasını beklemeniz gerekir:
import time
def send_message_and_wait(thread_id: str, assistant_id: str, user_message: str) -> str:
"""
Thread'e mesaj ekler, run başlatır ve sonucu döndürür.
"""
# Mesajı thread'e ekle
client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=user_message
)
# Run başlat
run = client.beta.threads.runs.create(
thread_id=thread_id,
assistant_id=assistant_id
)
# Tamamlanmasını bekle (polling)
max_attempts = 30
attempt = 0
while attempt < max_attempts:
run_status = client.beta.threads.runs.retrieve(
thread_id=thread_id,
run_id=run.id
)
if run_status.status == "completed":
break
elif run_status.status in ["failed", "cancelled", "expired"]:
raise Exception(f"Run başarısız: {run_status.status} - {run_status.last_error}")
elif run_status.status == "requires_action":
# Function calling durumu - ayrıca ele alacağız
handle_required_action(run_status, thread_id, run.id)
time.sleep(1)
attempt += 1
# Son mesajı al
messages = client.beta.threads.messages.list(
thread_id=thread_id,
order="desc",
limit=1
)
return messages.data[0].content[0].text.value
Polling yaklaşımı production ortamı için biraz kaba kaçabilir. OpenAI’nin create_and_poll yardımcı metodunu da kullanabilirsiniz:
def send_message_simple(thread_id: str, assistant_id: str, user_message: str) -> str:
"""
Daha temiz bir yaklaşım: create_and_poll kullanımı
"""
client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=user_message
)
# Bu metod run'ı başlatır ve otomatik olarak tamamlanmasını bekler
run = client.beta.threads.runs.create_and_poll(
thread_id=thread_id,
assistant_id=assistant_id,
timeout=60 # maksimum 60 saniye bekle
)
if run.status != "completed":
raise Exception(f"Run tamamlanamadı: {run.status}")
messages = client.beta.threads.messages.list(
thread_id=thread_id,
order="desc",
limit=1
)
return messages.data[0].content[0].text.value
Function Calling ile Gerçek Sistem Entegrasyonu
Assistants API’nin gerçek gücü burada ortaya çıkıyor. Assistant’a araçlar tanımlıyorsunuz, model bu araçları ne zaman çağıracağına kendisi karar veriyor. Pratik bir senaryo: sunucu durumu soran bir kullanıcıya gerçek sistem metriklerini döndüren bir bot.
# Assistant'ı araçlarla birlikte oluşturma
assistant_with_tools = client.beta.assistants.create(
name="Altyapı Monitör Asistanı",
instructions="""Sen bir altyapı izleme asistanısın.
Kullanıcılar sunucu durumu, servis metrikleri ve log analizi hakkında soru sorar.
Gerçek verileri araçlarını kullanarak getir, tahmin yürütme.
Kritik sorunları tespit edersen hemen uyar.""",
model="gpt-4o",
tools=[
{
"type": "function",
"function": {
"name": "get_server_metrics",
"description": "Belirtilen sunucunun CPU, RAM ve disk kullanımını döndürür",
"parameters": {
"type": "object",
"properties": {
"server_name": {
"type": "string",
"description": "Sunucu adı veya IP adresi"
},
"metric_type": {
"type": "string",
"enum": ["cpu", "memory", "disk", "all"],
"description": "İstenen metrik türü"
}
},
"required": ["server_name", "metric_type"]
}
}
},
{
"type": "function",
"function": {
"name": "get_service_status",
"description": "Linux servisinin çalışma durumunu kontrol eder",
"parameters": {
"type": "object",
"properties": {
"service_name": {
"type": "string",
"description": "Systemd servis adı (nginx, postgresql vb.)"
},
"server_name": {
"type": "string",
"description": "Kontrol edilecek sunucu"
}
},
"required": ["service_name", "server_name"]
}
}
}
]
)
Şimdi requires_action durumunu yöneten fonksiyonu yazalım:
import subprocess
import json
def get_server_metrics(server_name: str, metric_type: str) -> dict:
"""
Gerçek metrik çekme fonksiyonu.
Production'da Prometheus/Grafana API çağrısı yaparsınız.
"""
# Bu örnekte SSH ile uzak sunucudan veri çekiyoruz
commands = {
"cpu": f"ssh {server_name} "top -bn1 | grep 'Cpu(s)' | awk '{{print $2}}'"",
"memory": f"ssh {server_name} "free -m | awk 'NR==2{{printf \"%s %s %s\", $2,$3,$4}}'"",
"disk": f"ssh {server_name} "df -h / | awk 'NR==2{{print $5}}'"",
}
results = {}
if metric_type == "all":
for key, cmd in commands.items():
try:
output = subprocess.check_output(cmd, shell=True, timeout=10).decode().strip()
results[key] = output
except Exception as e:
results[key] = f"Hata: {str(e)}"
else:
try:
output = subprocess.check_output(commands[metric_type], shell=True, timeout=10).decode().strip()
results[metric_type] = output
except Exception as e:
results[metric_type] = f"Hata: {str(e)}"
return {"server": server_name, "metrics": results}
def handle_required_action(run, thread_id: str, run_id: str):
"""
Model bir tool çağırmak istediğinde bu fonksiyon devreye girer.
"""
tool_calls = run.required_action.submit_tool_outputs.tool_calls
tool_outputs = []
for tool_call in tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Araç çağrılıyor: {function_name} - Parametreler: {arguments}")
if function_name == "get_server_metrics":
result = get_server_metrics(
arguments["server_name"],
arguments["metric_type"]
)
elif function_name == "get_service_status":
# Benzer implementasyon
result = {"status": "active", "uptime": "5 gün 3 saat"}
else:
result = {"error": f"Bilinmeyen fonksiyon: {function_name}"}
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": json.dumps(result, ensure_ascii=False)
})
# Sonuçları OpenAI'ye gönder
client.beta.threads.runs.submit_tool_outputs(
thread_id=thread_id,
run_id=run_id,
tool_outputs=tool_outputs
)
Streaming ile Anlık Yanıt Akışı
Uzun yanıtlar için kullanıcıyı bekletmek kötü UX. Streaming kullanarak yanıtı gerçek zamanlı aktarabilirsiniz:
from openai import AssistantEventHandler
from typing import override
class StreamHandler(AssistantEventHandler):
@override
def on_text_created(self, text) -> None:
print("nAsistan: ", end="", flush=True)
@override
def on_text_delta(self, delta, snapshot):
print(delta.value, end="", flush=True)
@override
def on_tool_call_created(self, tool_call):
print(f"n[Araç çağrılıyor: {tool_call.type}]", flush=True)
@override
def on_event(self, event):
if event.event == "thread.run.requires_action":
run_id = event.data.id
self.handle_requires_action(event.data, run_id)
def handle_requires_action(self, data, run_id):
tool_outputs = []
for tool_call in data.required_action.submit_tool_outputs.tool_calls:
arguments = json.loads(tool_call.function.arguments)
result = get_server_metrics(
arguments.get("server_name", "localhost"),
arguments.get("metric_type", "all")
)
tool_outputs.append({
"tool_call_id": tool_call.id,
"output": json.dumps(result, ensure_ascii=False)
})
self.submit_tool_outputs(tool_outputs)
def stream_response(thread_id: str, assistant_id: str, user_message: str):
client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=user_message
)
with client.beta.threads.runs.stream(
thread_id=thread_id,
assistant_id=assistant_id,
event_handler=StreamHandler()
) as stream:
stream.until_done()
print() # Yeni satır
# Test
# stream_response(thread_id, assistant_id, "prod-web-01 sunucusunun durumu nasıl?")
Gerçek Dünya Senaryosu: DevOps Yardım Botu
Tüm parçaları bir araya getiren tam bir örnek. Şirketteki sysadmin ekibinin kullandığı, Slack ile entegre, hafızalı bir yardım botu:
import sqlite3
import os
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
class DevOpsBot:
def __init__(self):
self.client = OpenAI()
self.assistant_id = os.getenv("ASSISTANT_ID")
self.db = self._init_db()
def _init_db(self):
conn = sqlite3.connect("devops_bot.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS user_threads (
user_id TEXT PRIMARY KEY,
thread_id TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
return conn
def get_thread(self, user_id: str) -> str:
cursor = self.db.cursor()
cursor.execute("SELECT thread_id FROM user_threads WHERE user_id = ?", (user_id,))
row = cursor.fetchone()
if row:
# Son aktiviteyi güncelle
cursor.execute(
"UPDATE user_threads SET last_activity = datetime('now') WHERE user_id = ?",
(user_id,)
)
self.db.commit()
return row[0]
thread = self.client.beta.threads.create(
metadata={"user_id": user_id, "created_by": "devops_bot"}
)
cursor.execute(
"INSERT INTO user_threads (user_id, thread_id) VALUES (?, ?)",
(user_id, thread.id)
)
self.db.commit()
return thread.id
def reset_conversation(self, user_id: str):
"""Kullanıcının konuşmasını sıfırla - yeni thread oluştur"""
cursor = self.db.cursor()
thread = self.client.beta.threads.create()
cursor.execute(
"UPDATE user_threads SET thread_id = ?, last_activity = datetime('now') WHERE user_id = ?",
(thread.id, user_id)
)
self.db.commit()
return thread.id
def chat(self, user_id: str, message: str) -> str:
if message.strip().lower() in ["/reset", "/yenile"]:
self.reset_conversation(user_id)
return "Konuşma geçmişi temizlendi. Yeni bir başlangıç!"
thread_id = self.get_thread(user_id)
self.client.beta.threads.messages.create(
thread_id=thread_id,
role="user",
content=message
)
run = self.client.beta.threads.runs.create_and_poll(
thread_id=thread_id,
assistant_id=self.assistant_id,
timeout=120
)
if run.status != "completed":
return f"Yanıt alınamadı (durum: {run.status}). Lütfen tekrar deneyin."
messages = self.client.beta.threads.messages.list(
thread_id=thread_id,
order="desc",
limit=1
)
return messages.data[0].content[0].text.value
# Kullanım
bot = DevOpsBot()
# Simüle edilmiş çoklu tur konuşma
print(bot.chat("user_ahmet", "nginx servisimiz çöktü, ne yapmalıyım?"))
print(bot.chat("user_ahmet", "az önce bahsettiğin log komutunu tekrar yazar mısın?"))
print(bot.chat("user_ahmet", "/reset"))
print(bot.chat("user_ahmet", "merhaba, yeni bir konu açmak istiyorum"))
Maliyet ve Token Yönetimi
Assistants API kullanırken dikkat etmeniz gereken birkaç nokta var. Thread’ler OpenAI sunucularında saklanır ve her run’da tüm geçmiş işlenir. Bu uzun konuşmalarda ciddi maliyet yaratabilir.
Thread metadata’sında thread boyutunu takip etmek için:
# Thread'deki tüm mesajları listeleyen ve token tahminini hesaplayan basit script
python3 << 'EOF'
from openai import OpenAI
import os
client = OpenAI()
thread_id = os.getenv("TEST_THREAD_ID")
messages = client.beta.threads.messages.list(thread_id=thread_id)
total_chars = 0
for msg in messages.data:
content = msg.content[0].text.value if msg.content else ""
total_chars += len(content)
print(f"[{msg.role}] {content[:100]}...")
# Kaba token tahmini (4 karakter ~ 1 token)
estimated_tokens = total_chars / 4
print(f"nTahmini token sayısı: {estimated_tokens:.0f}")
print(f"Tahmini maliyet (gpt-4o): ${estimated_tokens * 0.000005:.4f}")
EOF
Run parametrelerinde max_prompt_tokens ve max_completion_tokens kullanarak maliyeti kontrol altına alabilirsiniz. Ayrıca truncation_strategy ile eski mesajların nasıl budanacağını belirleyebilirsiniz.
Production’da Dikkat Edilmesi Gerekenler
Birkaç önemli nokta var, deneyimlerimden konuşuyorum:
- Rate limiting: Assistants API’nin run bazında rate limiti var. Yoğun kullanımda exponential backoff implementasyonu şart.
- Thread yaşam döngüsü: OpenAI thread’leri 60 gün sonra otomatik siler. Uzun süreli müşteri konuşmaları için bunu göz önünde bulundurun.
- Hata yönetimi:
requires_actiondurumunda function çağrısı başarısız olursa 10 dakika içinde tool output submit etmeniz gerekiyor, aksi halde run expire oluyor. - Concurrent run yasağı: Bir thread’de aynı anda sadece bir run çalışabilir. Birden fazla kullanıcı mesajı aynı anda gelirse queue mekanizması kurmanız gerekiyor.
- Webhook vs polling: Şu an Assistants API webhook desteklemiyor, polling yapmanız gerekiyor. Bu yüksek trafikte sorun yaratabilir.
Metadata kullanımı da ciddi kolaylık sağlıyor. Thread ve message’lara metadata ekleyerek kendi uygulama verilerinizi OpenAI tarafında saklayabilirsiniz, ama buna çok güvenmeyin, asıl verilerinizi her zaman kendi veritabanınızda tutun.
Sonuç
Assistants API, durum yönetimini framework’e devrederek ciddi bir geliştirme kolaylığı sunuyor. Özellikle çok turlu konuşma gerektiren uygulamalarda, her şeyi kendiniz yönetmeye kıyasla kod karmaşıklığını önemli ölçüde azaltıyor.
Ancak her soyutlama katmanı gibi bu da bazı kontrol noktalarını elinizden alıyor. Token yönetimi, rate limiting ve thread yaşam döngüsü gibi konularda dikkatli olmazsanız hem maliyet hem de güvenilirlik sorunlarıyla karşılaşabilirsiniz.
Üretim ortamı için önerim şu: küçük başlayın, tek bir use case için Assistants API kullanın, metriklerinizi toplayın ve gördüklerinize göre ilerleyin. Function calling özelliğini ekibin gerçek ihtiyaçlarına göre genişletin. En önemlisi, OpenAI tarafında sakladığınız her şeyin bir kopyasını kendi veritabanınızda bulundurun, bağımlılığı yönetilebilir tutun.
