Asenkron API İstekleri: Paralel Çalıştırma Yöntemleri

Modern uygulamaların büyük çoğunluğu artık birden fazla servisle konuşmak zorunda. Bir kullanıcı isteği geldiğinde; kullanıcı verisini bir servisten, ürün bilgisini başka bir servisten, stok durumunu ayrı bir API’den çekmeniz gerekiyor. Eğer bunları sırayla yaparsanız, her istek bir öncekinin bitmesini bekler ve toplam süre zincirleme uzar. İşte asenkron ve paralel API istekleri tam bu noktada devreye giriyor. Bu yazıda, hem teorik temelleri hem de gerçek prodüksiyon senaryolarında işe yarayan pratik yaklaşımları ele alacağız.

Senkron vs Asenkron: Neden Önemli?

Önce problemi somutlaştıralım. Diyelim ki bir e-ticaret uygulamanızda sipariş detay sayfası var. Bu sayfa için:

  • Kullanıcı bilgisi (150ms)
  • Ürün detayları (200ms)
  • Stok durumu (180ms)
  • Kargo bilgisi (220ms)

Senkron çalışırsa toplam: 150 + 200 + 180 + 220 = 750ms

Paralel çalışırsa toplam: max(150, 200, 180, 220) = 220ms

Fark net. Ama iş sadece hızla bitmiyor. Asenkron mimari aynı zamanda bir servis yavaşladığında diğerlerinin bundan etkilenmemesini sağlar, zaman aşımı yönetimini kolaylaştırır ve sistem kaynaklarını daha verimli kullanmanızı mümkün kılar.

Python ile Asenkron API İstekleri

Python’da asenkron programlamanın omurgası asyncio kütüphanesi. HTTP istekleri için ise aiohttp kütüphanesini kullanıyoruz. Klasik requests kütüphanesi senkron çalışır ve asenkron kod içinde kullanıldığında event loop’u bloklar, buna dikkat edin.

Temel asyncio ve aiohttp Kullanımı

pip install aiohttp asyncio
import asyncio
import aiohttp
import time

async def fetch_user(session, user_id):
    url = f"https://api.example.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_product(session, product_id):
    url = f"https://api.example.com/products/{product_id}"
    async with session.get(url) as response:
        return await response.json()

async def fetch_stock(session, product_id):
    url = f"https://api.example.com/stock/{product_id}"
    async with session.get(url) as response:
        return await response.json()

async def get_order_details(user_id, product_id):
    start = time.time()
    
    async with aiohttp.ClientSession() as session:
        # asyncio.gather ile hepsini paralel calistir
        user, product, stock = await asyncio.gather(
            fetch_user(session, user_id),
            fetch_product(session, product_id),
            fetch_stock(session, product_id)
        )
    
    elapsed = time.time() - start
    print(f"Toplam sure: {elapsed:.2f}s")
    
    return {
        "user": user,
        "product": product,
        "stock": stock
    }

# Calistir
result = asyncio.run(get_order_details(123, 456))

asyncio.gather burada kilit rol oynuyor. Verdiğiniz coroutine’leri aynı anda başlatır ve hepsi tamamlandığında sonuçları liste olarak döner. Sıralama garantisi var, yani user, product, stock değişken ataması her zaman doğru sırayla gerçekleşir.

Hata Yönetimi ile Birlikte Paralel İstekler

Prodüksiyonda her şey yolunda gitmez. Bir servis çökebilir, timeout olabilir, rate limit yiyebilirsiniz. asyncio.gather‘ın return_exceptions=True parametresi burada hayat kurtarır:

import asyncio
import aiohttp
from typing import Optional

async def safe_fetch(session, url, service_name):
    try:
        timeout = aiohttp.ClientTimeout(total=5)  # 5 saniye timeout
        async with session.get(url, timeout=timeout) as response:
            if response.status == 200:
                return await response.json()
            else:
                print(f"{service_name} hata dondu: {response.status}")
                return None
    except asyncio.TimeoutError:
        print(f"{service_name} zaman asimina ugradi")
        return None
    except aiohttp.ClientError as e:
        print(f"{service_name} baglanti hatasi: {e}")
        return None

async def fetch_dashboard_data(user_id):
    endpoints = {
        "profile": f"https://api.example.com/users/{user_id}",
        "orders": f"https://api.example.com/orders?user={user_id}",
        "notifications": f"https://api.example.com/notifications/{user_id}",
        "recommendations": f"https://api.example.com/recommend/{user_id}"
    }
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            safe_fetch(session, url, name)
            for name, url in endpoints.items()
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        dashboard = {}
        for key, result in zip(endpoints.keys(), results):
            if isinstance(result, Exception):
                print(f"{key} servisi beklenmedik hata: {result}")
                dashboard[key] = None
            else:
                dashboard[key] = result
        
        return dashboard

Bu yaklaşımla bir servis patlasa bile diğerleri çalışmaya devam eder ve kullanıcıya kısmi veri gösterebilirsiniz. Graceful degradation denen şey budur ve prodüksiyon sistemlerinde olmazsa olmazdır.

Toplu İsteklerde Semaphore ile Hız Sınırlama

Diyelim ki 500 ürünün fiyatını bir harici API’den çekmeniz gerekiyor. Hepsini aynı anda göndermek hem karşı tarafın rate limit’ine takılmanıza neden olur hem de kendi sisteminizin kaynaklarını boğar. asyncio.Semaphore ile eş zamanlı istek sayısını kontrol altında tutabilirsiniz:

import asyncio
import aiohttp

async def fetch_price(session, semaphore, product_id):
    async with semaphore:  # maksimum N istek ayni anda
        url = f"https://pricing-api.example.com/price/{product_id}"
        try:
            async with session.get(url) as response:
                data = await response.json()
                return {"product_id": product_id, "price": data.get("price")}
        except Exception as e:
            return {"product_id": product_id, "price": None, "error": str(e)}

async def bulk_price_fetch(product_ids, concurrent_limit=20):
    semaphore = asyncio.Semaphore(concurrent_limit)
    
    connector = aiohttp.TCPConnector(limit=50)  # toplam baglanti havuzu
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_price(session, semaphore, pid)
            for pid in product_ids
        ]
        results = await asyncio.gather(*tasks)
    
    return results

# 500 urun icin kullanim
product_ids = list(range(1, 501))
prices = asyncio.run(bulk_price_fetch(product_ids, concurrent_limit=20))
print(f"Toplam {len(prices)} urun fiyati alindi")

concurrent_limit=20 değerini API dokümantasyonuna ve karşı tarafın rate limit politikasına göre ayarlayın. Genellikle API sağlayıcıları saniyede kaç istek kabul ettiklerini açıklar.

Node.js ile Promise.all ve Paralel İstekler

JavaScript dünyasında asenkron zaten dilin DNA’sında var. Promise.all Python’daki asyncio.gather‘ın kardeşi gibi düşünebilirsiniz:

const axios = require('axios');

async function fetchOrderDetails(orderId) {
    const baseUrl = 'https://api.example.com';
    
    try {
        const [order, payment, shipment] = await Promise.all([
            axios.get(`${baseUrl}/orders/${orderId}`),
            axios.get(`${baseUrl}/payments/${orderId}`),
            axios.get(`${baseUrl}/shipments/${orderId}`)
        ]);
        
        return {
            order: order.data,
            payment: payment.data,
            shipment: shipment.data
        };
    } catch (error) {
        // Promise.all icindeki herhangi bir promise reddedilirse
        // tum istek grubu hata verir
        console.error('Siparis detayi alinamadi:', error.message);
        throw error;
    }
}

// Promise.allSettled - hata olsa da devam et
async function fetchDashboardData(userId) {
    const requests = [
        axios.get(`https://api.example.com/profile/${userId}`),
        axios.get(`https://api.example.com/orders/${userId}`),
        axios.get(`https://api.example.com/wishlist/${userId}`)
    ];
    
    const results = await Promise.allSettled(requests);
    
    const data = {};
    const keys = ['profile', 'orders', 'wishlist'];
    
    results.forEach((result, index) => {
        if (result.status === 'fulfilled') {
            data[keys[index]] = result.value.data;
        } else {
            console.warn(`${keys[index]} yuklenemedi:`, result.reason.message);
            data[keys[index]] = null;
        }
    });
    
    return data;
}

Promise.all ile Promise.allSettled arasındaki farka dikkat edin. Promise.all herhangi bir hata aldığında tamamen durur. Promise.allSettled ise hepsi tamamlanana kadar bekler, başarılı veya başarısız fark etmez. Dashboard gibi kısmi veri gösterebildiğiniz yerlerde allSettled çok daha iyi bir seçim.

curl ile Paralel İstekler: Shell Script Yaklaşımı

Bazen bir bash script yazıyorsunuz ve birden fazla endpoint’i hızlıca test etmeniz ya da toplu veri çekmeniz gerekiyor. curl‘ün --parallel özelliği tam burada işe yarıyor:

#!/bin/bash

# curl 7.66+ ile gelen paralel istek ozelligi
curl --parallel --parallel-immediate 
     --parallel-max 5 
     -o user.json "https://api.example.com/users/123" 
     -o product.json "https://api.example.com/products/456" 
     -o stock.json "https://api.example.com/stock/456" 
     -o shipping.json "https://api.example.com/shipping/789"

echo "Tum dosyalar indirildi"
cat user.json | python3 -m json.tool

Daha eski curl versiyonları veya daha fazla kontrol istiyorsanız arka plan process’leri kullanabilirsiniz:

#!/bin/bash

API_BASE="https://api.example.com"
TOKEN="Bearer your-token-here"
RESULTS_DIR="/tmp/api_results"
mkdir -p "$RESULTS_DIR"

# Arka planda paralel istekler
curl -s -H "Authorization: $TOKEN" 
     -o "$RESULTS_DIR/user.json" 
     "$API_BASE/users/123" &
PID1=$!

curl -s -H "Authorization: $TOKEN" 
     -o "$RESULTS_DIR/orders.json" 
     "$API_BASE/orders?user=123" &
PID2=$!

curl -s -H "Authorization: $TOKEN" 
     -o "$RESULTS_DIR/notifications.json" 
     "$API_BASE/notifications/123" &
PID3=$!

# Tum proseslerin bitmesini bekle
wait $PID1 $PID2 $PID3

echo "Tum istekler tamamlandi"

# Sonuclari kontrol et
for file in "$RESULTS_DIR"/*.json; do
    if [ -s "$file" ]; then
        echo "OK: $file ($(wc -c < "$file") bytes)"
    else
        echo "HATA: $file bos veya mevcut degil"
    fi
done

Retry Mekanizması ile Sağlam Paralel İstekler

Gerçek dünyada API’ler zaman zaman geçici hatalar verir. 429 (Too Many Requests), 503 (Service Unavailable) gibi durumlar için exponential backoff ile retry mantığı şart:

import asyncio
import aiohttp
import random
from functools import wraps

async def retry_request(session, url, max_retries=3, base_delay=1.0):
    """
    Exponential backoff ile yeniden deneme yapan istek fonksiyonu
    """
    last_exception = None
    
    for attempt in range(max_retries):
        try:
            timeout = aiohttp.ClientTimeout(total=10)
            async with session.get(url, timeout=timeout) as response:
                
                if response.status == 429:
                    # Rate limit: Retry-After header'ina bak
                    retry_after = int(response.headers.get('Retry-After', base_delay))
                    print(f"Rate limit! {retry_after}s bekleniyor...")
                    await asyncio.sleep(retry_after)
                    continue
                
                if response.status in (500, 502, 503, 504):
                    # Sunucu hatasi, yeniden dene
                    raise aiohttp.ClientResponseError(
                        response.request_info,
                        response.history,
                        status=response.status
                    )
                
                return await response.json()
        
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            last_exception = e
            if attempt < max_retries - 1:
                # Jitter ekleyerek thundering herd problemini onle
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Deneme {attempt + 1} basarisiz, {delay:.2f}s sonra tekrar...")
                await asyncio.sleep(delay)
    
    print(f"Maksimum deneme sayisina ulasildi: {url}")
    return None

async def resilient_parallel_fetch(endpoints):
    async with aiohttp.ClientSession() as session:
        tasks = [retry_request(session, url) for url in endpoints]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Kullanim
endpoints = [
    "https://api.example.com/data/1",
    "https://api.example.com/data/2",
    "https://api.example.com/data/3",
]

results = asyncio.run(resilient_parallel_fetch(endpoints))

Jitter eklenmesi kritik. Tüm isteklerin aynı anda retry yapması sunucuyu yeniden vurur ve problemi çözmez, aksine kötüleştirir. Rastgele bir bekleme süresi ekleyerek bu “thundering herd” problemini önlersiniz.

Gerçek Dünya Senaryosu: Webhook Bildirimi Dağıtımı

Bir SaaS ürünü işlettiğinizi düşünün. Bir müşteri işlemi gerçekleştiğinde 50 farklı müşterinin webhook endpoint’ine bildirim göndermeniz gerekiyor. Hepsini sırayla göndermek yerine paralel gönderip, başarısız olanları loglayın:

import asyncio
import aiohttp
import json
from datetime import datetime

async def send_webhook(session, semaphore, webhook_config, payload):
    """
    Tek bir webhook gonderimine ait asenkron fonksiyon
    """
    url = webhook_config['url']
    secret = webhook_config.get('secret', '')
    customer_id = webhook_config['customer_id']
    
    headers = {
        'Content-Type': 'application/json',
        'X-Webhook-Secret': secret,
        'X-Timestamp': datetime.utcnow().isoformat()
    }
    
    async with semaphore:
        try:
            timeout = aiohttp.ClientTimeout(total=8)
            async with session.post(
                url,
                json=payload,
                headers=headers,
                timeout=timeout
            ) as response:
                success = response.status in (200, 201, 202, 204)
                
                return {
                    'customer_id': customer_id,
                    'status': 'success' if success else 'failed',
                    'http_status': response.status,
                    'url': url
                }
        
        except asyncio.TimeoutError:
            return {
                'customer_id': customer_id,
                'status': 'timeout',
                'url': url
            }
        except Exception as e:
            return {
                'customer_id': customer_id,
                'status': 'error',
                'error': str(e),
                'url': url
            }

async def dispatch_webhooks(webhook_configs, event_payload):
    semaphore = asyncio.Semaphore(15)  # ayni anda max 15 webhook
    
    connector = aiohttp.TCPConnector(
        limit=30,
        ssl=False  # self-signed cert kullanan musteriler icin, produksiyonda dikkatli kullanin
    )
    
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            send_webhook(session, semaphore, config, event_payload)
            for config in webhook_configs
        ]
        results = await asyncio.gather(*tasks)
    
    # Istatistikleri hesapla
    success_count = sum(1 for r in results if r['status'] == 'success')
    failed = [r for r in results if r['status'] != 'success']
    
    print(f"Webhook dagilimi: {success_count}/{len(results)} basarili")
    
    if failed:
        print(f"Basarisiz webhook'lar yeniden kuyruğa aliniyor...")
        for item in failed:
            print(f"  - Musteri {item['customer_id']}: {item['status']}")
    
    return results

# Kullanim
webhooks = [
    {'customer_id': i, 'url': f'https://customer{i}.example.com/webhook', 'secret': f'secret{i}'}
    for i in range(1, 51)
]

payload = {
    'event': 'order.completed',
    'data': {'order_id': 12345, 'amount': 299.99}
}

results = asyncio.run(dispatch_webhooks(webhooks, payload))

Performans İzleme ve Metrik Toplama

Paralel istekleri prodüksiyona aldıktan sonra ne kadar iyileşme sağladığınızı ölçmeniz lazım. Basit bir timing decorator yazabilirsiniz:

import asyncio
import time
from functools import wraps

def async_timer(func_name=None):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                elapsed = (time.perf_counter() - start) * 1000
                name = func_name or func.__name__
                print(f"[METRIC] {name}: {elapsed:.2f}ms - BASARILI")
                return result
            except Exception as e:
                elapsed = (time.perf_counter() - start) * 1000
                name = func_name or func.__name__
                print(f"[METRIC] {name}: {elapsed:.2f}ms - HATA: {e}")
                raise
        return wrapper
    return decorator

@async_timer("kullanici_dashboard")
async def get_user_dashboard(user_id):
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            safe_fetch(session, f"/api/profile/{user_id}", "profile"),
            safe_fetch(session, f"/api/orders/{user_id}", "orders"),
            safe_fetch(session, f"/api/activity/{user_id}", "activity")
        )
    return results

Dikkat Edilmesi Gereken Noktalar

Paralel API istekleri güçlü ama dikkat edilmezse yeni problemler yaratır.

  • Connection pool boyutu: Çok fazla eş zamanlı bağlantı açmak hem kendi sunucunuzda hem de karşı tarafta sorun çıkarır. TCPConnector(limit=...) ile sınırlayın.
  • Memory kullanımı: Binlerce isteği aynı anda başlatmak bellekte birikim yapar. Semaphore veya batch’leme kullanın.
  • Sıralı bağımlılıklar: Bazı istekler birbirinin sonucuna bağlı olabilir. A’nın cevabına göre B’yi çağıracaksanız bunları paralel yapamazsınız, bu bir mimari kararı.
  • Timeout ayarları: Her istek için makul bir timeout belirleyin. Varsayılan değerlere güvenmeyin; bir servis cevap vermese event loop’unuzu saatlerce bekletebilir.
  • Rate limiting farkındalığı: Karşı API’nin limitlerini aşmamak için semaphore değerlerini buna göre ayarlayın ve 429 yanıtlarını doğru handle edin.
  • İzleme ve loglama: Paralel sistemlerde hangi isteğin ne zaman gittiğini takip etmek zordur. Her isteğe bir correlation ID ekleyin.

Sonuç

Asenkron ve paralel API istekleri, özellikle birden fazla downstream servisle çalışan sistemlerde hem gecikmeyi dramatik biçimde düşürür hem de sistemin genel dayanıklılığını artırır. Python’da asyncio ve aiohttp ikilisi, Node.js’de Promise.all ve Promise.allSettled, shell scriptlerde curl’ün paralel modu ile bu yaklaşımı farklı katmanlarda uygulayabilirsiniz.

Önemli olan sadece paralel yapmak değil, hataları doğru yönetmek, rate limit’e saygı göstermek ve performansı ölçmek. Prodüksiyona almadan önce mutlaka yük testleri yapın ve timeout, retry, circuit breaker gibi dayanıklılık mekanizmalarını yerleştirin. Kademeli geçiş yapın; önce en yüksek etkiyi beklediğiniz tek bir endpoint grubundan başlayın, metrikleri izleyin, sonra genişletin.

İyi yapılandırılmış paralel istek sistemi, kullanıcılarınızın fark etmeden yaşadığı ama fark ettiğinde çok övdüğü türden bir iyileştirmedir.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir