AWS Lambda ile S3 Olay İşleme: Serverless Dosya İşleme Rehberi

S3’e bir dosya yüklediğinde Lambda’nın tetiklenmesi, serverless dünyanın en güzel “sihir anlarından” biri. Dosya geldi, işlendi, sonuç başka bir yere gitti, sen hiç sunucu ayağa kaldırmadın. Kulağa basit geliyor ama production ortamında bu akışı sağlıklı kurgulamak ciddi bir bilgi birikimi gerektiriyor. Bu yazıda S3 olay işleme mimarisini baştan sona, gerçek dünya senaryolarıyla ele alacağız.

S3 Olay Modelini Anlamak

S3, belirli olaylar gerçekleştiğinde dışarıya bildirim gönderebilir. Bu olaylar temelde üç kategoride toplanır: nesne oluşturma (s3:ObjectCreated:), nesne silme (s3:ObjectRemoved:) ve replikasyon olayları. Lambda entegrasyonunda en çok kullanılan, tabii ki nesne oluşturma olaylarıdır.

Bir dosya S3’e yüklendiğinde Lambda’ya gelen event objesi şuna benzer:

# Lambda'ya gelen ham event yapısını loglamak için
import json
import boto3

def lambda_handler(event, context):
    print(json.dumps(event, indent=2))
    
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        size = record['s3']['object']['size']
        event_name = record['eventName']
        
        print(f"Olay: {event_name}")
        print(f"Bucket: {bucket}")
        print(f"Dosya: {key}")
        print(f"Boyut: {size} bytes")

Burada dikkat etmen gereken kritik bir nokta var: Lambda bir seferde birden fazla S3 olayını alabilir. event['Records'] her zaman bir liste, ve bu listede birden fazla kayıt olabilir. Bunu görmezden gelip sadece event['Records'][0] ile çalışırsan, bazı olayları kaçırırsın. Üstelik bu hata production’da kendini nadiren gösterir, stres altında yüksek trafikte patlar.

IAM Rolleri ve Güvenlik Yapılandırması

Lambda’nın S3’ten okuyabilmesi ve sonuçları yazabilmesi için doğru IAM rolünü oluşturman şart. Bunu elle yapmak yerine Infrastructure as Code kullanmak çok daha sağlıklı.

# IAM politikası oluşturma - s3-lambda-policy.json
cat > s3-lambda-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": "arn:aws:s3:::kaynak-bucket-adi/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::hedef-bucket-adi/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
EOF

# Politikayı oluştur
aws iam create-policy 
    --policy-name S3LambdaProcessingPolicy 
    --policy-document file://s3-lambda-policy.json

# Lambda execution role oluştur
aws iam create-role 
    --role-name S3LambdaExecutionRole 
    --assume-role-policy-document '{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }'

# Politikayı role ekle
aws iam attach-role-policy 
    --role-name S3LambdaExecutionRole 
    --policy-arn arn:aws:iam::HESAP_ID:policy/S3LambdaProcessingPolicy

En az yetki prensibi (Principle of Least Privilege) burada hayat kurtarır. Lambda’na tüm S3 bucket’larına erişim verme, sadece ihtiyaç duyduğu bucket ve prefix’lere izin ver. Resource: "arn:aws:s3:::*" şeklinde wildcard kullanmak güvenlik açığı değil felakettir.

Lambda Fonksiyonu Oluşturma ve S3 Tetikleyici Bağlama

# Lambda fonksiyonunu zip'le ve yükle
zip function.zip lambda_function.py

# Fonksiyonu oluştur
aws lambda create-function 
    --function-name s3-image-processor 
    --runtime python3.11 
    --role arn:aws:iam::HESAP_ID:role/S3LambdaExecutionRole 
    --handler lambda_function.lambda_handler 
    --zip-file fileb://function.zip 
    --timeout 60 
    --memory-size 512 
    --environment Variables='{
        "DESTINATION_BUCKET": "islenmis-dosyalar",
        "LOG_LEVEL": "INFO"
    }'

# S3'ün Lambda'yı çağırmasına izin ver
aws lambda add-permission 
    --function-name s3-image-processor 
    --statement-id s3-trigger-permission 
    --action lambda:InvokeFunction 
    --principal s3.amazonaws.com 
    --source-arn arn:aws:s3:::kaynak-bucket-adi 
    --source-account HESAP_ID

# S3 bucket notification'ı yapılandır
aws s3api put-bucket-notification-configuration 
    --bucket kaynak-bucket-adi 
    --notification-configuration '{
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": "arn:aws:lambda:eu-west-1:HESAP_ID:function:s3-image-processor",
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {"Name": "prefix", "Value": "uploads/"},
                        {"Name": "suffix", "Value": ".jpg"}
                    ]
                }
            }
        }]
    }'

Buradaki Filter kısmı çok önemli. Eğer Lambda fonksiyonun aynı bucket’a dosya yazıyorsa ve filter koymadan her ObjectCreated olayını dinlersen, sonsuz döngüye girersin. Lambda çalışır, dosya yazar, dosya yazılınca Lambda tekrar tetiklenir, o da yeni dosya yazar… AWS faturan dakikalar içinde astronomik rakamlara ulaşabilir. Prefix ve suffix filtreleri bu felaketi önler.

Gerçek Dünya Senaryosu 1: Görsel İşleme Pipeline’ı

Kullanıcıların profil fotoğrafı yüklediği bir uygulama düşün. Fotoğraf S3’e geldiğinde otomatik olarak thumbnail oluşturman, boyutlandırman ve WebP formatına çevirmen gerekiyor.

# requirements.txt
# Pillow==10.1.0
# boto3==1.29.0

import boto3
import os
import io
import urllib.parse
from PIL import Image

s3_client = boto3.client('s3')
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']
THUMBNAIL_SIZE = (150, 150)
MEDIUM_SIZE = (800, 600)

def process_image(bucket, key):
    # URL encoding'e dikkat et - boşluk içeren dosya adları sorun çıkarır
    decoded_key = urllib.parse.unquote_plus(key)
    
    # Orijinal görseli indir
    response = s3_client.get_object(Bucket=bucket, Key=decoded_key)
    image_data = response['Body'].read()
    
    img = Image.open(io.BytesIO(image_data))
    
    # EXIF verisiyle gelen rotasyon sorununu düzelt
    if hasattr(img, '_getexif') and img._getexif():
        exif = img._getexif()
        if exif and 274 in exif:  # Orientation tag
            orientation = exif[274]
            if orientation == 3:
                img = img.rotate(180, expand=True)
            elif orientation == 6:
                img = img.rotate(270, expand=True)
            elif orientation == 8:
                img = img.rotate(90, expand=True)
    
    # RGB'ye çevir (PNG'lerde RGBA olabilir)
    if img.mode in ('RGBA', 'P'):
        img = img.convert('RGB')
    
    base_name = os.path.splitext(decoded_key)[0]
    results = {}
    
    # Thumbnail oluştur
    thumb = img.copy()
    thumb.thumbnail(THUMBNAIL_SIZE, Image.LANCZOS)
    thumb_buffer = io.BytesIO()
    thumb.save(thumb_buffer, format='WEBP', quality=85)
    thumb_buffer.seek(0)
    
    thumb_key = f"thumbnails/{base_name}.webp"
    s3_client.put_object(
        Bucket=DESTINATION_BUCKET,
        Key=thumb_key,
        Body=thumb_buffer,
        ContentType='image/webp',
        CacheControl='max-age=31536000'
    )
    results['thumbnail'] = thumb_key
    
    # Medium boyut oluştur
    medium = img.copy()
    medium.thumbnail(MEDIUM_SIZE, Image.LANCZOS)
    medium_buffer = io.BytesIO()
    medium.save(medium_buffer, format='WEBP', quality=85)
    medium_buffer.seek(0)
    
    medium_key = f"medium/{base_name}.webp"
    s3_client.put_object(
        Bucket=DESTINATION_BUCKET,
        Key=medium_key,
        Body=medium_buffer,
        ContentType='image/webp'
    )
    results['medium'] = medium_key
    
    return results

def lambda_handler(event, context):
    processed = []
    errors = []
    
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        
        try:
            result = process_image(bucket, key)
            processed.append({'key': key, 'outputs': result})
            print(f"Basariyla islendi: {key}")
        except Exception as e:
            print(f"Hata - {key}: {str(e)}")
            errors.append({'key': key, 'error': str(e)})
    
    return {
        'processed': len(processed),
        'errors': len(errors),
        'details': processed
    }

Bu kodda urllib.parse.unquote_plus(key) kullanımına dikkat et. S3 olay bildirimlerinde dosya adındaki boşluklar + işaretiyle, özel karakterler ise URL encode edilmiş halde gelir. Bunu decode etmezsen "Profil Fotografi.jpg" gibi dosya adlarında NoSuchKey hatası alırsın.

Gerçek Dünya Senaryosu 2: CSV Dosyası İşleme ve DynamoDB’ye Yazma

E-ticaret sistemlerinde stok güncellemesi için CSV dosyaları hala yaygın kullanılıyor. S3’e gelen CSV’yi parse edip DynamoDB’ye yazmak klasik bir senaryo.

import boto3
import csv
import io
import os
import urllib.parse
from datetime import datetime

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])

def process_csv(bucket, key):
    decoded_key = urllib.parse.unquote_plus(key)
    
    response = s3_client.get_object(Bucket=bucket, Key=decoded_key)
    content = response['Body'].read().decode('utf-8-sig')  # BOM karakterini temizle
    
    reader = csv.DictReader(io.StringIO(content))
    
    # Batch write için liste hazırla (DynamoDB batch max 25 item)
    items = []
    batch_count = 0
    
    for row in reader:
        item = {
            'product_id': row['urun_id'],
            'stock': int(row['stok']),
            'price': str(row['fiyat']),  # Decimal sorununu önlemek için str
            'updated_at': datetime.utcnow().isoformat(),
            'source_file': decoded_key
        }
        items.append(item)
        
        # 25'lik gruplar halinde yaz
        if len(items) == 25:
            batch_write(items)
            batch_count += 1
            items = []
    
    # Kalan itemları yaz
    if items:
        batch_write(items)
        batch_count += 1
    
    return batch_count

def batch_write(items):
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        size = record['s3']['object']['size']
        
        # Büyük dosyalar için uyarı
        if size > 10 * 1024 * 1024:  # 10MB
            print(f"UYARI: Büyük dosya ({size} bytes) - {key}")
        
        batch_count = process_csv(bucket, key)
        print(f"Tamamlandi: {key} - {batch_count} batch yazildi")

utf-8-sig encoding kullanımı Windows’ta oluşturulan CSV’lerde BOM (Byte Order Mark) karakterini temizler. Excel’den export edilen CSV’lerin başında bu görünmez karakter bulunur ve sütun adlarınızın başına ufeff eklenmesine yol açar. Bunu görene kadar saatlerce debug yapabilirsin.

Hata Yönetimi ve Dead Letter Queue

Lambda hata aldığında S3 tetikleyicisi varsayılan olarak 3 kez tekrar dener. Bu yeniden denemeler arasında birkaç dakika geçer. Sorun şu ki bazı hatalar kalıcıdır; bozuk bir dosya, yanlış format, eksik alan. Bu dosyaları 3 kez deneyen Lambda hem kaynak israf eder hem de diğer mesajları geciktirir.

# DLQ (Dead Letter Queue) için SQS kuyruğu oluştur
aws sqs create-queue 
    --queue-name lambda-s3-dlq 
    --attributes '{
        "MessageRetentionPeriod": "1209600",
        "VisibilityTimeout": "300"
    }'

# Lambda'ya DLQ ekle
aws lambda update-function-configuration 
    --function-name s3-image-processor 
    --dead-letter-config TargetArn=arn:aws:sqs:eu-west-1:HESAP_ID:lambda-s3-dlq

# Lambda'ya SQS'e yazma izni ver
aws iam attach-role-policy 
    --role-name S3LambdaExecutionRole 
    --policy-arn arn:aws:iam::aws:policy/AmazonSQSFullAccess

DLQ’ya düşen mesajları izlemek için CloudWatch alarm kur:

# DLQ'da mesaj birikince alarm tetikle
aws cloudwatch put-metric-alarm 
    --alarm-name "Lambda-S3-DLQ-Alert" 
    --alarm-description "S3 Lambda DLQ mesaj bekleniyor" 
    --metric-name ApproximateNumberOfMessagesVisible 
    --namespace AWS/SQS 
    --statistic Maximum 
    --period 300 
    --threshold 1 
    --comparison-operator GreaterThanOrEqualToThreshold 
    --dimensions Name=QueueName,Value=lambda-s3-dlq 
    --evaluation-periods 1 
    --alarm-actions arn:aws:sns:eu-west-1:HESAP_ID:ops-alerts

Performans Optimizasyonu ve Concurrency Yönetimi

Ani bir trafik artışında yüzlerce dosya aynı anda S3’e yüklenirse, Lambda aynı anda yüzlerce instance başlatır. Bu Lambda throttling veya downstream sistemlerde (DynamoDB, RDS gibi) baskıya yol açabilir.

# Fonksiyon için reserved concurrency limiti koy
aws lambda put-function-concurrency 
    --function-name s3-image-processor 
    --reserved-concurrent-executions 50

# Provisioned concurrency - soğuk başlatma sorununu çözmek için
aws lambda put-provisioned-concurrency-config 
    --function-name s3-image-processor 
    --qualifier production 
    --provisioned-concurrent-executions 5

# Lambda layer ile Pillow kütüphanesini optimize et
# Layer oluşturma (deployment package boyutunu küçültür)
mkdir -p python/lib/python3.11/site-packages
pip install Pillow -t python/lib/python3.11/site-packages/
zip -r pillow-layer.zip python/

aws lambda publish-layer-version 
    --layer-name pillow-layer 
    --description "Pillow image processing library" 
    --zip-file fileb://pillow-layer.zip 
    --compatible-runtimes python3.11

# Fonksiyona layer ekle
aws lambda update-function-configuration 
    --function-name s3-image-processor 
    --layers arn:aws:lambda:eu-west-1:HESAP_ID:layer:pillow-layer:1

Connection reuse için boto3 client’ını fonksiyon handler’ının dışında, global scope’ta tanımlamak önemli. Lambda container’ı warm olduğunda global değişkenler korunur ve her invocation’da yeniden bağlantı kurulmaz. Bu küçük optimizasyon yüksek frekanslı senaryolarda gecikmeyi belirgin şekilde azaltır.

Monitoring ve Observability

Lambda’nın ne yaptığını görmeden yönetmek kör uçuştur. CloudWatch Insights ile hızlı sorgular kurabilirsin:

# CloudWatch Logs Insights sorgusu
# Son 1 saatteki hataları bul
aws logs start-query 
    --log-group-name /aws/lambda/s3-image-processor 
    --start-time $(date -d '1 hour ago' +%s) 
    --end-time $(date +%s) 
    --query-string '
fields @timestamp, @message
| filter @message like /ERROR/
| stats count(*) as error_count by bin(5m)
| sort @timestamp desc
| limit 100
'

# Ortalama işlem süresini hesapla
aws logs start-query 
    --log-group-name /aws/lambda/s3-image-processor 
    --start-time $(date -d '24 hours ago' +%s) 
    --end-time $(date +%s) 
    --query-string '
filter @type = "REPORT"
| stats avg(@duration) as avg_duration,
        max(@duration) as max_duration,
        avg(@maxMemoryUsed) as avg_memory
by bin(1h)
'

Lambda’nın ne kadar memory kullandığını izlemek de kritik. Eğer fonksiyonun 512MB ayarladığında sürekli 480-490MB kullanıyorsa, 1024MB’a çıkmak hem çökmemi önler hem de CPU oranını artırarak (Lambda’da memory ve CPU bağlantılıdır) işlemi hızlandırabilir.

S3 Event Notification ile SQS Entegrasyonu

Yüksek hacimli senaryolarda S3’ü doğrudan Lambda’ya bağlamak yerine araya SQS koymak daha sağlam bir mimari kurar. S3 olayı SQS’e düşer, Lambda SQS’i okur. Bu sayede:

  • Mesaj kaybı riski azalır
  • Lambda’nın concurrency’si kontrol altında kalır
  • Hata durumunda retry mekanizması SQS tarafından yönetilir
# SQS kuyruğu oluştur
aws sqs create-queue 
    --queue-name s3-events-queue 
    --attributes '{
        "VisibilityTimeout": "120",
        "MessageRetentionPeriod": "86400",
        "ReceiveMessageWaitTimeSeconds": "20"
    }'

# S3'ün SQS'e yazması için queue policy ekle
aws sqs set-queue-attributes 
    --queue-url https://sqs.eu-west-1.amazonaws.com/HESAP_ID/s3-events-queue 
    --attributes '{
        "Policy": "{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"s3.amazonaws.com"},"Action":"SQS:SendMessage","Resource":"arn:aws:sqs:eu-west-1:HESAP_ID:s3-events-queue","Condition":{"ArnLike":{"aws:SourceArn":"arn:aws:s3:::kaynak-bucket-adi"}}}]}"
    }'

# S3 notification'ı SQS'e yönlendir
aws s3api put-bucket-notification-configuration 
    --bucket kaynak-bucket-adi 
    --notification-configuration '{
        "QueueConfigurations": [{
            "QueueArn": "arn:aws:sqs:eu-west-1:HESAP_ID:s3-events-queue",
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {"Name": "prefix", "Value": "incoming/"}
                    ]
                }
            }
        }]
    }'

# Lambda'yı SQS trigger ile bağla
aws lambda create-event-source-mapping 
    --function-name s3-image-processor 
    --event-source-arn arn:aws:sqs:eu-west-1:HESAP_ID:s3-events-queue 
    --batch-size 10 
    --maximum-batching-window-in-seconds 30

maximum-batching-window-in-seconds 30 parametresi Lambda’nın 30 saniye bekleyip biriken mesajları toplu işlemesini sağlar. Az sayıda büyük batch, çok sayıda küçük invocation’dan genellikle daha verimlidir.

Yaygın Hatalar ve Çözümleri

Sonsuz döngü problemi: Lambda çıktısını aynı bucket’a yazıyorsa mutlaka farklı prefix kullan ve S3 notification’a suffix ya da prefix filtresi koy. Kaynak prefix uploads/, hedef prefix processed/ olsun, notification sadece uploads/ prefix’ini dinlesin.

Timeout hataları: Lambda varsayılan timeout’u 3 saniyedir. Büyük dosyalar için bu kesinlikle yetmez. İşlem süresini CloudWatch Logs’tan analiz et, timeout değerini gerçekçi bir upper bound’a ayarla. S3’ten büyük dosya okuma için streaming kullan, tüm dosyayı memory’ye çekme.

Memory yetersizliği: Görsel işleme veya büyük veri dosyaları için 128MB kesinlikle yetersiz. Başlangıçta 512MB ver, CloudWatch’tan gerçek kullanımı izle, sonra optimize et.

URL encoding sorunları: Türkçe karakter içeren dosya adlarında key her zaman URL decode edilmeli. urllib.parse.unquote_plus(key) her S3 Lambda fonksiyonunun olmazmazı.

Cold start gecikmesi: Düşük frekanslı ama latency-sensitive fonksiyonlarda provisioned concurrency kullan. Görsel işleme pipeline’ları gibi yüksek frekanslı fonksiyonlarda soğuk başlatma genellikle büyük sorun olmaz çünkü container’lar warm kalır.

Sonuç

AWS Lambda ile S3 olay işleme, doğru kurgulandığında gerçekten güçlü ve bakım maliyeti düşük bir mimari sunar. Temel prensipleri özetlemek gerekirse: her zaman Records listesini döngüyle işle, filter kullanarak sonsuz döngüden kaçın, URL decode etmeyi unutma, DLQ kur, concurrency limitini belirle ve CloudWatch’u aktif kullan.

Basit görünen bu servis entegrasyonu, production’da edge case’lerle dolup taşıyor. Özellikle yüksek hacimli senaryolarda S3 ile Lambda arasına SQS koymak mimarini çok daha dayanıklı yapar. Başlangıçta biraz fazla efor gibi görünse de, gece 2’de sonsuz döngü yüzünden patlayan hesap faturasıyla uğraşmaktan çok daha iyi.

Bir sonraki adım olarak bu Lambda fonksiyonlarını AWS SAM veya Terraform ile Infrastructure as Code haline getirmeni öneririm. Elle kurduğun her şeyin bir gün kaybolacağını, kodla kurduğun her şeyin ise yeniden oluşturulabileceğini unutma.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir