Cloudflare Workers ile Gerçek Zamanlı Bildirim Sistemi ve WebSocket Yönetimi
Edge computing dünyasında sunucusuz mimari artık sadece basit API endpoint’leri yazmaktan ibaret değil. Cloudflare Workers, WebSocket desteğiyle birlikte gerçek zamanlı bildirim sistemleri kurmak için son derece güçlü bir platform haline geldi. Geleneksel yaklaşımda WebSocket bağlantılarını yönetmek için ayrı bir sunucu, Redis Pub/Sub ve load balancer konfigürasyonu gerekiyordu. Workers ve Durable Objects kombinasyonu bu karmaşıklığı büyük ölçüde ortadan kaldırıyor.
Bu yazıda, production ortamında çalışan bir bildirim sistemi kuracağız. Kullanıcılara anlık bildirim gönderme, bağlantı yönetimi ve ölçeklendirme konularını ele alacağız.
Mimariyi Anlamak
Cloudflare Workers’ta WebSocket yönetiminin klasik Node.js’ten farkını kavramak önemli. Standart Workers stateless çalışır, yani her request bağımsız bir context’te işlenir. WebSocket’ler ise kalıcı bağlantı gerektirir.
İşte bu noktada Durable Objects devreye giriyor. Durable Objects, global olarak tek bir instance’a sahip, durumu koruyan JavaScript nesneleri. Bir WebSocket hub’ı için mükemmel uyum sağlıyor:
- Worker: İstemci isteklerini karşılar, Durable Object’e yönlendirir
- Durable Object: WebSocket bağlantılarını tutar, mesaj dağıtımını yönetir
- KV Store: Kalıcı bildirim geçmişi için kullanılır
- R2 veya D1: Kullanıcı tercihlerini saklamak için tercih edilebilir
Bir e-ticaret platformu için düşünelim. Sipariş durumu değiştiğinde, müşteriye anlık bildirim göndermek istiyoruz. Aynı müşteri birden fazla sekme açmış olabilir, belki mobil uygulama da bağlı. Tüm bu bağlantıların senkronize çalışması gerekiyor.
Proje Yapısını Kurmak
Önce Wrangler CLI ile projeyi başlatıyoruz:
npm install -g wrangler
wrangler login
wrangler init notification-system --type=javascript
cd notification-system
wrangler.toml dosyasını düzenleyelim:
cat > wrangler.toml << 'EOF'
name = "notification-system"
main = "src/index.js"
compatibility_date = "2024-01-01"
[durable_objects]
bindings = [
{ name = "NOTIFICATION_HUB", class_name = "NotificationHub" }
]
[[migrations]]
tag = "v1"
new_classes = ["NotificationHub"]
[[kv_namespaces]]
binding = "NOTIFICATION_HISTORY"
id = "YOUR_KV_NAMESPACE_ID"
[vars]
ENVIRONMENT = "production"
MAX_CONNECTIONS_PER_USER = "10"
EOF
KV namespace oluşturmak için:
wrangler kv:namespace create "NOTIFICATION_HISTORY"
wrangler kv:namespace create "NOTIFICATION_HISTORY" --preview
# Çıktıdaki ID'yi wrangler.toml'a ekle
Durable Object ile WebSocket Hub
Ana Durable Object sınıfını yazıyoruz. Bu sınıf, tüm WebSocket bağlantılarını bellekte tutuyor ve mesaj dağıtımını koordine ediyor:
cat > src/notification-hub.js << 'EOF'
export class NotificationHub {
constructor(state, env) {
this.state = state;
this.env = env;
// Kullanıcı ID'si -> WebSocket listesi mapping'i
this.sessions = new Map();
// Her bağlantı için metadata
this.connectionMeta = new Map();
}
async fetch(request) {
const url = new URL(request.url);
const path = url.pathname;
if (path === "/websocket") {
return this.handleWebSocket(request);
}
if (path === "/broadcast") {
return this.handleBroadcast(request);
}
if (path === "/stats") {
return this.handleStats();
}
return new Response("Not Found", { status: 404 });
}
async handleWebSocket(request) {
const upgradeHeader = request.headers.get("Upgrade");
if (!upgradeHeader || upgradeHeader !== "websocket") {
return new Response("WebSocket bağlantısı gerekli", { status: 426 });
}
const userId = new URL(request.url).searchParams.get("userId");
const token = new URL(request.url).searchParams.get("token");
if (!userId || !token) {
return new Response("Kimlik doğrulama parametreleri eksik", { status: 401 });
}
// Token doğrulama (gerçek implementasyonda JWT doğrula)
const isValid = await this.validateToken(token, userId);
if (!isValid) {
return new Response("Geçersiz token", { status: 401 });
}
const [client, server] = Object.values(new WebSocketPair());
await this.handleSession(server, userId);
return new Response(null, {
status: 101,
webSocket: client,
});
}
async handleSession(webSocket, userId) {
webSocket.accept();
const connectionId = crypto.randomUUID();
const meta = {
userId,
connectionId,
connectedAt: Date.now(),
lastPing: Date.now(),
};
// Kullanıcının mevcut bağlantı listesine ekle
if (!this.sessions.has(userId)) {
this.sessions.set(userId, new Set());
}
this.sessions.get(userId).add(webSocket);
this.connectionMeta.set(webSocket, meta);
// Bağlantı kuruldu mesajı gönder
webSocket.send(JSON.stringify({
type: "connected",
connectionId,
timestamp: Date.now(),
}));
// Bekleyen bildirimleri gönder
await this.sendPendingNotifications(webSocket, userId);
webSocket.addEventListener("message", async (event) => {
try {
const data = JSON.parse(event.data);
await this.handleClientMessage(webSocket, userId, data);
} catch (err) {
webSocket.send(JSON.stringify({
type: "error",
message: "Geçersiz mesaj formatı",
}));
}
});
webSocket.addEventListener("close", () => {
this.removeConnection(webSocket, userId);
});
webSocket.addEventListener("error", () => {
this.removeConnection(webSocket, userId);
});
}
async handleClientMessage(webSocket, userId, data) {
const meta = this.connectionMeta.get(webSocket);
switch (data.type) {
case "ping":
meta.lastPing = Date.now();
webSocket.send(JSON.stringify({ type: "pong", timestamp: Date.now() }));
break;
case "mark_read":
await this.markNotificationRead(userId, data.notificationId);
webSocket.send(JSON.stringify({
type: "marked_read",
notificationId: data.notificationId,
}));
break;
case "subscribe":
meta.channels = data.channels || [];
webSocket.send(JSON.stringify({
type: "subscribed",
channels: meta.channels,
}));
break;
default:
webSocket.send(JSON.stringify({
type: "error",
message: `Bilinmeyen mesaj tipi: ${data.type}`,
}));
}
}
async handleBroadcast(request) {
const body = await request.json();
const { userId, notification, targetAll } = body;
if (targetAll) {
// Tüm bağlı kullanıcılara gönder
let totalSent = 0;
for (const [uid, sockets] of this.sessions.entries()) {
totalSent += this.sendToSockets(sockets, notification);
}
return new Response(JSON.stringify({ sent: totalSent }), {
headers: { "Content-Type": "application/json" },
});
}
if (!userId) {
return new Response("userId veya targetAll gerekli", { status: 400 });
}
// Bildirimi KV'ye kaydet (kalıcılık için)
await this.saveNotification(userId, notification);
const userSockets = this.sessions.get(userId);
if (!userSockets || userSockets.size === 0) {
return new Response(JSON.stringify({
sent: 0,
queued: true,
message: "Kullanıcı çevrimdışı, bildirim kuyruğa alındı",
}), { headers: { "Content-Type": "application/json" } });
}
const sent = this.sendToSockets(userSockets, notification);
return new Response(JSON.stringify({ sent, queued: false }), {
headers: { "Content-Type": "application/json" },
});
}
sendToSockets(sockets, notification) {
let sent = 0;
const deadSockets = [];
for (const socket of sockets) {
try {
socket.send(JSON.stringify({
type: "notification",
...notification,
timestamp: Date.now(),
}));
sent++;
} catch (err) {
deadSockets.push(socket);
}
}
// Ölü bağlantıları temizle
for (const dead of deadSockets) {
const meta = this.connectionMeta.get(dead);
if (meta) {
this.removeConnection(dead, meta.userId);
}
}
return sent;
}
removeConnection(webSocket, userId) {
const userSockets = this.sessions.get(userId);
if (userSockets) {
userSockets.delete(webSocket);
if (userSockets.size === 0) {
this.sessions.delete(userId);
}
}
this.connectionMeta.delete(webSocket);
}
async handleStats() {
const stats = {
totalUsers: this.sessions.size,
totalConnections: this.connectionMeta.size,
users: [],
};
for (const [userId, sockets] of this.sessions.entries()) {
stats.users.push({
userId,
connections: sockets.size,
});
}
return new Response(JSON.stringify(stats), {
headers: { "Content-Type": "application/json" },
});
}
async saveNotification(userId, notification) {
const key = `pending:${userId}`;
const existing = await this.env.NOTIFICATION_HISTORY.get(key, "json") || [];
existing.push({
...notification,
id: crypto.randomUUID(),
createdAt: Date.now(),
read: false,
});
// Son 50 bildirimi tut
const trimmed = existing.slice(-50);
await this.env.NOTIFICATION_HISTORY.put(key, JSON.stringify(trimmed), {
expirationTtl: 86400 * 7, // 7 gün
});
}
async sendPendingNotifications(webSocket, userId) {
const key = `pending:${userId}`;
const pending = await this.env.NOTIFICATION_HISTORY.get(key, "json") || [];
const unread = pending.filter(n => !n.read);
if (unread.length > 0) {
webSocket.send(JSON.stringify({
type: "pending_notifications",
count: unread.length,
notifications: unread,
}));
}
}
async markNotificationRead(userId, notificationId) {
const key = `pending:${userId}`;
const notifications = await this.env.NOTIFICATION_HISTORY.get(key, "json") || [];
const updated = notifications.map(n =>
n.id === notificationId ? { ...n, read: true } : n
);
await this.env.NOTIFICATION_HISTORY.put(key, JSON.stringify(updated), {
expirationTtl: 86400 * 7,
});
}
async validateToken(token, userId) {
// Gerçek implementasyonda JWT doğrulaması yapılmalı
// Burada basit bir örnek gösteriyoruz
const storedToken = await this.env.NOTIFICATION_HISTORY.get(`token:${userId}`);
return storedToken === token;
}
}
EOF
Ana Worker Entry Point
cat > src/index.js << 'EOF'
export { NotificationHub } from "./notification-hub.js";
export default {
async fetch(request, env, ctx) {
const url = new URL(request.url);
const path = url.pathname;
// CORS headers
const corsHeaders = {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
};
if (request.method === "OPTIONS") {
return new Response(null, { headers: corsHeaders });
}
// WebSocket bağlantısı için Durable Object'e yönlendir
if (path === "/ws") {
const userId = url.searchParams.get("userId");
if (!userId) {
return new Response("userId parametresi gerekli", { status: 400 });
}
// Kullanıcıya özel Durable Object ID oluştur
// Tüm kullanıcılar aynı hub'a bağlanır
const hubId = env.NOTIFICATION_HUB.idFromName("global-hub");
const hub = env.NOTIFICATION_HUB.get(hubId);
return hub.fetch(new Request(`${url.origin}/websocket${url.search}`, request));
}
// Bildirim gönderme endpoint'i (internal API)
if (path === "/api/notify" && request.method === "POST") {
return handleNotifyRequest(request, env, corsHeaders);
}
// Sistem istatistikleri
if (path === "/api/stats") {
const hubId = env.NOTIFICATION_HUB.idFromName("global-hub");
const hub = env.NOTIFICATION_HUB.get(hubId);
const response = await hub.fetch(new Request(`${url.origin}/stats`));
return new Response(response.body, {
headers: { ...corsHeaders, "Content-Type": "application/json" },
});
}
return new Response("Cloudflare Workers Bildirim Sistemi", {
headers: corsHeaders,
});
},
};
async function handleNotifyRequest(request, env, corsHeaders) {
// İç API güvenliği - secret key kontrolü
const authHeader = request.headers.get("Authorization");
const expectedSecret = `Bearer ${env.INTERNAL_API_SECRET}`;
if (authHeader !== expectedSecret) {
return new Response("Yetkisiz erişim", { status: 401 });
}
const body = await request.json();
// Input validasyonu
if (!body.userId && !body.targetAll) {
return new Response("userId veya targetAll alanı gerekli", { status: 400 });
}
if (!body.notification?.title || !body.notification?.message) {
return new Response("notification.title ve notification.message gerekli", { status: 400 });
}
const hubId = env.NOTIFICATION_HUB.idFromName("global-hub");
const hub = env.NOTIFICATION_HUB.get(hubId);
const hubRequest = new Request(`https://hub/broadcast`, {
method: "POST",
body: JSON.stringify(body),
headers: { "Content-Type": "application/json" },
});
const result = await hub.fetch(hubRequest);
const resultData = await result.json();
return new Response(JSON.stringify(resultData), {
headers: { ...corsHeaders, "Content-Type": "application/json" },
});
}
EOF
İstemci Tarafı JavaScript
Frontend’de WebSocket bağlantısını yönetmek için reconnect mantığını da dahil eden bir sınıf yazıyoruz:
cat > public/notification-client.js << 'EOF'
class NotificationClient {
constructor(userId, token, options = {}) {
this.userId = userId;
this.token = token;
this.wsUrl = options.wsUrl || "wss://your-worker.workers.dev/ws";
this.reconnectDelay = options.reconnectDelay || 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
this.reconnectAttempts = 0;
this.ws = null;
this.pingInterval = null;
this.listeners = {};
}
connect() {
const url = `${this.wsUrl}?userId=${this.userId}&token=${this.token}`;
this.ws = new WebSocket(url);
this.ws.onopen = () => {
console.log("Bildirim sistemine bağlandı");
this.reconnectAttempts = 0;
this.startPing();
this.emit("connected");
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleMessage(data);
};
this.ws.onclose = () => {
this.stopPing();
this.emit("disconnected");
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error("WebSocket hatası:", error);
this.emit("error", error);
};
}
handleMessage(data) {
switch (data.type) {
case "notification":
this.showNotification(data);
this.emit("notification", data);
break;
case "pending_notifications":
data.notifications.forEach(n => this.emit("notification", n));
break;
case "pong":
// Bağlantı canlı
break;
case "connected":
console.log("Connection ID:", data.connectionId);
break;
}
}
showNotification(data) {
if ("Notification" in window && Notification.permission === "granted") {
new Notification(data.title, {
body: data.message,
icon: data.icon || "/favicon.ico",
});
}
}
startPing() {
this.pingInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type: "ping" }));
}
}, 30000);
}
stopPing() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
}
scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error("Maksimum yeniden bağlanma denemesi aşıldı");
return;
}
this.reconnectAttempts++;
const delay = this.reconnectDelay * this.reconnectAttempts;
console.log(`${delay}ms sonra yeniden bağlanılıyor...`);
setTimeout(() => this.connect(), delay);
}
markRead(notificationId) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
type: "mark_read",
notificationId,
}));
}
}
on(event, callback) {
if (!this.listeners[event]) {
this.listeners[event] = [];
}
this.listeners[event].push(callback);
}
emit(event, data) {
(this.listeners[event] || []).forEach(cb => cb(data));
}
disconnect() {
this.stopPing();
this.ws?.close();
}
}
// Kullanım örneği
const client = new NotificationClient("user-123", "jwt-token-here", {
wsUrl: "wss://notification-system.your-subdomain.workers.dev/ws",
});
client.on("notification", (notification) => {
console.log("Yeni bildirim:", notification);
// UI'ı güncelle
document.getElementById("notification-count").textContent++;
});
client.connect();
EOF
Bildirim Gönderme API’si Testi
Sistemi test etmek için birkaç curl komutu:
# Tek kullanıcıya bildirim gönder
curl -X POST https://notification-system.workers.dev/api/notify
-H "Authorization: Bearer your-internal-secret"
-H "Content-Type: application/json"
-d '{
"userId": "user-123",
"notification": {
"title": "Siparişiniz Güncellendi",
"message": "Sipariş #1234 kargoya verildi",
"type": "order_update",
"data": {
"orderId": "1234",
"status": "shipped"
}
}
}'
# Tüm bağlı kullanıcılara broadcast
curl -X POST https://notification-system.workers.dev/api/notify
-H "Authorization: Bearer your-internal-secret"
-H "Content-Type: application/json"
-d '{
"targetAll": true,
"notification": {
"title": "Sistem Bakımı",
"message": "30 dakika içinde planlı bakım başlayacak",
"type": "system_alert"
}
}'
# Bağlantı istatistiklerini görüntüle
curl https://notification-system.workers.dev/api/stats
# Deploy et
wrangler deploy
# Logları takip et
wrangler tail --format=pretty
Production İçin Rate Limiting
Sistemi aşırı yükten korumak için rate limiting ekleyelim:
cat > src/rate-limiter.js << 'EOF'
export class RateLimiter {
constructor(env) {
this.env = env;
this.windowMs = 60000; // 1 dakika
this.maxRequests = 100;
}
async check(identifier) {
const key = `ratelimit:${identifier}:${Math.floor(Date.now() / this.windowMs)}`;
const current = await this.env.NOTIFICATION_HISTORY.get(key);
const count = parseInt(current || "0");
if (count >= this.maxRequests) {
return {
allowed: false,
remaining: 0,
resetAt: Math.ceil(Date.now() / this.windowMs) * this.windowMs,
};
}
await this.env.NOTIFICATION_HISTORY.put(
key,
String(count + 1),
{ expirationTtl: 120 }
);
return {
allowed: true,
remaining: this.maxRequests - count - 1,
resetAt: Math.ceil(Date.now() / this.windowMs) * this.windowMs,
};
}
}
// Kullanım - index.js içinde
// const limiter = new RateLimiter(env);
// const result = await limiter.check(clientIP);
// if (!result.allowed) {
// return new Response("Rate limit aşıldı", {
// status: 429,
// headers: { "Retry-After": String((result.resetAt - Date.now()) / 1000) }
// });
// }
EOF
Gerçek Dünya Senaryosu: E-Ticaret Entegrasyonu
Sipariş yönetim sisteminden bildirim göndermek için bir örnek:
# Node.js backend'den bildirim tetikleme
cat > backend-integration-example.js << 'EOF'
class NotificationService {
constructor(workerUrl, apiSecret) {
this.workerUrl = workerUrl;
this.apiSecret = apiSecret;
}
async sendOrderUpdate(userId, orderId, status, details = {}) {
const statusMessages = {
confirmed: "Siparişiniz onaylandı",
preparing: "Siparişiniz hazırlanıyor",
shipped: "Siparişiniz kargoya verildi",
delivered: "Siparişiniz teslim edildi",
cancelled: "Siparişiniz iptal edildi",
};
const notification = {
title: "Sipariş Güncelleme",
message: statusMessages[status] || `Sipariş durumu: ${status}`,
type: "order_update",
priority: status === "cancelled" ? "high" : "normal",
data: { orderId, status, ...details },
actions: status === "delivered" ? [
{ label: "Değerlendir", url: `/orders/${orderId}/review` },
] : [],
};
const response = await fetch(`${this.workerUrl}/api/notify`, {
method: "POST",
headers: {
"Authorization": `Bearer ${this.apiSecret}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ userId, notification }),
});
if (!response.ok) {
const error = await response.text();
throw new Error(`Bildirim gönderilemedi: ${error}`);
}
return response.json();
}
}
// Kullanım
const notifier = new NotificationService(
"https://notification-system.workers.dev",
process.env.NOTIFICATION_SECRET
);
// Sipariş durumu değiştiğinde çağır
await notifier.sendOrderUpdate("user-456", "order-789", "shipped", {
trackingNumber: "TR123456789",
carrier: "Yurtiçi Kargo",
estimatedDelivery: "2024-12-20",
});
EOF
İzleme ve Hata Ayıklama
Production’da sistemin sağlıklı çalışıp çalışmadığını kontrol etmek için birkaç pratik yöntem:
- Wrangler tail komutu ile gerçek zamanlı log akışını izleyebilirsiniz.
wrangler tail --format=prettykomutu JSON yerine okunabilir format verir - Cloudflare Dashboard üzerinden Worker Analytics kısmı, saniyedeki istek sayısını, CPU süresini ve hata oranlarını gösterir
- Durable Object metrikleri özellikle önemli. Tek bir hub’ın aşırı yüklenmesi durumunda
idFromNameyerine kullanıcı gruplarına göre farklı hub’lar oluşturabilirsiniz - KV okuma/yazma limitleri free plan’da dakikada 1000 ile sınırlı. Bunu aşıyorsanız paid plan’a geçmeli ya da önbellek katmanı eklemelisiniz
- WebSocket bağlantı sayısı tek bir Durable Object için pratik sınır yaklaşık 32.000. Bunu aşan kullanıcı tabanları için sharding gerekir
Hata durumlarında ise Dead Letter Queue mantığı uygulamak iyi bir pratik. Bildirim gönderilemediğinde KV’ye kaydedip, kullanıcı bir sonraki bağlandığında göndermek sistemi daha güvenilir yapıyor. Bunu zaten sendPendingNotifications metodunda implement ettik.
Sonuç
Cloudflare Workers ve Durable Objects kombinasyonu, geleneksel mimarilerde birden fazla bileşen gerektiren gerçek zamanlı sistemleri tek bir platform üzerinde çalıştırmayı mümkün kılıyor. Edge’de çalışması sayesinde kullanıcılara yakın sunucularda işlem yapılıyor, bu da gecikmeyi minimuma indiriyor.
Bu yazıda inşa ettiğimiz sistem şunları yapabiliyor:
- Birden fazla sekme veya cihazda aynı anda bağlı kullanıcılara anlık bildirim gönderme
- Çevrimdışı kullanıcılar için bildirim kuyruğu tutma ve yeniden bağlandığında iletme
- Ping/pong mekanizmasıyla bağlantı sağlığını izleme
- Otomatik reconnect ve hatalı bağlantıları temizleme
Sistemi genişletmek için birkaç öneri: Alarm API kullanarak periyodik temizlik işlemleri çalıştırabilirsiniz. D1 veritabanı ile bildirim geçmişini daha yapılandırılmış tutabilirsiniz. Queue API ile yüksek hacimli bildirim gönderimlerini asenkron işleyebilirsiniz.
Cloudflare Workers ekosistemi hızla olgunlaşıyor ve bu tür kullanım senaryoları için giderek daha cazip hale geliyor. Özellikle global ölçekte çalışması gereken sistemler için geleneksel sunucu tabanlı mimarilere kıyasla hem maliyet hem de operasyonel karmaşıklık açısından ciddi avantajlar sunuyor.
