CQRS Pattern ile Komut ve Sorgu Ayrımı
Yıllar önce bir e-ticaret projesinde şöyle bir sorunla karşılaştım: Ürün listeleme sayfası, kullanıcı sipariş verdiğinde neden bu kadar yavaşlıyor? Profiler’ı açtım, veritabanına baktım, ve gördüm ki aynı Order entity’si hem sipariş oluşturmak hem de raporlama sorguları için kullanılıyor. SELECT ile INSERT/UPDATE aynı tabloda, aynı model üzerinden, aynı transaction yöneticisiyle koşuyor. İşte CQRS’in tam olarak çözdüğü problem buydu.
CQRS (Command Query Responsibility Segregation), bir sistemdeki okuma (Query) ve yazma (Command) işlemlerini birbirinden tamamen ayıran bir mimari pattern’dir. Greg Young tarafından popülerleştirilen bu yaklaşım, özellikle mesaj kuyruğu mimarileriyle birleşince inanılmaz derecede güçlü bir yapı ortaya çıkarıyor. Ama önce temelleri doğru atmak lazım.
CQRS Nedir, Ne Değildir?
Çok sık yapılan bir hata var: CQRS’i sadece “okuma ve yazma için ayrı veritabanı kullanmak” olarak tanımlamak. Bu eksik bir tanım. CQRS özünde bir sorumluluk ayrımıdır. Okuma modeli ve yazma modeli birbirinden bağımsız olarak gelişebilir, scale edilebilir, hatta farklı teknoloji stackleri kullanabilir.
Yazma tarafında iş kuralları yoğun olur: validation, domain logic, business rule enforcement. Okuma tarafında ise hız kritiktir: denormalize edilmiş veri, düzleştirilmiş objeler, cache dostu yapılar.
Mesaj kuyruklarıyla birleşince şöyle bir akış ortaya çıkıyor:
- Kullanıcı bir komut gönderir (örn:
CreateOrderCommand) - Command Handler bu komutu işler, domain event fırlatır
- Event mesaj kuyruğuna düşer (RabbitMQ, Kafka, vb.)
- Query tarafı bu event’leri consume ederek kendi okuma modelini günceller
- Kullanıcı sorgu yaptığında, optimize edilmiş okuma modelinden cevap alır
Bu ayrımın gücünü gerçek dünya örnekleriyle inceleyelim.
Temel Yapı: Command ve Query Sınıfları
Python ile basit bir CQRS iskeleti kuralım. Gerçek projelerde çok daha karmaşık hale geliyor elbette, ama kavramsal olarak bu yapı yeterli:
from dataclasses import dataclass
from typing import Optional
from datetime import datetime
import uuid
# Command tarafı - Yazma işlemleri için
@dataclass
class CreateOrderCommand:
customer_id: str
product_id: str
quantity: int
shipping_address: str
command_id: str = None
def __post_init__(self):
if not self.command_id:
self.command_id = str(uuid.uuid4())
# Query tarafı - Okuma işlemleri için
@dataclass
class GetOrdersByCustomerQuery:
customer_id: str
status_filter: Optional[str] = None
page: int = 1
page_size: int = 20
# Command Handler
class CreateOrderCommandHandler:
def __init__(self, order_repository, event_bus):
self.order_repository = order_repository
self.event_bus = event_bus
def handle(self, command: CreateOrderCommand):
# İş kuralı validasyonu
if command.quantity <= 0:
raise ValueError("Miktar sıfırdan büyük olmalıdır")
order = self.order_repository.create(
customer_id=command.customer_id,
product_id=command.product_id,
quantity=command.quantity,
shipping_address=command.shipping_address
)
# Domain event fırlat
event = {
"event_type": "OrderCreated",
"order_id": order.id,
"customer_id": command.customer_id,
"product_id": command.product_id,
"quantity": command.quantity,
"timestamp": datetime.utcnow().isoformat()
}
self.event_bus.publish("order.events", event)
return order.id
Şimdi query handler tarafına bakalım. Dikkat ederseniz, query handler’lar doğrudan okuma modeline gidiyor, yazma tarafındaki repository’ye değil:
# Query Handler - Ayrı bir okuma modeli kullanıyor
class GetOrdersByCustomerQueryHandler:
def __init__(self, read_model_db):
# Bu, yazma tarafındaki DB değil!
# Optimize edilmiş, denormalize edilmiş okuma DB'si
self.read_model_db = read_model_db
def handle(self, query: GetOrdersByCustomerQuery):
filters = {"customer_id": query.customer_id}
if query.status_filter:
filters["status"] = query.status_filter
offset = (query.page - 1) * query.page_size
# Doğrudan denormalize view'a sorgu
results = self.read_model_db.execute("""
SELECT
o.order_id,
o.status,
o.total_amount,
o.created_at,
c.customer_name,
c.email,
p.product_name,
p.sku
FROM orders_read_model o
JOIN customers_snapshot c ON o.customer_id = c.customer_id
JOIN products_snapshot p ON o.product_id = p.product_id
WHERE o.customer_id = %(customer_id)s
LIMIT %(limit)s OFFSET %(offset)s
""", {**filters, "limit": query.page_size, "offset": offset})
return results
RabbitMQ ile Command Bus Implementasyonu
Gerçek dünya senaryosunda command’ları ve event’leri RabbitMQ üzerinden taşımak yaygın bir yaklaşım. Aşağıdaki yapı, microservice ortamında CQRS uygulamak için kullandığım bir template:
import pika
import json
import logging
from functools import wraps
logger = logging.getLogger(__name__)
class RabbitMQCommandBus:
def __init__(self, connection_params):
self.connection = pika.BlockingConnection(connection_params)
self.channel = self.connection.channel()
# Command kuyruğunu tanımla
self.channel.exchange_declare(
exchange='commands',
exchange_type='direct',
durable=True
)
self.channel.queue_declare(
queue='order_commands',
durable=True,
arguments={
'x-dead-letter-exchange': 'commands.dlx',
'x-message-ttl': 300000 # 5 dakika TTL
}
)
self.channel.queue_bind(
queue='order_commands',
exchange='commands',
routing_key='order.commands'
)
def send_command(self, routing_key: str, command: dict):
"""Command'ı kuyruğa gönder"""
message_body = json.dumps(command, ensure_ascii=False)
self.channel.basic_publish(
exchange='commands',
routing_key=routing_key,
body=message_body.encode('utf-8'),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent message
content_type='application/json',
message_id=command.get('command_id'),
timestamp=int(datetime.utcnow().timestamp())
)
)
logger.info(f"Command gönderildi: {routing_key} -> {command.get('command_id')}")
def consume_commands(self, queue_name: str, handler_func):
"""Command'ları tüket ve handler'a ilet"""
def callback(ch, method, properties, body):
try:
command_data = json.loads(body.decode('utf-8'))
handler_func(command_data)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Command başarıyla işlendi: {properties.message_id}")
except ValueError as e:
# Validation hatası - DLQ'ya at
logger.error(f"Validation hatası: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as e:
# Geçici hata - tekrar kuyruğa al
logger.error(f"İşlem hatası, tekrar denenecek: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue=queue_name,
on_message_callback=callback
)
self.channel.start_consuming()
Event Sourcing ile CQRS Entegrasyonu
CQRS sıklıkla Event Sourcing ile birlikte kullanılır. Bu kombinasyon, okuma modelini event’lerden yeniden inşa etme imkanı tanıyor. Şöyle düşünün: yazma tarafında her değişikliği event olarak saklıyorsunuz, okuma tarafı bu event’leri tüketerek kendi görünümünü güncel tutuyor:
# Event Store - Yazma tarafı için
class EventStore:
def __init__(self, db_connection):
self.db = db_connection
def append_events(self, aggregate_id: str, events: list, expected_version: int):
"""
Optimistic concurrency control ile event kaydet
expected_version: Bu aggregate için beklenen mevcut versiyon
"""
current_version = self._get_current_version(aggregate_id)
if current_version != expected_version:
raise ConcurrencyException(
f"Aggregate {aggregate_id} versiyonu tutarsız. "
f"Beklenen: {expected_version}, Mevcut: {current_version}"
)
for i, event in enumerate(events):
self.db.execute("""
INSERT INTO event_store
(aggregate_id, event_type, payload, version, occurred_at)
VALUES
(%(aggregate_id)s, %(event_type)s, %(payload)s, %(version)s, NOW())
""", {
"aggregate_id": aggregate_id,
"event_type": event["event_type"],
"payload": json.dumps(event),
"version": expected_version + i + 1
})
def get_events(self, aggregate_id: str, from_version: int = 0):
"""Bir aggregate'in event geçmişini getir"""
return self.db.execute("""
SELECT event_type, payload, version, occurred_at
FROM event_store
WHERE aggregate_id = %(aggregate_id)s
AND version > %(from_version)s
ORDER BY version ASC
""", {"aggregate_id": aggregate_id, "from_version": from_version})
# Okuma modeli güncelleyici - Event Consumer
class OrderReadModelProjector:
def __init__(self, read_db):
self.read_db = read_db
def handle_order_created(self, event: dict):
self.read_db.execute("""
INSERT INTO orders_read_model
(order_id, customer_id, product_id, quantity, status, created_at)
VALUES
(%(order_id)s, %(customer_id)s, %(product_id)s,
%(quantity)s, 'PENDING', %(timestamp)s)
ON CONFLICT (order_id) DO NOTHING
""", event)
def handle_order_shipped(self, event: dict):
self.read_db.execute("""
UPDATE orders_read_model
SET status = 'SHIPPED',
shipped_at = %(shipped_at)s,
tracking_number = %(tracking_number)s
WHERE order_id = %(order_id)s
""", event)
def handle_order_cancelled(self, event: dict):
self.read_db.execute("""
UPDATE orders_read_model
SET status = 'CANCELLED',
cancelled_at = %(cancelled_at)s,
cancellation_reason = %(reason)s
WHERE order_id = %(order_id)s
""", event)
Kafka ile Yüksek Hacimli CQRS
Büyük ölçekli sistemlerde, özellikle günde milyonlarca işlem yapan platformlarda RabbitMQ yerine Kafka tercih ediliyor. Kafka’nın log-based yapısı, okuma modellerini herhangi bir noktadan yeniden inşa etmeyi mümkün kılıyor:
# Kafka topic'lerini oluştur
# Command topic'i - az partition, sıralı işlem gerekiyor
kafka-topics.sh --create
--bootstrap-server localhost:9092
--topic order-commands
--partitions 6
--replication-factor 3
--config retention.ms=604800000
--config cleanup.policy=delete
# Event topic'i - daha fazla partition, paralel okuma için
kafka-topics.sh --create
--bootstrap-server localhost:9092
--topic order-events
--partitions 24
--replication-factor 3
--config retention.ms=-1
--config cleanup.policy=compact
# Okuma modeli güncellemeleri için ayrı topic
kafka-topics.sh --create
--bootstrap-server localhost:9092
--topic order-read-model-updates
--partitions 12
--replication-factor 3
# Consumer group durumlarını kontrol et
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group order-read-model-projector
--describe
# Okuma modelini sıfırdan yeniden inşa etmek için (replay)
# Consumer'ı en başa al
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group order-read-model-projector
--topic order-events
--reset-offsets
--to-earliest
--execute
Okuma modelini yeniden inşa etme (projection rebuild) senaryosu çok değerli. Diyelim ki okuma tarafında yeni bir alan eklediniz. Kafka’da event geçmişi saklı olduğu için consumer group’u sıfırlayıp tüm event’leri baştan işleyerek yeni okuma modelini oluşturabilirsiniz. Bu özelliği RabbitMQ ile elde edemezsiniz.
Idempotency ve Exactly-Once Processing
CQRS sistemlerinin en kritik sorunu: aynı command veya event’in birden fazla işlenmesi. Bunu çözmek için idempotency mekanizması şart:
import hashlib
import redis
from functools import wraps
class IdempotencyMiddleware:
def __init__(self, redis_client, ttl_seconds=86400):
self.redis = redis_client
self.ttl = ttl_seconds
def ensure_idempotent(self, handler_func):
"""
Decorator: Aynı command_id ile gelen komutları tekrar işleme
"""
@wraps(handler_func)
def wrapper(command: dict):
command_id = command.get("command_id")
if not command_id:
raise ValueError("command_id zorunludur")
# Redis'te işlenmiş mi kontrol et
idempotency_key = f"processed_command:{command_id}"
already_processed = self.redis.get(idempotency_key)
if already_processed:
result = json.loads(already_processed)
logger.info(
f"Command zaten işlenmiş, cache'den döndürülüyor: {command_id}"
)
return result
# İşle ve sonucu kaydet
result = handler_func(command)
self.redis.setex(
idempotency_key,
self.ttl,
json.dumps(result, default=str)
)
return result
return wrapper
# Kullanım
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
idempotency = IdempotencyMiddleware(redis_client)
@idempotency.ensure_idempotent
def handle_create_order(command: dict):
# Command işleme mantığı
pass
Eventual Consistency ile Yaşamak
CQRS’in en büyük dezavantajlarından biri eventual consistency. Yazma tarafında işlem tamamlandıktan sonra okuma modeli hemen güncellenmiyor. Bu gecikme milisaniyelerden birkaç saniyeye uzayabilir. Kullanıcıya “siparişiniz oluşturuldu” dedikten birkaç saniye sonra hala “sipariş bulunamadı” dönebilirsiniz.
Bu durumu yönetmenin birkaç yolu var:
- Optimistic UI: Frontend, sunucudan gelen yanıtı beklemeden UI’ı günceller. Hata gelirse geri alır.
- Polling: Yazma işlemi sonrası client belirli aralıklarla okuma modelini yoklar.
- WebSocket Push: Event işlendiğinde server client’a push notification gönderir.
- Read-your-writes consistency: Kullanıcı kendi yazdığı veriye erişirken geçici olarak yazma modelini kullanır.
Son yaklaşım için basit bir implementasyon:
class SmartOrderQueryHandler:
def __init__(self, read_model_handler, write_model_handler, redis_client):
self.read_model = read_model_handler
self.write_model = write_model_handler
self.redis = redis_client
def get_order(self, order_id: str, requesting_user_id: str):
"""
Kullanıcı kendi oluşturduğu siparişe erişiyorsa
ve okuma modeli henüz güncellenmediyse,
yazma modelinden döndür
"""
# Bu kullanıcının bu siparişi az önce oluşturup oluşturmadığını kontrol et
recent_key = f"recent_order:{requesting_user_id}:{order_id}"
is_recent_write = self.redis.get(recent_key)
if is_recent_write:
logger.info(f"Recent write detected, using write model for order {order_id}")
return self.write_model.get_order_by_id(order_id)
# Normal akış - okuma modelinden getir
result = self.read_model.get_order(order_id)
if not result:
# Okuma modeli henüz güncellenmemiş olabilir, fallback
logger.warning(f"Read model miss for order {order_id}, falling back to write model")
return self.write_model.get_order_by_id(order_id)
return result
Monitoring ve Observability
CQRS sistemlerinde izleme kritik önem taşıyor. Command queue lag, projection lag ve consistency gap metrikleri olmadan sistemi kör yönetirsiniz:
# RabbitMQ queue derinliğini izle
# Management API üzerinden
curl -s -u guest:guest
http://localhost:15672/api/queues/%2F/order_commands |
jq '{
queue: .name,
messages: .messages,
messages_ready: .messages_ready,
messages_unacknowledged: .messages_unacknowledged,
consumer_count: .consumers,
publish_rate: .message_stats.publish_details.rate,
deliver_rate: .message_stats.deliver_details.rate
}'
# Prometheus metrik endpoint'inden CQRS lag'ı oku
curl -s http://localhost:9090/api/v1/query
--data-urlencode 'query=cqrs_projection_lag_seconds{service="order-service"}' |
jq '.data.result[] | {
instance: .metric.instance,
lag_seconds: .value[1]
}'
# Kafka consumer group lag'ını kontrol et
kafka-consumer-groups.sh
--bootstrap-server localhost:9092
--group order-read-model-projector
--describe |
awk 'NR>1 {
lag = $6;
if (lag > 1000) print "UYARI: Yüksek lag tespit edildi - Partition " $3 ": " lag " mesaj"
}'
Gerçek Dünya Tuzakları
Birkaç yıllık CQRS deneyiminde karşılaştığım ve başkalarının da sıklıkla düştüğü tuzakları paylaşayım:
- Her şeye CQRS uygulamak: CQRS her sisteme uymaz. Basit CRUD uygulamalarında gereksiz karmaşıklık yaratır. Yoğun iş kuralları, farklı ölçekleme gereksinimleri ve yüksek trafikli sistemler için düşünün.
- Command Handler’da sorgu yapmak: Command Handler içinde “bu müşterinin kaç siparişi var?” gibi sorgular yapmak temeli yıkıyor. Bu bilgiyi ya command’ın içinde gönderin ya da ayrı bir anti-corruption layer ile yönetin.
- Projection’ları senkron yazmak: Okuma modelini senkron olarak güncellemek, event’in işlenmesini yavaşlatır ve CQRS’in faydalarını ortadan kaldırır. Her zaman asenkron tutun.
- Eksik idempotency: At-least-once delivery semantics kullanan tüm mesaj kuyruklarında idempotency şart. Bunu atladığınızda aynı siparişin iki kez oluştuğunu görmek şok edici olabiliyor.
- Projection state’ini test etmemek: Command tarafını test edip projection’ları ihmal etmek yaygın bir hata. Projection’lar için integration test yazın ve her event tipinin okuma modelini doğru güncellediğini doğrulayın.
Sonuç
CQRS, doğru uygulandığında sistemlere inanılmaz bir esneklik kazandırıyor. Okuma ve yazma taraflarını bağımsız olarak scale edebilmek, farklı teknolojiler kullanabilmek ve okuma modellerini iş ihtiyaçlarına göre özelleştirebilmek ciddi avantajlar. Ama bu pattern bir bedelle geliyor: artan sistem karmaşıklığı, eventual consistency yönetimi ve güçlü bir altyapı ihtiyacı.
Benim önerim: Önce monolitik bir yapıda mantıksal CQRS uygulayın. Aynı veritabanında ayrı okuma ve yazma modelleri oluşturun, command ve query handler’larınızı ayırın. Sisteminiz büyüdükçe ve gerçekten ayrı veritabanlarına ve mesaj kuyruklarına ihtiyaç duyduğunuzda fiziksel ayrımı uygulayın. Sıfırdan “tam CQRS + Event Sourcing + Kafka” kurmak, deneyimsiz ekipler için genellikle felakete davet çıkarmak anlamına geliyor.
Mesaj kuyruğu mimarisi söz konusu olduğunda CQRS, Saga Pattern ve Event Sourcing üçlüsü birbirini tamamlıyor. Bu üçünü anlamak, modern dağıtık sistem tasarımında çok önemli bir bilgi birikimi sağlıyor. Bir sonraki yazıda Saga Pattern’i ve distributed transaction yönetimini incelemeyi planlıyorum.
