Toplu Görsel Üretimi: Stable Diffusion ile Batch Processing Kullanarak Zaman Kazanma

Onlarca görsel üretmesi gereken bir iş akışınız var ve her seferinde tek tek “Generate” butonuna basmaktan bıktınız mı? Ya da gece uyurken sunucunuzun sizin için yüzlerce görsel üretmesini istiyorsunuz ama nasıl yapacağınızı bilmiyorsunuz? İşte tam olarak bu yazıda bunu konuşacağız.

Stable Diffusion’ın batch processing yetenekleri, özellikle production ortamlarında veya büyük ölçekli görsel üretim projelerinde hayat kurtarıcı. Bir kere doğru kurup yapılandırdıktan sonra, siz kahvenizi içerken sistem sizin için çalışıyor.

Batch Processing Neden Bu Kadar Önemli?

Tek tek görsel üretmek, küçük projeler için tamam. Ama gerçek dünya senaryolarında işler hızla karmaşıklaşıyor. Diyelim ki bir e-ticaret sitesi için 500 farklı ürün görseli üretmeniz gerekiyor. Ya da bir oyun için 200 farklı karakter varyasyonu lazım. Ya da her gün otomatik olarak sosyal medya için içerik görselleri oluşturmanız gerekiyor.

Bu senaryolarda manuel yaklaşım tamamen çöküyor. Batch processing ile şunları kazanıyorsunuz:

  • GPU kullanımını optimize etme: Tek bir görsel için GPU’yu ısıtıp soğutmak yerine sürekli çalışır halde tutmak
  • Otomasyon: Cron job’larla gece işlemleri çalıştırma
  • Tutarlılık: Aynı parametrelerle üretilen görsel setleri
  • Zaman tasarrufu: İnsan müdahalesi olmadan saatler, hatta günler boyunca çalışma

Ortamı Hazırlamak

Önce temel kurulumun yapılmış olduğunu varsayıyorum. Eğer değilse, bir önceki yazımıza bakmanızı öneririm. Batch işlemleri için ihtiyacımız olan ek bileşenler:

# Python virtual environment oluştur
python3 -m venv sd-batch-env
source sd-batch-env/bin/activate

# Gerekli kütüphaneleri kur
pip install diffusers transformers accelerate torch torchvision
pip install Pillow requests tqdm

# API erişimi için ekstra paketler
pip install aiohttp asyncio

AUTOMATIC1111 Web UI kullanıyorsanız, API’yi etkinleştirmek için başlatma parametrelerine --api eklemek zorundasınız:

# webui.sh içindeki COMMANDLINE_ARGS değişkenini düzenle
export COMMANDLINE_ARGS="--api --api-log --nowebui --listen --port 7860"

# Veya doğrudan başlatırken
./webui.sh --api --listen --port 7860

--nowebui parametresi tamamen headless modda çalıştırmanızı sağlıyor. Sunucu ortamlarında bu çok işe yarıyor çünkü web arayüzü için gereksiz kaynak harcamıyorsunuz.

Basit Bir Batch Script Yazmak

İlk batch script’imizi yazalım. Bu script, bir prompt listesini okuyup her biri için görsel üretecek:

#!/bin/bash
# batch_generate.sh

PROMPTS_FILE="prompts.txt"
OUTPUT_DIR="/opt/sd-output/batch-$(date +%Y%m%d-%H%M%S)"
API_URL="http://localhost:7860"
STEPS=20
CFG_SCALE=7
WIDTH=512
HEIGHT=512

mkdir -p "$OUTPUT_DIR"

# Prompts dosyasını satır satır oku
while IFS= read -r prompt || [[ -n "$prompt" ]]; do
    # Boş satırları ve yorum satırlarını atla
    [[ -z "$prompt" || "$prompt" =~ ^# ]] && continue
    
    echo "Generating: $prompt"
    
    # API çağrısı yap
    response=$(curl -s -X POST "$API_URL/sdapi/v1/txt2img" 
        -H "Content-Type: application/json" 
        -d "{
            "prompt": "$prompt",
            "negative_prompt": "blurry, bad quality, distorted",
            "steps": $STEPS,
            "cfg_scale": $CFG_SCALE,
            "width": $WIDTH,
            "height": $HEIGHT
        }")
    
    # Base64 görsel verisini çıkar ve kaydet
    echo "$response" | python3 -c "
import sys, json, base64
data = json.load(sys.stdin)
img_data = base64.b64decode(data['images'][0])
filename = '${OUTPUT_DIR}/' + '$(echo "$prompt" | tr ' ' '_' | cut -c1-50)' + '.png'
with open(filename, 'wb') as f:
    f.write(img_data)
print(f'Saved: {filename}')
"
done < "$PROMPTS_FILE"

echo "Batch tamamlandi. Gorseller: $OUTPUT_DIR"

prompts.txt dosyası şöyle görünecek:

# E-ticaret urun gorselleri
professional product photo of red sneakers on white background
minimalist ceramic coffee mug, studio lighting, top view
elegant wooden desk organizer, natural lighting
vintage leather wallet, dark background, dramatic lighting

Python ile Daha Gelişmiş Batch Processing

Bash script’i basit durumlar için yeterli ama gerçek production senaryoları için Python çok daha güçlü. Aşağıdaki script asenkron istek gönderimi, hata yönetimi ve ilerleme takibi içeriyor:

#!/usr/bin/env python3
# advanced_batch.py

import asyncio
import aiohttp
import base64
import json
import os
import csv
from pathlib import Path
from datetime import datetime
from tqdm.asyncio import tqdm

class StableDiffusionBatch:
    def __init__(self, api_url="http://localhost:7860", concurrent_limit=3):
        self.api_url = api_url
        self.semaphore = asyncio.Semaphore(concurrent_limit)
        self.output_dir = Path(f"batch_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
        self.output_dir.mkdir(exist_ok=True)
        self.results = []
        
    async def generate_single(self, session, job):
        """Tek bir gorsel uret"""
        async with self.semaphore:
            payload = {
                "prompt": job["prompt"],
                "negative_prompt": job.get("negative_prompt", "blurry, distorted, low quality"),
                "steps": int(job.get("steps", 20)),
                "cfg_scale": float(job.get("cfg_scale", 7)),
                "width": int(job.get("width", 512)),
                "height": int(job.get("height", 512)),
                "seed": int(job.get("seed", -1)),
                "batch_size": 1
            }
            
            try:
                async with session.post(
                    f"{self.api_url}/sdapi/v1/txt2img",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=300)
                ) as response:
                    if response.status == 200:
                        data = await response.json()
                        img_data = base64.b64decode(data["images"][0])
                        
                        filename = self.output_dir / f"{job['id']}.png"
                        with open(filename, "wb") as f:
                            f.write(img_data)
                        
                        # Metadata kaydet
                        meta_file = self.output_dir / f"{job['id']}.json"
                        with open(meta_file, "w") as f:
                            json.dump({
                                "job_id": job["id"],
                                "prompt": job["prompt"],
                                "parameters": payload,
                                "info": data.get("info", "")
                            }, f, indent=2)
                        
                        return {"id": job["id"], "status": "success", "file": str(filename)}
                    else:
                        return {"id": job["id"], "status": "error", "code": response.status}
                        
            except Exception as e:
                return {"id": job["id"], "status": "error", "message": str(e)}
    
    async def run_batch(self, jobs):
        """Tum isleri paralel olarak calistir"""
        async with aiohttp.ClientSession() as session:
            tasks = [self.generate_single(session, job) for job in jobs]
            results = await tqdm.gather(*tasks, desc="Gorseller uretiliyor")
            
        self.results = results
        self.save_report()
        return results
    
    def save_report(self):
        """Sonuc raporu kaydet"""
        report_file = self.output_dir / "batch_report.json"
        success_count = sum(1 for r in self.results if r["status"] == "success")
        
        report = {
            "total": len(self.results),
            "success": success_count,
            "failed": len(self.results) - success_count,
            "results": self.results
        }
        
        with open(report_file, "w") as f:
            json.dump(report, f, indent=2)
        
        print(f"nRapor: {success_count}/{len(self.results)} basarili")

def load_jobs_from_csv(csv_file):
    """CSV dosyasindan is yukle"""
    jobs = []
    with open(csv_file, newline='', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            row["id"] = row.get("id", f"job_{i:04d}")
            jobs.append(row)
    return jobs

if __name__ == "__main__":
    jobs = load_jobs_from_csv("batch_jobs.csv")
    batch = StableDiffusionBatch(concurrent_limit=2)
    asyncio.run(batch.run_batch(jobs))

CSV dosyası formatı şu şekilde olmalı:

# batch_jobs.csv ornegi olustur
cat > batch_jobs.csv << 'EOF'
id,prompt,negative_prompt,steps,cfg_scale,width,height,seed
product_001,"red leather handbag, white background, product photography","blurry,distorted",25,7.5,512,512,-1
product_002,"blue denim jacket, studio lighting","low quality",20,7,512,768,-1
portrait_001,"professional headshot, business attire, neutral background","amateur",30,8,512,512,42
EOF

Model Değiştirme ve Lora Yönetimi

Farklı görev tipleri için farklı modeller kullanmanız gerekebilir. Batch işlemi sırasında model değiştirme:

#!/usr/bin/env python3
# model_switcher.py

import requests
import time

API_URL = "http://localhost:7860"

def get_available_models():
    """Yuklu modelleri listele"""
    response = requests.get(f"{API_URL}/sdapi/v1/sd-models")
    return response.json()

def switch_model(model_title):
    """Modeli degistir ve yuklenmeyi bekle"""
    print(f"Model degistiriliyor: {model_title}")
    
    payload = {"sd_model_checkpoint": model_title}
    response = requests.post(
        f"{API_URL}/sdapi/v1/options",
        json=payload
    )
    
    if response.status_code == 200:
        # Model yuklenmesini bekle
        time.sleep(5)
        print(f"Model yuklendi: {model_title}")
        return True
    return False

def run_batch_with_model_groups(job_groups):
    """
    Her grup farkli model kullanarak calistir
    job_groups: [{"model": "model_title", "jobs": [...]}, ...]
    """
    for group in job_groups:
        model = group["model"]
        jobs = group["jobs"]
        
        if not switch_model(model):
            print(f"HATA: {model} yuklenemedi, atlaniyor")
            continue
        
        print(f"{len(jobs)} is {model} ile calistirilacak")
        # Normal batch islemi burada

# Ornek kullanim
job_groups = [
    {
        "model": "v1-5-pruned-emaonly.safetensors",
        "jobs": [
            {"id": "general_001", "prompt": "landscape photography"},
            {"id": "general_002", "prompt": "portrait of a person"}
        ]
    },
    {
        "model": "dreamshaper_8.safetensors", 
        "jobs": [
            {"id": "art_001", "prompt": "fantasy character art"},
            {"id": "art_002", "prompt": "digital painting, sci-fi scene"}
        ]
    }
]

run_batch_with_model_groups(job_groups)

Cron Job ile Otomatik Zamanlama

Gece yarısı GPU’nun boş olduğu saatlerde batch işlemleri otomatik başlatmak için cron kullanabilirsiniz:

# crontab -e ile ekleyin
# Her gece 02:00'de batch islemi calistir
0 2 * * * /usr/bin/bash /opt/sd-scripts/nightly_batch.sh >> /var/log/sd-batch.log 2>&1

# Her Pazartesi sabah 06:00'da haftalik raporlama
0 6 * * 1 /usr/bin/python3 /opt/sd-scripts/weekly_report.py >> /var/log/sd-weekly.log 2>&1

nightly_batch.sh scripti şöyle görünebilir:

#!/bin/bash
# nightly_batch.sh

LOG_FILE="/var/log/sd-batch.log"
LOCK_FILE="/tmp/sd-batch.lock"
SCRIPT_DIR="/opt/sd-scripts"
INPUT_DIR="/opt/sd-jobs/pending"
DONE_DIR="/opt/sd-jobs/completed"
FAILED_DIR="/opt/sd-jobs/failed"

# Zaten calisiyorsa cik
if [ -f "$LOCK_FILE" ]; then
    echo "$(date): Baska bir batch islemi zaten calisiyor, cikiliyor" >> "$LOG_FILE"
    exit 1
fi

# Lock olustur
touch "$LOCK_FILE"
trap "rm -f $LOCK_FILE" EXIT

echo "$(date): Nightly batch basliyor" >> "$LOG_FILE"

# SD Web UI'in ayakta olup olmadigini kontrol et
if ! curl -s "http://localhost:7860/sdapi/v1/progress" > /dev/null 2>&1; then
    echo "$(date): SD Web UI calismıyor, baslatiliyor..." >> "$LOG_FILE"
    cd /opt/stable-diffusion-webui
    ./webui.sh --api --nowebui --listen &
    sleep 30
fi

# Bekleyen her job dosyasini isle
for job_file in "$INPUT_DIR"/*.csv; do
    [ -f "$job_file" ] || continue
    
    job_name=$(basename "$job_file" .csv)
    echo "$(date): Processing: $job_name" >> "$LOG_FILE"
    
    if python3 "$SCRIPT_DIR/advanced_batch.py" --input "$job_file"; then
        mv "$job_file" "$DONE_DIR/"
        echo "$(date): Tamamlandi: $job_name" >> "$LOG_FILE"
    else
        mv "$job_file" "$FAILED_DIR/"
        echo "$(date): HATA: $job_name basarisiz" >> "$LOG_FILE"
    fi
done

echo "$(date): Nightly batch tamamlandi" >> "$LOG_FILE"

GPU Bellek Yönetimi ve Optimizasyon

Batch işlemlerinde en büyük sorunlardan biri GPU bellek sızıntıları ve out-of-memory hataları. Bunları önlemek için:

#!/usr/bin/env python3
# gpu_monitor.py

import subprocess
import time
import requests

def get_gpu_memory_usage():
    """NVIDIA GPU bellek kullanimi"""
    try:
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=memory.used,memory.free,memory.total",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True
        )
        used, free, total = map(int, result.stdout.strip().split(", "))
        return {"used": used, "free": free, "total": total, "percent": (used/total)*100}
    except Exception:
        return None

def wait_for_gpu_cooldown(threshold_percent=85, check_interval=10):
    """GPU kullanimi threshold altina inene kadar bekle"""
    while True:
        gpu_info = get_gpu_memory_usage()
        if gpu_info is None:
            break
            
        if gpu_info["percent"] < threshold_percent:
            print(f"GPU hazir: {gpu_info['percent']:.1f}% kullanımda")
            break
            
        print(f"GPU mesgul ({gpu_info['percent']:.1f}%), bekleniyor...")
        time.sleep(check_interval)

def unload_model():
    """GPU bellegini temizle"""
    requests.post("http://localhost:7860/sdapi/v1/unload-checkpoint")
    time.sleep(2)
    
def reload_model(model_title):
    """Modeli yeniden yukle"""
    requests.post(
        "http://localhost:7860/sdapi/v1/reload-checkpoint",
        json={"sd_model_checkpoint": model_title}
    )
    time.sleep(10)

def smart_batch_runner(jobs, batch_size=10):
    """Bellek durumuna gore akilli batch calistirici"""
    for i in range(0, len(jobs), batch_size):
        chunk = jobs[i:i+batch_size]
        print(f"Chunk {i//batch_size + 1}: {len(chunk)} is")
        
        # Her chunk oncesinde GPU durumunu kontrol et
        wait_for_gpu_cooldown(threshold_percent=80)
        
        # Chunk'i isle
        process_chunk(chunk)
        
        # Her N chunk'dan sonra bellek temizle
        if (i // batch_size + 1) % 5 == 0:
            print("Periyodik bellek temizligi...")
            unload_model()
            time.sleep(5)
            reload_model("v1-5-pruned-emaonly.safetensors")

def process_chunk(jobs):
    """Bir job grubunu isle - asıl batch logic buraya gelir"""
    pass

Sonuc

Batch processing, Stable Diffusion’ı gerçek anlamda production-ready bir araca dönüştüren şey. Manuel tek tek görsel üretmekten, saatte yüzlerce görsel üretebilen otomatik bir sisteme geçmek, doğru altyapıyı kurduğunuzda oldukça erişilebilir.

Özetlemek gerekirse şu noktaları aklınızda tutun:

  • API modunu mutlaka etkinleştirin: --api parametresi olmadan dışarıdan erişim mümkün değil
  • Concurrent limit’i iyi ayarlayın: GPU kapasitenizie göre 1-3 arası genellikle optimal
  • Lock dosyaları kullanın: Cron job’larınızın üst üste binmesini engelleyin
  • GPU belleğini izleyin: Uzun süreli batch işlemlerinde bellek sızıntıları kaçınılmaz, periyodik temizlik şart
  • Hata loglamayı ihmal etmeyin: Gece çalışan bir job’ın nerede patladığını sabah anlamak için detaylı log şart
  • Job dosyalarını kategorize edin: Pending, completed, failed klasörleri iş akışınızı çok kolaylaştırır

Bu yapıyı bir kere kurduğunuzda, görsel üretim iş akışlarınız temelden değişecek. Gece uyurken GPU’nuzun sizin için çalışması oldukça tatmin edici bir his. Sorularınız olursa yorumlarda buluşalım.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir