AWS Lambda ile DynamoDB Entegrasyonu: Serverless Veri Yönetimi
Serverless dünyasına adım attığınızda, ilk karşılaştığınız kombinasyonlardan biri genellikle Lambda ve DynamoDB ikilisi oluyor. Bu iki AWS servisi birlikte kullanıldığında gerçekten güçlü, ölçeklenebilir ve yönetim yükü düşük sistemler kurabiliyorsunuz. Ama “birkaç tıkla hallederim” diye başlayıp saatlerce hata ayıklamak zorunda kaldığınız da oluyor. Bu yazıda, gerçek dünya senaryolarını baz alarak Lambda-DynamoDB entegrasyonunu her yönüyle ele alacağız.
Neden Lambda + DynamoDB?
Klasik bir web uygulaması düşünün: EC2 üzerinde bir uygulama sunucusu, yanında RDS, önünde de bir load balancer. Trafik saatte bir kez mi geliyor? Sunucu yine de ayakta ve para akıyor. İşte burada serverless devreye giriyor.
Lambda, sadece çağrıldığında çalışır ve milisaniye bazında fiyatlandırılır. DynamoDB de benzer şekilde, kullandığın kadar öde modeliyle çalışır (on-demand mod seçerseniz). İkisini birleştirdiğinizde:
- Sıfır sunucu yönetimi: Patch, güncelleme, kapasite planlaması yok
- Otomatik ölçekleme: Trafiğin 10 katına çıkması sizi korkutmaz
- Düşük maliyet: Küçük uygulamalar için aylık maliyetler bazen sıfıra yakın
- Hızlı geliştirme: Altyapı yerine iş mantığına odaklanırsınız
Tabii her şeyin bir bedeli var. Cold start sorunları, DynamoDB’nin eventual consistency modeli, karmaşık sorgu limitasyonları… Bunlara da değineceğiz.
Ortamı Hazırlamak
Önce geliştirme ortamını kuralım. AWS CLI ve gerekli araçların kurulu olduğunu varsayıyorum.
# AWS CLI kurulumu kontrolü
aws --version
# Python Lambda geliştirmesi için sanal ortam
python3 -m venv lambda-env
source lambda-env/bin/activate
# Gerekli paketler
pip install boto3 aws-lambda-powertools
# AWS kimlik bilgilerini yapılandır
aws configure
DynamoDB tablosunu CLI üzerinden oluşturalım:
# Basit bir kullanıcı tablosu oluşturma
aws dynamodb create-table
--table-name Users
--attribute-definitions
AttributeName=userId,AttributeType=S
AttributeName=email,AttributeType=S
--key-schema
AttributeName=userId,KeyType=HASH
--global-secondary-indexes
'[{
"IndexName": "EmailIndex",
"KeySchema": [{"AttributeName":"email","KeyType":"HASH"}],
"Projection": {"ProjectionType":"ALL"},
"ProvisionedThroughput": {"ReadCapacityUnits":5,"WriteCapacityUnits":5}
}]'
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
--region eu-west-1
# Tablo durumunu kontrol et
aws dynamodb describe-table --table-name Users --query 'Table.TableStatus'
IAM Rolü ve İzinler
Lambda fonksiyonunuzun DynamoDB’ye erişebilmesi için doğru IAM rolüne ihtiyacı var. Bu adımı atlamak veya “AdministratorAccess ver, geçelim” yapmak sık yapılan hatalardan. Least privilege prensibini uygulayalım:
# Lambda execution role için policy belgesi oluştur
cat > lambda-dynamodb-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem"
],
"Resource": [
"arn:aws:dynamodb:eu-west-1:123456789:table/Users",
"arn:aws:dynamodb:eu-west-1:123456789:table/Users/index/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
EOF
# Policy oluştur
aws iam create-policy
--policy-name LambdaDynamoDBPolicy
--policy-document file://lambda-dynamodb-policy.json
# Role oluştur ve policy'yi ekle
aws iam create-role
--role-name LambdaDynamoDBRole
--assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy
--role-name LambdaDynamoDBRole
--policy-arn arn:aws:iam::123456789:policy/LambdaDynamoDBPolicy
İlk Lambda Fonksiyonu: CRUD İşlemleri
Şimdi gerçek koda geçelim. Kullanıcı yönetimi için temel CRUD operasyonlarını içeren bir Lambda yazalım:
# user_handler.py
import json
import boto3
import uuid
from datetime import datetime
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
table = dynamodb.Table('Users')
def lambda_handler(event, context):
"""API Gateway'den gelen istekleri yönlendir"""
http_method = event.get('httpMethod', '')
path = event.get('path', '')
try:
if http_method == 'GET' and '/users/' in path:
user_id = event['pathParameters']['userId']
return get_user(user_id)
elif http_method == 'POST' and path == '/users':
body = json.loads(event.get('body', '{}'))
return create_user(body)
elif http_method == 'PUT' and '/users/' in path:
user_id = event['pathParameters']['userId']
body = json.loads(event.get('body', '{}'))
return update_user(user_id, body)
elif http_method == 'DELETE' and '/users/' in path:
user_id = event['pathParameters']['userId']
return delete_user(user_id)
else:
return response(405, {'error': 'Method not allowed'})
except Exception as e:
print(f"Beklenmedik hata: {str(e)}")
return response(500, {'error': 'Internal server error'})
def get_user(user_id):
try:
result = table.get_item(
Key={'userId': user_id},
ConsistentRead=True # Tutarlı okuma, biraz daha pahalı ama güncel veri garantisi
)
if 'Item' not in result:
return response(404, {'error': 'Kullanıcı bulunamadı'})
return response(200, result['Item'])
except ClientError as e:
print(f"DynamoDB hatası: {e.response['Error']['Message']}")
return response(500, {'error': 'Veritabanı hatası'})
def create_user(body):
required_fields = ['email', 'name']
for field in required_fields:
if field not in body:
return response(400, {'error': f'{field} zorunludur'})
user_id = str(uuid.uuid4())
timestamp = datetime.utcnow().isoformat()
item = {
'userId': user_id,
'email': body['email'],
'name': body['name'],
'createdAt': timestamp,
'updatedAt': timestamp,
'status': 'active'
}
try:
# Aynı email'den ikinci kayıt olmasın diye conditional write
table.put_item(
Item=item,
ConditionExpression='attribute_not_exists(userId)'
)
return response(201, item)
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
return response(409, {'error': 'Bu kullanıcı zaten mevcut'})
raise
def update_user(user_id, body):
allowed_fields = ['name', 'email', 'status']
update_fields = {k: v for k, v in body.items() if k in allowed_fields}
if not update_fields:
return response(400, {'error': 'Güncellenecek alan bulunamadı'})
# DynamoDB update expression dinamik oluştur
update_expression = 'SET updatedAt = :timestamp'
expression_values = {':timestamp': datetime.utcnow().isoformat()}
for key, value in update_fields.items():
update_expression += f', #{key} = :{key}'
expression_values[f':{key}'] = value
expression_names = {f'#{key}': key for key in update_fields.keys()}
try:
result = table.update_item(
Key={'userId': user_id},
UpdateExpression=update_expression,
ExpressionAttributeValues=expression_values,
ExpressionAttributeNames=expression_names,
ConditionExpression='attribute_exists(userId)',
ReturnValues='ALL_NEW'
)
return response(200, result['Attributes'])
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
return response(404, {'error': 'Kullanıcı bulunamadı'})
raise
def delete_user(user_id):
try:
table.delete_item(
Key={'userId': user_id},
ConditionExpression='attribute_exists(userId)'
)
return response(204, {})
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
return response(404, {'error': 'Kullanıcı bulunamadı'})
raise
def response(status_code, body):
return {
'statusCode': status_code,
'headers': {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': '*'
},
'body': json.dumps(body, default=str)
}
Bu kodu paketleyip deploy edelim:
# Kodu zip'le ve Lambda'ya yükle
zip -j user_handler.zip user_handler.py
aws lambda create-function
--function-name UserManagement
--runtime python3.11
--role arn:aws:iam::123456789:role/LambdaDynamoDBRole
--handler user_handler.lambda_handler
--zip-file fileb://user_handler.zip
--timeout 30
--memory-size 256
--environment Variables='{REGION=eu-west-1,TABLE_NAME=Users}'
# Test et
aws lambda invoke
--function-name UserManagement
--payload '{"httpMethod":"POST","path":"/users","body":"{"email":"[email protected]","name":"Test Kullanıcı"}"}'
output.json
cat output.json
DynamoDB Streams ile Event-Driven Mimari
DynamoDB Streams, tablodaki değişiklikleri yakalayıp Lambda’yı tetiklemek için kullanılan güçlü bir özellik. Örneğin, yeni kullanıcı oluşturulduğunda hoşgeldin emaili göndermek istiyorsunuz. Bunu Lambda’nın içine yazmak yerine, streams üzerinden ayrı bir fonksiyon tetikleyebilirsiniz.
# Önce stream'i etkinleştir
aws dynamodb update-table
--table-name Users
--stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
# Stream ARN'ı al
STREAM_ARN=$(aws dynamodb describe-table
--table-name Users
--query 'Table.LatestStreamArn'
--output text)
echo "Stream ARN: $STREAM_ARN"
# Lambda'ya stream trigger ekle
aws lambda create-event-source-mapping
--function-name UserWelcomeEmail
--event-source-arn $STREAM_ARN
--batch-size 10
--starting-position LATEST
--filter-criteria '{"Filters":[{"Pattern":"{"eventName":["INSERT"]}"}]}'
Stream’den gelen olayları işleyen Lambda:
# stream_processor.py
import json
import boto3
from botocore.exceptions import ClientError
ses_client = boto3.client('ses', region_name='eu-west-1')
def lambda_handler(event, context):
"""DynamoDB Stream olaylarını işle"""
processed = 0
failed = 0
for record in event['Records']:
try:
event_name = record['eventName']
if event_name == 'INSERT':
new_image = record['dynamodb']['NewImage']
user = deserialize_dynamodb_item(new_image)
send_welcome_email(user)
processed += 1
elif event_name == 'MODIFY':
old_image = record['dynamodb']['OldImage']
new_image = record['dynamodb']['NewImage']
old_status = old_image.get('status', {}).get('S', '')
new_status = new_image.get('status', {}).get('S', '')
if old_status != 'suspended' and new_status == 'suspended':
user = deserialize_dynamodb_item(new_image)
send_suspension_notification(user)
processed += 1
except Exception as e:
print(f"Record işleme hatası: {str(e)}, Record: {json.dumps(record)}")
failed += 1
print(f"İşlenen: {processed}, Başarısız: {failed}")
# Eğer kritik hata varsa Lambda'nın retry yapması için exception fırlat
if failed > 0:
raise Exception(f"{failed} adet kayıt işlenemedi")
def deserialize_dynamodb_item(dynamodb_item):
"""DynamoDB formatındaki veriyi normal Python dict'e çevir"""
from boto3.dynamodb.types import TypeDeserializer
deserializer = TypeDeserializer()
return {
key: deserializer.deserialize(value)
for key, value in dynamodb_item.items()
}
def send_welcome_email(user):
try:
ses_client.send_email(
Source='[email protected]',
Destination={'ToAddresses': [user['email']]},
Message={
'Subject': {'Data': f"Hoşgeldiniz, {user['name']}!"},
'Body': {
'Text': {
'Data': f"Merhaba {user['name']},nnHesabınız başarıyla oluşturuldu."
}
}
}
)
print(f"Hoşgeldin emaili gönderildi: {user['email']}")
except ClientError as e:
print(f"Email gönderilemedi: {e.response['Error']['Message']}")
raise
Performans Optimizasyonu: Bağlantı Yeniden Kullanımı
Lambda’da her çağrıda boto3 client oluşturuyorsanız, gereksiz yere zaman harcıyorsunuz. Bağlantıları handler dışında tanımlayın:
# Kötü yaklaşım - her çağrıda yeniden bağlantı kurulur
def lambda_handler(event, context):
dynamodb = boto3.resource('dynamodb') # YANLIŞ YERDE
table = dynamodb.Table('Users')
# ...
# İyi yaklaşım - bağlantı warm container'da yeniden kullanılır
import boto3
from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
tracer = Tracer()
# Handler dışında tanımla
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext):
# table zaten hazır, direkt kullan
result = table.get_item(Key={'userId': event['userId']})
logger.info("Kullanıcı getirildi", extra={"user_id": event['userId']})
return result.get('Item')
Batch İşlemler ve Pagination
Büyük veri setleriyle çalışırken batch operasyonları ve pagination şart:
# Toplu veri yüklemek için script
cat > bulk_load.py << 'EOF'
import boto3
import json
import uuid
from datetime import datetime
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
table = dynamodb.Table('Users')
def bulk_insert_users(users_list):
"""25'erli batch'ler halinde yükle (DynamoDB limiti)"""
batch_size = 25
total_written = 0
with table.batch_writer() as batch:
for user in users_list:
batch.put_item(
Item={
'userId': str(uuid.uuid4()),
'email': user['email'],
'name': user['name'],
'createdAt': datetime.utcnow().isoformat(),
'status': 'active'
}
)
total_written += 1
if total_written % 100 == 0:
print(f"{total_written} kayıt yazıldı...")
print(f"Toplam {total_written} kayıt yüklendi")
def scan_all_users(filter_status=None):
"""Tüm kullanıcıları sayfalayarak getir"""
all_users = []
scan_kwargs = {}
if filter_status:
scan_kwargs['FilterExpression'] = 'status = :status'
scan_kwargs['ExpressionAttributeValues'] = {':status': filter_status}
while True:
response = table.scan(**scan_kwargs)
all_users.extend(response['Items'])
# LastEvaluatedKey varsa daha fazla sayfa var demektir
if 'LastEvaluatedKey' not in response:
break
scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
print(f"Şu ana kadar getirilen: {len(all_users)}")
return all_users
if __name__ == '__main__':
# Test verisi oluştur
test_users = [
{'email': f'user{i}@test.com', 'name': f'Test User {i}'}
for i in range(100)
]
bulk_insert_users(test_users)
active_users = scan_all_users(filter_status='active')
print(f"Aktif kullanıcı sayısı: {len(active_users)}")
EOF
python3 bulk_load.py
Hata Yönetimi ve Retry Mekanizması
Prodüksiyonda işler her zaman yolunda gitmez. DynamoDB throttling, geçici bağlantı sorunları ve diğer hatalar için sağlam bir hata yönetimi şart:
# retry_handler.py
import boto3
import time
import random
from functools import wraps
from botocore.exceptions import ClientError
def exponential_backoff(max_retries=3, base_delay=0.1):
"""Exponential backoff ile otomatik retry decorator"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries <= max_retries:
try:
return func(*args, **kwargs)
except ClientError as e:
error_code = e.response['Error']['Code']
# Retry yapılabilir hatalar
retryable_errors = [
'ProvisionedThroughputExceededException',
'RequestLimitExceeded',
'ThrottlingException',
'InternalServerError'
]
if error_code not in retryable_errors or retries == max_retries:
raise
# Jitter eklenmiş exponential backoff
delay = base_delay * (2 ** retries) + random.uniform(0, 0.1)
print(f"Hata: {error_code}. {delay:.2f}s sonra tekrar denenecek... (Deneme {retries + 1}/{max_retries})")
time.sleep(delay)
retries += 1
return wrapper
return decorator
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Users')
@exponential_backoff(max_retries=3, base_delay=0.2)
def safe_get_user(user_id):
return table.get_item(
Key={'userId': user_id},
ConsistentRead=True
)
@exponential_backoff(max_retries=5, base_delay=0.1)
def safe_put_user(item):
return table.put_item(Item=item)
def lambda_handler(event, context):
user_id = event.get('userId')
try:
result = safe_get_user(user_id)
if 'Item' not in result:
return {'statusCode': 404, 'body': 'Kullanıcı bulunamadı'}
return {'statusCode': 200, 'body': result['Item']}
except ClientError as e:
print(f"Kritik DynamoDB hatası: {e.response['Error']['Message']}")
# CloudWatch alarm tetiklensin diye özel metrik gönder
cloudwatch = boto3.client('cloudwatch')
cloudwatch.put_metric_data(
Namespace='Lambda/Errors',
MetricData=[{
'MetricName': 'DynamoDBCriticalError',
'Value': 1,
'Unit': 'Count'
}]
)
raise
Gerçek Dünya Senaryosu: E-ticaret Sipariş Sistemi
Teoriden pratiğe geçelim. Bir e-ticaret sitesi için sipariş yönetimi sistemi kuralım. Bu senaryoda stok kontrolü, sipariş oluşturma ve bildirim gönderme işlemlerini Lambda ve DynamoDB ile yönetiyoruz.
# Sipariş ve ürün tabloları
aws dynamodb create-table
--table-name Orders
--attribute-definitions
AttributeName=orderId,AttributeType=S
AttributeName=customerId,AttributeType=S
AttributeName=createdAt,AttributeType=S
--key-schema
AttributeName=orderId,KeyType=HASH
--global-secondary-indexes
'[{
"IndexName": "CustomerOrdersIndex",
"KeySchema": [
{"AttributeName":"customerId","KeyType":"HASH"},
{"AttributeName":"createdAt","KeyType":"RANGE"}
],
"Projection": {"ProjectionType":"ALL"},
"ProvisionedThroughput": {"ReadCapacityUnits":10,"WriteCapacityUnits":10}
}]'
--provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10
aws dynamodb create-table
--table-name Products
--attribute-definitions
AttributeName=productId,AttributeType=S
--key-schema
AttributeName=productId,KeyType=HASH
--provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=10
# order_processor.py
import boto3
import uuid
import json
from datetime import datetime
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb')
orders_table = dynamodb.Table('Orders')
products_table = dynamodb.Table('Products')
sqs = boto3.client('sqs')
def lambda_handler(event, context):
"""Sipariş oluşturma Lambda'sı"""
body = json.loads(event.get('body', '{}'))
customer_id = body.get('customerId')
items = body.get('items', [])
if not customer_id or not items:
return {'statusCode': 400, 'body': json.dumps({'error': 'Eksik parametreler'})}
# Stok kontrolü ve sipariş oluşturma transaction içinde
try:
order_id = str(uuid.uuid4())
total_amount = 0
order_items = []
# DynamoDB Transaction ile atomik işlem
transact_items = []
for item in items:
product_id = item['productId']
quantity = item['quantity']
# Ürün bilgisini al
product_result = products_table.get_item(
Key={'productId': product_id},
ConsistentRead=True
)
if 'Item' not in product_result:
return {'statusCode': 404, 'body': json.dumps({'error': f'Ürün bulunamadı: {product_id}'})}
product = product_result['Item']
if product['stock'] < quantity:
return {
'statusCode': 400,
'body': json.dumps({'error': f'{product["name"]} için yeterli stok yok'})
}
item_total = float(product['price']) * quantity
total_amount += item_total
order_items.append({
'productId': product_id,
'name': product['name'],
'quantity': quantity,
'unitPrice': float(product['price']),
'subtotal': item_total
})
# Stok azaltma işlemini transaction'a ekle
transact_items.append({
'Update': {
'TableName': 'Products',
'Key': {'productId': {'S': product_id}},
'UpdateExpression': 'SET stock = stock - :qty',
'ConditionExpression': 'stock >= :qty',
'ExpressionAttributeValues': {
':qty': {'N': str(quantity)}
}
}
})
# Sipariş kaydını transaction'a ekle
order = {
'orderId': order_id,
'customerId': customer_id,
'items': order_items,
'totalAmount': total_amount,
'status': 'pending',
'createdAt': datetime.utcnow().isoformat()
}
transact_items.append({
'Put': {
'TableName': 'Orders',
'Item': boto3.dynamodb.types.TypeSerializer().serialize(order)['M']
}
})
# Transaction'ı çalıştır
dynamodb_client = boto3.client('dynamodb')
dynamodb_client.transact_write_items(TransactItems=transact_items)
# SQS'e bildirim gönder
sqs.send_message(
QueueUrl='https://sqs.eu-west-1.amazonaws.com/123456789/OrderNotifications',
MessageBody=json.dumps({
'orderId': order_id,
'customerId': customer_id,
'totalAmount': total_amount
})
)
return {
'statusCode': 201,
'body': json.dumps({'orderId': order_id, 'totalAmount': total_amount})
}
except ClientError as e:
if e.response['Error']['Code'] == 'TransactionCanceledException':
return {'statusCode': 409, 'body': json.dumps({'error': 'Stok değişikliği sırasında çakışma oluştu, lütfen tekrar deneyin'})}
raise
İzleme ve Alarm Kurulumu
Prodüksiyonda kör uçmak istemezsiniz. CloudWatch alarmlari ve dashboardlar şart:
# Lambda hata alarmı
aws cloudwatch put-metric-alarm
--alarm-name "Lambda-UserManagement-Errors"
--alarm-description "UserManagement Lambda hata oranı yüksek"
--metric-name Errors
--namespace AWS/Lambda
--statistic Sum
--period 60
--threshold 5
--comparison-operator GreaterThanThreshold
--dimensions Name=FunctionName,Value=UserManagement
--evaluation-periods 1
--alarm-actions arn:aws:sns:eu-west-1:123456789:AlertTeam
# DynamoDB throttle alarmı
aws cloudwatch put-metric-alarm
--alarm-name "DynamoDB-Users-ReadThrottle"
--alarm-description "DynamoDB Users tablosu okuma throttle'ı"
--metric-name ReadThrottleEvents
--namespace AWS/DynamoDB
--statistic Sum
--period 60
--threshold 10
--comparison-operator GreaterThanThreshold
--dimensions Name=TableName,Value=Users
--evaluation-periods 1
--alarm-actions arn:aws:sns:eu-west-1:123456789:AlertTeam
# Lambda cold start süresini izle
aws logs put-metric-filter
--log-group-name /aws/lambda/UserManagement
--filter-name ColdStartFilter
--filter-pattern "REPORT Init Duration"
--metric-transformations
metricName=ColdStartCount,metricNamespace=CustomLambda,metricValue=1
Dikkat Edilmesi Gereken Noktalar
Prodüksiyon ortamında sık karşılaşılan tuzaklar:
- Scan yerine Query kullanın: Scan tüm tabloyu tarar, pahalıdır ve yavaştır. GSI tasarımınızı buna göre yapın.
- DynamoDB hot partition: Partition key olarak timestamp veya sıralı ID kullanmayın. Tüm yük tek partition’a düşer.
- Lambda timeout ayarı: DynamoDB işlemleri normalde hızlıdır ama yoğun yük altında gecikmeler olabilir. Timeout’u buna göre ayarlayın.
- Eventual consistency: Global Secondary Index’lerde okuma her zaman en güncel veriyi döndürmeyebilir. Kritik okumalar için ConsistentRead kullanın (sadece ana tabloda mümkün).
- Item boyutu limiti: DynamoDB’de her item maksimum 400KB. Büyük veriler için S3 + DynamoDB referans pattern’ini kullanın.
- TTL özelliğini kullanın: Geçici veriler, session’lar veya cache benzeri veriler için TTL attribute’u ekleyerek otomatik temizleme sağlayın.
- Provisioned vs On-demand: Tahmin edilebilir trafik için provisioned daha ucuz, değişken trafik için on-demand daha güvenli.
Sonuç
Lambda ve DynamoDB entegrasyonu ilk bakışta basit görünse de, prodüksiyon kalitesinde bir sistem kurmak için birçok detayı göz önünde bulundurmanız gerekiyor. IAM izinlerinden başlayıp doğru partition key tasarımına, transaction yönetiminden hata handling stratejilerine kadar her katmanın düşünülmesi gerekiyor.
Bu yazıda anlattıklarımı özetleyecek olursam: bağlantılarınızı handler dışında tutun, least privilege ile IAM rollerinizi yapılandırın, Streams ile event-driven mimariyi benimseyin, transaction’ları kritik operasyonlarda kullanın ve her şeyi izleyin. CloudWatch alarmlari olmadan prodüksiyona çıkmak, gözleriniz kapalı araba kullanmak gibi.
Sonraki adım olarak bu yapının üzerine API Gateway, Cognito authentication ve WAF ekleyerek tam kapsamlı bir serverless backend oluşturabilirsiniz. AWS SAM veya Terraform ile altyapıyı kod olarak yönetmek de işinizi ciddi ölçüde kolaylaştıracak.
