AWS Lambda ile SQS Mesaj İşleme: Serverless Kuyruk Yönetimi
Mesaj kuyruğu sistemleri, modern dağıtık uygulamaların bel kemiğini oluşturuyor. Özellikle yüksek trafikli sistemlerde, bileşenler arasındaki iletişimi asenkron hale getirmek hem ölçeklenebilirlik hem de güvenilirlik açısından kritik önem taşıyor. AWS Lambda ile SQS’i birleştirdiğinizde ise sunucu yönetmeden, otomatik ölçeklenen ve maliyet etkin bir mesaj işleme pipeline’ı elde ediyorsunuz. Bu yazıda, gerçek dünya senaryoları üzerinden Lambda-SQS entegrasyonunu derinlemesine inceleyeceğiz.
SQS ve Lambda Entegrasyonuna Neden İhtiyaç Duyarız?
Bir e-ticaret platformu düşünün. Kullanıcı sipariş verdiğinde şu işlemlerin gerçekleşmesi gerekiyor: stok güncelleme, ödeme işleme, e-posta bildirimi, kargo sistemi entegrasyonu ve müşteri puanı hesaplama. Bu işlemleri senkron yapmak hem kullanıcıyı bekletir hem de bir adımın başarısız olması tüm süreci bozar. SQS kuyruğuna mesaj atıp Lambda ile asenkron işlemek bu sorunu elegance bir şekilde çözer.
Temel avantajlar şunlardır:
- Decoupling: Üretici ve tüketici birbirinden bağımsız çalışır
- Hata yönetimi: Başarısız mesajlar Dead Letter Queue’ya (DLQ) yönlendirilir
- Otomatik ölçekleme: Lambda, kuyruk derinliğine göre otomatik scale eder
- Maliyet avantajı: Sadece işlenen mesaj başına ödeme yaparsınız
- Retry mekanizması: Başarısız işlemler otomatik yeniden denenir
Temel Kavramlar
SQS Kuyruk Tipleri
Standard Queue: Mesaj sırası garanti edilmez, en az bir kez teslim garantisi vardır. Yüksek throughput gerektiren senaryolar için idealdir. Saniyede neredeyse sınırsız işlem kapasitesi sunar.
FIFO Queue: First-In-First-Out garantisi verir. Mesajlar tam olarak bir kez işlenir. Saniyede 300 mesaj (batch ile 3000 mesaj) limitine tabidir. Finansal işlemler veya sıra önemli olan senaryolar için kullanılır.
Önemli SQS Parametreleri
- Visibility Timeout: Bir mesajın alındıktan sonra kuyruğa geri dönmeden önce beklediği süre. Lambda fonksiyonunuzun timeout değerinin en az 6 katı olmalıdır
- Message Retention Period: Mesajların kuyrukta bekleyebileceği maksimum süre (1 dakika ile 14 gün arası)
- Receive Message Wait Time: Long polling için bekleme süresi (0-20 saniye). 20 saniye ayarlamak boş polling maliyetini azaltır
- Max Receive Count: DLQ’ya gönderilmeden önce kaç kez deneneceği
- Batch Size: Lambda’nın tek seferde alacağı maksimum mesaj sayısı (1-10000 arası)
İlk Kurulum: Terraform ile Altyapı Oluşturma
Sysadmin olarak her şeyi elle yapmak yerine Infrastructure as Code kullanmayı tercih ediyorum. İşte temel bir Lambda-SQS kurulumu için Terraform konfigürasyonu:
# main.tf - Temel SQS ve Lambda altyapısı
# DLQ oluşturma
resource "aws_sqs_queue" "order_dlq" {
name = "order-processing-dlq"
message_retention_seconds = 1209600 # 14 gün
tags = {
Environment = "production"
Service = "order-processing"
}
}
# Ana kuyruk oluşturma
resource "aws_sqs_queue" "order_queue" {
name = "order-processing-queue"
visibility_timeout_seconds = 300 # 5 dakika
message_retention_seconds = 86400 # 1 gün
receive_wait_time_seconds = 20 # Long polling
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.order_dlq.arn
maxReceiveCount = 3
})
tags = {
Environment = "production"
Service = "order-processing"
}
}
# Lambda event source mapping
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.order_queue.arn
function_name = aws_lambda_function.order_processor.arn
batch_size = 10
function_response_types = ["ReportBatchItemFailures"]
}
Lambda Fonksiyonu Yazma
Temel Mesaj İşleyici
Python ile yazılmış temel bir sipariş işleme Lambda fonksiyonu:
# handler.py - Temel SQS mesaj işleyici
import json
import boto3
import logging
from typing import Dict, Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
SQS tetiklemeli Lambda handler.
Batch item failure pattern'i kullanır.
"""
batch_item_failures = []
successful_message_ids = []
logger.info(f"Toplam {len(event['Records'])} mesaj alındı")
for record in event['Records']:
message_id = record['messageId']
try:
# SQS mesaj body'sini parse et
body = json.loads(record['body'])
logger.info(f"İşleniyor - MessageId: {message_id}, OrderId: {body.get('order_id')}")
# Asıl işlem mantığı
process_order(body)
successful_message_ids.append(message_id)
logger.info(f"Başarıyla işlendi - MessageId: {message_id}")
except json.JSONDecodeError as e:
logger.error(f"JSON parse hatası - MessageId: {message_id}, Hata: {str(e)}")
# Geçersiz JSON mesajı DLQ'ya gönder
batch_item_failures.append({"itemIdentifier": message_id})
except Exception as e:
logger.error(f"İşleme hatası - MessageId: {message_id}, Hata: {str(e)}")
# Başarısız mesajı retry için işaretle
batch_item_failures.append({"itemIdentifier": message_id})
logger.info(f"Başarılı: {len(successful_message_ids)}, Başarısız: {len(batch_item_failures)}")
# Batch item failures döndür (ReportBatchItemFailures pattern)
return {"batchItemFailures": batch_item_failures}
def process_order(order_data: Dict[str, Any]) -> None:
"""Sipariş işleme iş mantığı"""
order_id = order_data.get('order_id')
customer_id = order_data.get('customer_id')
items = order_data.get('items', [])
if not order_id or not customer_id:
raise ValueError(f"Eksik zorunlu alan - order_id veya customer_id yok")
# Stok kontrolü
check_inventory(items)
# Ödeme işleme
process_payment(order_data)
# Bildirim gönder
send_notification(customer_id, order_id)
def check_inventory(items: list) -> None:
"""Stok kontrolü - gerçek uygulamada DB sorgusu yapılır"""
for item in items:
if item.get('quantity', 0) <= 0:
raise ValueError(f"Geçersiz miktar: {item}")
logger.info(f"Stok kontrolü tamamlandı - {len(items)} ürün")
def process_payment(order_data: Dict) -> None:
"""Ödeme işleme simülasyonu"""
amount = order_data.get('total_amount', 0)
if amount <= 0:
raise ValueError(f"Geçersiz ödeme tutarı: {amount}")
logger.info(f"Ödeme işlendi - Tutar: {amount} TL")
def send_notification(customer_id: str, order_id: str) -> None:
"""SNS ile bildirim gönderme"""
sns = boto3.client('sns')
# SNS topic ARN environment variable'dan alınır
logger.info(f"Bildirim gönderildi - Customer: {customer_id}, Order: {order_id}")
Batch Item Failures Pattern
Bu pattern çok önemli. Lambda’ya 10 mesaj geldiğinde 9 tanesi başarılı, 1 tanesi başarısız olursa, sadece başarısız olanı retry etmek istiyorsunuz. Eski yöntemde tüm batch başarısız sayılırdı ve 9 başarılı mesaj tekrar işlenirdi. ReportBatchItemFailures bunu çözüyor.
AWS CLI ile Kuyruk Yönetimi
Günlük operasyonlarda sık kullandığım CLI komutları:
#!/bin/bash
# sqs-operations.sh - Günlük SQS yönetim komutları
# Kuyruk durumunu kontrol et
echo "=== Kuyruk Metrikleri ==="
aws sqs get-queue-attributes
--queue-url "https://sqs.eu-west-1.amazonaws.com/123456789/order-processing-queue"
--attribute-names ApproximateNumberOfMessages
ApproximateNumberOfMessagesNotVisible
ApproximateNumberOfMessagesDelayed
--query 'Attributes'
--output json
# DLQ'daki mesaj sayısını kontrol et
echo "=== DLQ Mesaj Sayısı ==="
aws sqs get-queue-attributes
--queue-url "https://sqs.eu-west-1.amazonaws.com/123456789/order-processing-dlq"
--attribute-names ApproximateNumberOfMessages
--query 'Attributes.ApproximateNumberOfMessages'
--output text
# Test mesajı gönder
echo "=== Test Mesajı Gönderme ==="
aws sqs send-message
--queue-url "https://sqs.eu-west-1.amazonaws.com/123456789/order-processing-queue"
--message-body '{
"order_id": "ORD-2024-001",
"customer_id": "CUST-456",
"items": [{"product_id": "PROD-789", "quantity": 2}],
"total_amount": 299.99
}'
--message-attributes '{
"source": {
"DataType": "String",
"StringValue": "web-frontend"
}
}'
# Batch mesaj gönder (load test için)
echo "=== Batch Mesaj Gönderme ==="
aws sqs send-message-batch
--queue-url "https://sqs.eu-west-1.amazonaws.com/123456789/order-processing-queue"
--entries file://test-messages.json
Dead Letter Queue Yönetimi ve Mesaj Kurtarma
Prodüksiyon ortamında DLQ yönetimi kritik bir operasyonel görevdir. İşte DLQ’daki mesajları ana kuyruğa geri taşıyan bir script:
#!/usr/bin/env python3
# dlq-redrive.py - DLQ mesajlarını ana kuyruğa geri taşı
import boto3
import json
import time
import argparse
from typing import Optional
def redrive_messages(
dlq_url: str,
target_queue_url: str,
max_messages: Optional[int] = None,
dry_run: bool = False
) -> None:
"""
DLQ'daki mesajları hedef kuyruğa yeniden gönderir.
Args:
dlq_url: Dead Letter Queue URL'i
target_queue_url: Hedef kuyruk URL'i
max_messages: İşlenecek maksimum mesaj sayısı (None = hepsi)
dry_run: True ise mesajları silmez
"""
sqs = boto3.client('sqs', region_name='eu-west-1')
total_processed = 0
total_failed = 0
print(f"DLQ Redrive başlıyor - {'DRY RUN' if dry_run else 'CANLI'}")
print(f"Kaynak: {dlq_url}")
print(f"Hedef: {target_queue_url}")
print("-" * 50)
while True:
if max_messages and total_processed >= max_messages:
print(f"Maksimum mesaj limitine ulaşıldı: {max_messages}")
break
# DLQ'dan mesaj al
response = sqs.receive_message(
QueueUrl=dlq_url,
MaxNumberOfMessages=10,
MessageAttributeNames=['All'],
WaitTimeSeconds=5
)
messages = response.get('Messages', [])
if not messages:
print("DLQ boş, işlem tamamlandı.")
break
for message in messages:
try:
message_body = message['Body']
# Mesajı parse et ve logla
try:
body_json = json.loads(message_body)
order_id = body_json.get('order_id', 'N/A')
print(f"İşleniyor - OrderId: {order_id}, MessageId: {message['MessageId']}")
except json.JSONDecodeError:
print(f"Ham mesaj işleniyor - MessageId: {message['MessageId']}")
if not dry_run:
# Hedef kuyruğa gönder
sqs.send_message(
QueueUrl=target_queue_url,
MessageBody=message_body,
MessageAttributes=message.get('MessageAttributes', {})
)
# DLQ'dan sil
sqs.delete_message(
QueueUrl=dlq_url,
ReceiptHandle=message['ReceiptHandle']
)
total_processed += 1
except Exception as e:
print(f"HATA - MessageId: {message['MessageId']}, {str(e)}")
total_failed += 1
# Rate limiting
time.sleep(0.5)
print("-" * 50)
print(f"Tamamlandı - İşlenen: {total_processed}, Başarısız: {total_failed}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='DLQ Redrive Tool')
parser.add_argument('--dlq-url', required=True)
parser.add_argument('--target-url', required=True)
parser.add_argument('--max-messages', type=int, default=None)
parser.add_argument('--dry-run', action='store_true')
args = parser.parse_args()
redrive_messages(
dlq_url=args.dlq_url,
target_queue_url=args.target_url,
max_messages=args.max_messages,
dry_run=args.dry_run
)
Monitoring ve Alerting
CloudWatch ile Lambda-SQS sistemini izlemek için kritik metrikler:
#!/bin/bash
# setup-monitoring.sh - CloudWatch alarm kurulumu
QUEUE_NAME="order-processing-queue"
DLQ_NAME="order-processing-dlq"
LAMBDA_NAME="order-processor"
ALARM_SNS_ARN="arn:aws:sns:eu-west-1:123456789:ops-alerts"
# DLQ'da mesaj var mı alarmı (en kritik alarm!)
aws cloudwatch put-metric-alarm
--alarm-name "SQS-DLQ-Messages-Detected"
--alarm-description "DLQ'da mesaj tespit edildi - acil müdahale gerekli"
--metric-name ApproximateNumberOfMessagesVisible
--namespace AWS/SQS
--statistic Maximum
--period 60
--threshold 1
--comparison-operator GreaterThanOrEqualToThreshold
--evaluation-periods 1
--dimensions Name=QueueName,Value=$DLQ_NAME
--alarm-actions $ALARM_SNS_ARN
--treat-missing-data notBreaching
# Kuyruk derinliği alarmı (işleme gecikmesi)
aws cloudwatch put-metric-alarm
--alarm-name "SQS-Queue-Depth-High"
--alarm-description "Kuyruk derinliği yüksek - Lambda kapasitesi yetersiz olabilir"
--metric-name ApproximateNumberOfMessagesVisible
--namespace AWS/SQS
--statistic Maximum
--period 300
--threshold 1000
--comparison-operator GreaterThanThreshold
--evaluation-periods 2
--dimensions Name=QueueName,Value=$QUEUE_NAME
--alarm-actions $ALARM_SNS_ARN
# Lambda hata oranı alarmı
aws cloudwatch put-metric-alarm
--alarm-name "Lambda-Error-Rate-High"
--alarm-description "Lambda hata oranı %5 üzerinde"
--metric-name Errors
--namespace AWS/Lambda
--statistic Sum
--period 300
--threshold 10
--comparison-operator GreaterThanThreshold
--evaluation-periods 1
--dimensions Name=FunctionName,Value=$LAMBDA_NAME
--alarm-actions $ALARM_SNS_ARN
echo "CloudWatch alarmları kuruldu"
Lambda Concurrency Ayarları ve Throttling
Prodüksiyonda en sık karşılaştığım sorunlardan biri Lambda throttling. SQS ile kullanırken özellikle dikkat edilmesi gereken noktalar:
#!/bin/bash
# concurrency-config.sh - Lambda eşzamanlılık ayarları
FUNCTION_NAME="order-processor"
REGION="eu-west-1"
# Reserved concurrency ayarla (diğer fonksiyonları korumak için)
# Bu değer, fonksiyonun maksimum eşzamanlı çalışma sayısını sınırlar
aws lambda put-function-concurrency
--function-name $FUNCTION_NAME
--reserved-concurrent-executions 50
--region $REGION
# Mevcut concurrency durumunu kontrol et
echo "=== Concurrency Durumu ==="
aws lambda get-function-concurrency
--function-name $FUNCTION_NAME
--region $REGION
# Provisioned concurrency ayarla (cold start'ı azaltmak için)
# Bu maliyet yaratır, sadece gerekliyse kullanın
aws lambda put-provisioned-concurrency-config
--function-name $FUNCTION_NAME
--qualifier production
--provisioned-concurrent-executions 5
--region $REGION
# Event source mapping'i geçici olarak devre dışı bırak
# (maintenance window için)
MAPPING_UUID=$(aws lambda list-event-source-mappings
--function-name $FUNCTION_NAME
--query 'EventSourceMappings[0].UUID'
--output text)
echo "Event Source Mapping UUID: $MAPPING_UUID"
# Durdur
aws lambda update-event-source-mapping
--uuid $MAPPING_UUID
--enabled false
echo "Event source mapping devre dışı bırakıldı"
# Tekrar aktifleştir
# aws lambda update-event-source-mapping --uuid $MAPPING_UUID --enabled true
FIFO Kuyruk ile Sipariş Garantisi
Bazı durumlarda mesaj sırası kritik önem taşır. Örneğin banka hesap işlemlerinde önce para yatırma, sonra para çekme işlemi işlenmeli. FIFO kuyruk için Lambda handler’ı:
# fifo-handler.py - FIFO kuyruk işleyici
import json
import logging
from typing import Dict, Any
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
"""
FIFO SQS için Lambda handler.
MessageGroupId bazında sıralı işleme yapar.
"""
batch_item_failures = []
# FIFO'da mesajlar MessageGroupId'ye göre gelir
# Aynı group içinde sıra korunur
for record in event['Records']:
message_id = record['messageId']
# FIFO'ya özgü attribute'lar
attributes = record.get('attributes', {})
message_group_id = attributes.get('MessageGroupId', 'default')
sequence_number = attributes.get('SequenceNumber', '0')
logger.info(
f"FIFO Mesaj - GroupId: {message_group_id}, "
f"Sequence: {sequence_number}, MessageId: {message_id}"
)
try:
body = json.loads(record['body'])
transaction_type = body.get('transaction_type')
account_id = body.get('account_id')
amount = body.get('amount', 0)
# İşlem tipine göre yönlendir
if transaction_type == 'DEPOSIT':
process_deposit(account_id, amount)
elif transaction_type == 'WITHDRAWAL':
process_withdrawal(account_id, amount)
elif transaction_type == 'TRANSFER':
process_transfer(body)
else:
raise ValueError(f"Bilinmeyen işlem tipi: {transaction_type}")
logger.info(
f"İşlem tamamlandı - Type: {transaction_type}, "
f"Account: {account_id}, Amount: {amount}"
)
except Exception as e:
logger.error(f"HATA - MessageId: {message_id}, {str(e)}")
batch_item_failures.append({"itemIdentifier": message_id})
# FIFO'da hata durumunda sonraki mesajları da durdurabiliriz
# Bu iş mantığına bağlıdır
return {"batchItemFailures": batch_item_failures}
def process_deposit(account_id: str, amount: float) -> None:
logger.info(f"Para yatırma - Hesap: {account_id}, Tutar: {amount}")
# DB güncelleme işlemleri burada
def process_withdrawal(account_id: str, amount: float) -> None:
logger.info(f"Para çekme - Hesap: {account_id}, Tutar: {amount}")
# Bakiye kontrolü ve güncelleme
def process_transfer(transfer_data: Dict) -> None:
logger.info(f"Transfer işlemi - {transfer_data}")
# Transfer mantığı
Gerçek Dünya Senaryosu: E-Ticaret Sipariş Pipeline’ı
Birlikte çalıştığım bir projede şöyle bir mimari kurmuştuk: kullanıcı sipariş verdiğinde API Gateway webhook alıyor, direkt SQS’e mesaj atıyor. Lambda bu mesajı alıp işliyor, gerekirse başka SQS kuyruklarına mesaj gönderiyor. Yani fan-out pattern uyguluyorduk.
Bu mimarinin avantajları:
- API Gateway timeout’u (29 saniye) aşan işlemleri asenkron yapabiliyorsunuz
- Ani trafik artışlarında (kampanya, indirim günleri) SQS buffer görevi görüyor
- Bir microservice’in çökmesi tüm pipeline’ı durdurmyor
- Her kuyruk bağımsız monitor edilebiliyor
Dikkat edilmesi gereken noktalar:
- Idempotency: Aynı mesaj birden fazla işlenebilir, bu durumu handle etmek gerekir. Order ID bazında DynamoDB’de işlenmiş kayıt tutmak iyi bir çözüm
- Message Size Limiti: SQS’de mesaj başına 256KB limit var. Büyük payload’lar için S3’e referans gönderme yaklaşımı kullanılmalı
- Lambda Timeout: SQS visibility timeout, Lambda timeout’unun 6 katı olmalı. Lambda 5 dakika çalışıyorsa visibility timeout 30 dakika olmalı
- Concurrency Limiti: Her AWS hesabında varsayılan 1000 eşzamanlı Lambda limiti var. Burst durumlarında bu dolabilir
Maliyet Optimizasyonu
Lambda-SQS kombinasyonunda maliyeti optimize etmek için:
- Batch size artırma: Batch size’ı maksimuma çıkarmak (10000’e kadar) çağrı sayısını azaltır
- Long polling:
ReceiveMessageWaitTimeSecondsdeğerini 20 saniye yapmak boş polling maliyetini düşürür - ARM mimarisi: Lambda fonksiyonlarını ARM (Graviton2) mimarisinde çalıştırmak aynı işlem gücü için %20 daha ucuz
- Memory optimizasyonu: Lambda memory’sini gerçek kullanıma göre ayarlamak. Çok fazla memory vermek gereksiz maliyete yol açar
- Provisioned concurrency dikkatli kullan: Sadece gerçekten cold start sorunu yaşanan senaryolarda kullan, yoksa her zaman maliyet doğurur
Sonuç
AWS Lambda ile SQS entegrasyonu, doğru konfigüre edildiğinde son derece güçlü ve maliyet etkin bir mesaj işleme sistemi sunuyor. Önemli olan noktaları özetlersek: ReportBatchItemFailures pattern’ini kesinlikle kullanın, DLQ’yu her zaman kurun ve monitoring’i ihmal etmeyin. Visibility timeout değerini Lambda timeout’unuzla uyumlu ayarlamayı unutmayın, aksi takdirde aynı mesaj birden fazla kez işlenir.
Başlangıçta Standard Queue ile başlayıp ihtiyaç halinde FIFO’ya geçmeyi öneririm. Çoğu senaryoda Standard Queue yeterli oluyor ve daha yüksek throughput sunuyor. Son olarak, idempotency tasarımını baştan düşünün. Mesajın birden fazla işlenmesi durumunu handle eden bir sistem kurmak, sonradan düzeltmeye çalışmaktan çok daha kolay.
Bu mimariyi prodüksiyonda kullananlar sorularını yorumlarda paylaşabilir, deneyimlerinizi duymak isterim.
