Apollo Server ile WebSocket Üzerinden Gerçek Zamanlı GraphQL Subscriptions Kurulumu

Gerçek zamanlı uygulamalar geliştirirken en sık karşılaştığım sorulardan biri şu oluyor: “REST API ile polling yapmak yerine neden subscription kullanayım?” Cevap basit; polling sunucuyu gereksiz yere döver, kaynakları boşa harcar ve gecikme kaçınılmaz olur. GraphQL Subscriptions ise WebSocket bağlantısı üzerinden sunucudan istemciye anlık veri akışı sağlar. Canlı bildirimler, gerçek zamanlı sohbet uygulamaları, hisse senedi fiyatları veya oyun skorları gibi senaryolarda bu yaklaşım hem daha temiz hem de çok daha verimli çalışır. Bu yazıda Apollo Server 4 ile WebSocket üzerinden subscription kurulumunu adım adım anlatacağım, üretim ortamında dikkat etmeniz gereken detayları da atlamayacağım.

Temel Kavramlar ve Mimari

Subscription’ların nasıl çalıştığını anlamadan kuruluma geçmek, ileride başınızı ağrıtacak sorunlara zemin hazırlar. Klasik HTTP isteklerinde istemci bir istek gönderir, sunucu yanıt verir ve bağlantı kapanır. WebSocket ise kalıcı, çift yönlü bir bağlantı tünel açar. GraphQL subscription’lar bu tünel üzerinden çalışır.

Apollo Server, subscription desteği için graphql-ws kütüphanesini kullanır. Eski sürümlerde subscriptions-transport-ws vardı, ancak artık bu paket bakımsız ve bu nedenle üretim ortamında graphql-ws tercih edilmeli.

Mimari şöyle işler:

  • İstemci WebSocket bağlantısı açar
  • Subscription query’si gönderir
  • Sunucu event yayınladığında istemciye push eder
  • İstemci bağlantıyı kesene kadar akış sürer

Proje Kurulumu

Önce temiz bir Node.js projesi oluşturalım:

mkdir graphql-subscriptions-demo
cd graphql-subscriptions-demo
npm init -y
npm install @apollo/server graphql graphql-ws ws @graphql-tools/schema
npm install express cors body-parser
npm install -D typescript ts-node @types/node @types/ws nodemon

TypeScript kullanacağız çünkü üretim ortamında tip güvenliği hayat kurtarır. tsconfig.json dosyasını oluşturalım:

cat > tsconfig.json << 'EOF'
{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "lib": ["ES2020"],
    "outDir": "./dist",
    "rootDir": "./src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": ["src/**/*"],
  "exclude": ["node_modules", "dist"]
}
EOF

Schema Tanımı ve PubSub Mekanizması

GraphQL subscription’ların can damarı PubSub (publish/subscribe) mekanizmasıdır. Sunucu içindeki bir olay tetiklendiğinde ilgili topic’e yayın yapılır, o topic’e abone olan istemciler mesajı alır. Apollo Server’ın kendi PubSub implementasyonu tek sunucu senaryoları için gayet iyi çalışır ancak yatay ölçekleme gerektiğinde Redis PubSub’a geçiş yapmanız gerekir.

Önce şema ve resolver dosyamızı yazalım:

mkdir src
cat > src/schema.ts << 'EOF'
import { gql } from 'graphql-tag';

export const typeDefs = gql`
  type Message {
    id: ID!
    content: String!
    author: String!
    channel: String!
    createdAt: String!
  }

  type Notification {
    id: ID!
    type: String!
    message: String!
    timestamp: String!
  }

  type Query {
    messages(channel: String!): [Message!]!
    ping: String!
  }

  type Mutation {
    sendMessage(content: String!, author: String!, channel: String!): Message!
    triggerNotification(type: String!, message: String!): Notification!
  }

  type Subscription {
    messageSent(channel: String!): Message!
    notificationReceived: Notification!
    userTyping(channel: String!): String!
  }
`;
EOF

Şimdi PubSub örneği ve in-memory store oluşturalım:

cat > src/pubsub.ts << 'EOF'
import { PubSub } from 'graphql-subscriptions';

// Tek instance olarak export ediyoruz
export const pubsub = new PubSub();

// Event isimleri için sabitler - typo hatalarını önler
export const EVENTS = {
  MESSAGE_SENT: 'MESSAGE_SENT',
  NOTIFICATION_RECEIVED: 'NOTIFICATION_RECEIVED',
  USER_TYPING: 'USER_TYPING',
} as const;

// Basit in-memory store (prod'da Redis veya DB kullanın)
export const messageStore: Map<string, Array<{
  id: string;
  content: string;
  author: string;
  channel: string;
  createdAt: string;
}>> = new Map();
EOF

Resolver’ların Yazılması

Subscription resolver’larının yapısı Query/Mutation’dan farklıdır. Her subscription field’ı için subscribe ve resolve fonksiyonları yazılır:

cat > src/resolvers.ts << 'EOF'
import { pubsub, EVENTS, messageStore } from './pubsub';
import { v4 as uuidv4 } from 'uuid';
// npm install uuid @types/uuid

export const resolvers = {
  Query: {
    messages: (_: unknown, { channel }: { channel: string }) => {
      return messageStore.get(channel) || [];
    },
    ping: () => 'pong',
  },

  Mutation: {
    sendMessage: async (
      _: unknown,
      { content, author, channel }: { content: string; author: string; channel: string }
    ) => {
      const message = {
        id: uuidv4(),
        content,
        author,
        channel,
        createdAt: new Date().toISOString(),
      };

      // Store'a kaydet
      const channelMessages = messageStore.get(channel) || [];
      channelMessages.push(message);
      messageStore.set(channel, channelMessages);

      // Abonelere yayın yap
      await pubsub.publish(EVENTS.MESSAGE_SENT, { messageSent: message });

      return message;
    },

    triggerNotification: async (
      _: unknown,
      { type, message }: { type: string; message: string }
    ) => {
      const notification = {
        id: uuidv4(),
        type,
        message,
        timestamp: new Date().toISOString(),
      };

      await pubsub.publish(EVENTS.NOTIFICATION_RECEIVED, {
        notificationReceived: notification,
      });

      return notification;
    },
  },

  Subscription: {
    messageSent: {
      // asyncIterator: hangi event'i dinleyeceğimizi belirtir
      subscribe: (_: unknown, { channel }: { channel: string }) => {
        console.log(`[Subscription] Yeni abone: channel="${channel}"`);
        return pubsub.asyncIterator([EVENTS.MESSAGE_SENT]);
      },
      // resolve: gelen event'i filtrele ve şekillendik
      resolve: (payload: { messageSent: any }, { channel }: { channel: string }) => {
        if (payload.messageSent.channel !== channel) {
          return null;
        }
        return payload.messageSent;
      },
    },

    notificationReceived: {
      subscribe: () => pubsub.asyncIterator([EVENTS.NOTIFICATION_RECEIVED]),
      resolve: (payload: { notificationReceived: any }) => payload.notificationReceived,
    },

    userTyping: {
      subscribe: (_: unknown, { channel }: { channel: string }) => {
        return pubsub.asyncIterator([`${EVENTS.USER_TYPING}_${channel}`]);
      },
      resolve: (payload: { userTyping: string }) => payload.userTyping,
    },
  },
};
EOF

Dikkat edilmesi gereken önemli bir nokta: messageSent subscription’ında resolve fonksiyonu içinde kanal filtresi uyguladık. Bu yaklaşım işe yarasa da yüzlerce farklı kanalla binlerce abonen varsa tüm mesajları tüm abonelere broadcast edip sonra filtrelemek verimsizdir. withFilter kullanmak daha temizdir.

withFilter ile Akıllı Filtreleme

cat > src/resolvers-filtered.ts << 'EOF'
import { withFilter } from 'graphql-subscriptions';
import { pubsub, EVENTS } from './pubsub';

// Sadece ilgili kısım - diğer resolver'lar aynı kalır
export const subscriptionResolvers = {
  Subscription: {
    messageSent: {
      subscribe: withFilter(
        // İlk argüman: asyncIterator factory
        () => pubsub.asyncIterator([EVENTS.MESSAGE_SENT]),
        // İkinci argüman: filtre fonksiyonu - true dönerse istemciye iletilir
        (payload, variables) => {
          return payload.messageSent.channel === variables.channel;
        }
      ),
    },
    
    notificationReceived: {
      subscribe: withFilter(
        () => pubsub.asyncIterator([EVENTS.NOTIFICATION_RECEIVED]),
        // Context üzerinden kullanıcı bazlı filtreleme de yapabilirsiniz
        (payload, variables, context) => {
          // Örnek: sadece admin kullanıcılar kritik bildirimleri alır
          if (payload.notificationReceived.type === 'CRITICAL') {
            return context.user?.role === 'admin';
          }
          return true;
        }
      ),
    },
  },
};
EOF

Ana Sunucu Kurulumu

Şimdi en kritik kısma geldik: HTTP ve WebSocket sunucularını birlikte çalıştırmak.

cat > src/index.ts << 'EOF'
import express from 'express';
import cors from 'cors';
import http from 'http';
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { typeDefs } from './schema';
import { resolvers } from './resolvers';

interface MyContext {
  user?: { id: string; role: string; username: string };
}

async function startServer() {
  const app = express();
  const httpServer = http.createServer(app);

  // Executable schema oluştur (hem HTTP hem WS kullanacak)
  const schema = makeExecutableSchema({ typeDefs, resolvers });

  // WebSocket Sunucusu - /graphql yolunda dinle
  const wsServer = new WebSocketServer({
    server: httpServer,
    path: '/graphql',
  });

  // graphql-ws entegrasyonu
  const serverCleanup = useServer(
    {
      schema,
      // Context factory - her WS bağlantısı için çalışır
      context: async (ctx) => {
        // Authorization header'dan token al
        const token = ctx.connectionParams?.authorization as string;
        
        if (token) {
          try {
            // Gerçek uygulamada JWT verify yapın
            // const user = verifyToken(token);
            const user = { id: '1', role: 'user', username: 'demo' };
            return { user };
          } catch (err) {
            console.error('[WS Auth] Token geçersiz:', err);
          }
        }
        
        return {};
      },
      
      // Bağlantı açıldığında
      onConnect: async (ctx) => {
        console.log('[WS] Yeni bağlantı:', ctx.extra.socket.remoteAddress);
        return true; // false döndürürseniz bağlantıyı reddedersiniz
      },
      
      // Bağlantı kapandığında
      onDisconnect: (ctx) => {
        console.log('[WS] Bağlantı kapandı');
      },
      
      // Hata yönetimi
      onError: (ctx, message, errors) => {
        console.error('[WS] Hata:', errors);
      },
    },
    wsServer
  );

  // Apollo Server HTTP için
  const apolloServer = new ApolloServer<MyContext>({
    schema,
    plugins: [
      // HTTP sunucusu graceful shutdown
      ApolloServerPluginDrainHttpServer({ httpServer }),
      // WebSocket sunucusu graceful shutdown
      {
        async serverWillStart() {
          return {
            async drainServer() {
              await serverCleanup.dispose();
            },
          };
        },
      },
    ],
  });

  await apolloServer.start();

  app.use(
    '/graphql',
    cors<cors.CorsRequest>({
      origin: ['http://localhost:3000', 'https://studio.apollographql.com'],
      credentials: true,
    }),
    express.json(),
    expressMiddleware(apolloServer, {
      context: async ({ req }) => {
        const token = req.headers.authorization?.replace('Bearer ', '');
        if (token) {
          // JWT doğrulama burada
          return { user: { id: '1', role: 'user', username: 'demo' } };
        }
        return {};
      },
    })
  );

  const PORT = process.env.PORT || 4000;
  
  httpServer.listen(PORT, () => {
    console.log(`HTTP GraphQL: http://localhost:${PORT}/graphql`);
    console.log(`WS GraphQL:   ws://localhost:${PORT}/graphql`);
  });
}

startServer().catch(console.error);
EOF

package.json script’lerini ekleyelim:

npm pkg set scripts.dev="nodemon --exec ts-node src/index.ts"
npm pkg set scripts.build="tsc"
npm pkg set scripts.start="node dist/index.js"
npm install uuid @types/uuid

Redis PubSub ile Yatay Ölçekleme

Tek sunucu üzerinde her şey güzel çalışır. Ancak yük dengeleyici arkasında birden fazla Node.js instance çalıştırdığınızda sorun çıkar: A sunucusuna bağlı bir istemci, B sunucusuna gelen mutation’ı asla göremez çünkü PubSub bellekte yaşar. Çözüm Redis PubSub kullanmaktır.

npm install graphql-redis-subscriptions ioredis
npm install -D @types/ioredis
cat > src/redis-pubsub.ts << 'EOF'
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const redisOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  retryStrategy: (times: number) => {
    // Bağlantı hatalarında üstel geri çekilme
    const delay = Math.min(times * 50, 2000);
    console.log(`[Redis] Yeniden bağlanılıyor... Deneme: ${times}, Bekleme: ${delay}ms`);
    return delay;
  },
};

export const pubsub = new RedisPubSub({
  publisher: new Redis(redisOptions),
  subscriber: new Redis(redisOptions),
});

console.log('[Redis PubSub] Bağlantı başlatıldı');

export const EVENTS = {
  MESSAGE_SENT: 'MESSAGE_SENT',
  NOTIFICATION_RECEIVED: 'NOTIFICATION_RECEIVED',
  USER_TYPING: 'USER_TYPING',
} as const;
EOF

Bu değişiklikle src/pubsub.ts içindeki pubsub export’unu bu dosyayla değiştirmeniz yeterlidir. Resolver’larda hiçbir şey değişmez, bu da güzel bir soyutlama örneğidir.

İstemci Tarafında Bağlantı Testi

Sunucuyu test etmek için basit bir Node.js istemcisi yazalım:

cat > test-client.js << 'EOF'
const { createClient } = require('graphql-ws');
const { WebSocket } = require('ws');

const client = createClient({
  url: 'ws://localhost:4000/graphql',
  webSocketImpl: WebSocket,
  connectionParams: {
    authorization: 'Bearer test-token-123',
  },
  on: {
    connected: () => console.log('[İstemci] Bağlandı'),
    closed: () => console.log('[İstemci] Bağlantı kapandı'),
    error: (err) => console.error('[İstemci] Hata:', err),
  },
});

// general-channel kanalını dinle
const unsubscribe = client.subscribe(
  {
    query: `
      subscription MessageSent {
        messageSent(channel: "general") {
          id
          content
          author
          channel
          createdAt
        }
      }
    `,
  },
  {
    next: (data) => console.log('[Mesaj Alındı]', JSON.stringify(data, null, 2)),
    error: (err) => console.error('[Subscription Hata]', err),
    complete: () => console.log('[Subscription Tamamlandı]'),
  }
);

// 30 saniye sonra unsubscribe
setTimeout(() => {
  console.log('[İstemci] Abonelik iptal ediliyor');
  unsubscribe();
  client.dispose();
}, 30000);

console.log('Sunucuya bağlanıyor... (30 saniye dinleyecek)');
EOF

node test-client.js

Üretim Ortamında Dikkat Edilmesi Gerekenler

Gerçek dünya senaryolarında karşılaştığım ve sizi bekleyen birkaç kritik nokta var:

Bağlantı limitleri: Nginx veya başka bir reverse proxy kullanıyorsanız WebSocket bağlantılarını düzgün proxy’lemeniz gerekir. Nginx için /etc/nginx/sites-available/your-app dosyasına şunları ekleyin:

cat > /etc/nginx/sites-available/graphql-app << 'EOF'
upstream graphql_backend {
    server 127.0.0.1:4000;
    keepalive 64;
}

server {
    listen 80;
    server_name api.yourdomain.com;

    location /graphql {
        proxy_pass http://graphql_backend;
        proxy_http_version 1.1;
        
        # WebSocket için kritik header'lar
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        
        # WebSocket bağlantıları uzun süre açık kalabilir
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        
        # Ping/pong için
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}
EOF
nginx -t && systemctl reload nginx

Memory leak önleme: Abonelik bırakılmadan bağlantı koptuğunda kaynaklar serbest bırakılmayabilir. onDisconnect callback’inde temizlik yapın ve bağlantı başına maksimum subscription sayısı belirleyin.

Heartbeat mekanizması: Ağ ekipmanları uzun süre veri geçmeyen bağlantıları koparabilir. graphql-ws yerleşik ping/pong desteği sunar:

const serverCleanup = useServer(
  {
    schema,
    // 12 saniyede bir ping at, 5 saniye içinde pong gelmezse kapat
    keepAlive: 12_000,
  },
  wsServer
);

Hata izleme: Üretimde tüm WS hatalarını loglayın ve Sentry gibi bir araçla izleyin. Sessizce kaybolan abonelikler debugging’i cehenneme çevirir.

Ölçekleme stratejisi: Eğer her WebSocket bağlantısı yaklaşık 20-50KB bellek tüketiyorsa ve 10.000 eş zamanlı kullanıcınız varsa bu 200-500MB demektir. Bunu hesaba katarak sunucu boyutlandırması yapın.

Sonuç

Apollo Server 4 ile WebSocket üzerinden subscription kurmak artık eskiye kıyasla çok daha basit. graphql-ws kütüphanesi protokolü temiz bir şekilde ele alıyor, withFilter ile abonelik granülerliğini kolayca kontrol edebiliyorsunuz. Tek sunucu senaryoları için yerleşik PubSub yeterli, ama ilk gün Redis PubSub altyapısını kurmak ilerideki migrasyon ağrısından sizi kurtarır.

Pratik önerim şu: Geliştirme ortamında yerleşik PubSub ile başlayın, ama kodu Redis PubSub ile değiştirilebilir hale getirin, yani pubsub instance’ını tek bir yerden export edin. Nginx konfigürasyonunu doğru yapmazsanız WebSocket bağlantıları garip şekillerde kopabilir, bu yüzden proxy ayarlarını ilk günden doğru yapın. Son olarak istemci tarafında otomatik yeniden bağlanma mantığını mutlaka ekleyin; graphql-ws istemci kütüphanesi bunu destekliyor ve ağ kesintilerinde kullanıcı deneyimini çok daha iyi hale getiriyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir