Gerçek Zamanlı Veri Akışı: GraphQL Subscription Kullanımı

Gerçek zamanlı veri, modern uygulamaların olmazsa olmazı haline geldi. Kullanıcılar artık bir sayfayı yenilemek yerine bildirimlerin, mesajların ve güncellemelerin anında ekranlarına yansımasını bekliyor. GraphQL bu ihtiyacı karşılamak için Subscription mekanizmasını sunuyor. REST API’larda bu işi genellikle WebSocket ya da Server-Sent Events ile kendiniz çözmek zorunda kalıyordunuz. GraphQL Subscription ise bu altyapıyı şema seviyesinde standartlaştırıyor ve istemci ile sunucu arasında kalıcı bir bağlantı kurmanızı sağlıyor.

GraphQL Subscription Nedir?

GraphQL’in üç temel operasyonu vardır: Query, Mutation ve Subscription. Query veri okur, Mutation veri değiştirir, Subscription ise belirli bir olayı dinler ve o olay gerçekleştiğinde istemciye otomatik olarak veri gönderir. Teknik olarak WebSocket protokolü üzerine inşa edilmiştir ve istemci ile sunucu arasında kalıcı bir bağlantı açık tutulur.

Pratik bir senaryo düşünelim: Bir e-ticaret platformu geliştiriyorsunuz. Sipariş durumu değiştiğinde müşteriye anında bildirim gitmesi gerekiyor. Eski yöntemde istemci her birkaç saniyede bir sunucuyu yoklardı (polling). Bu hem sunucu kaynaklarını tüketir hem de gecikme yaratır. Subscription ile sipariş durumu değiştiği anda, tam o anda istemciye veri akar.

Subscription Nasıl Çalışır?

Akış şöyle işler:

  • İstemci bir Subscription operasyonu gönderir ve sunucuyla WebSocket bağlantısı açılır
  • Sunucu bu aboneliği kaydeder ve ilgili olayı bekler
  • Bir olay tetiklendiğinde (örneğin bir sipariş durumu güncellemesi) sunucu abone olan tüm istemcilere veriyi iletir
  • Bağlantı istemci kapatana veya sunucu tarafında timeout gerçekleşene kadar açık kalır

Bu yapı publish-subscribe (pub/sub) desenine dayanır. Sunucu tarafında bir pub/sub altyapısına ihtiyaç duyarsınız. Redis, Apache Kafka ya da basit in-memory event emitter bu iş için kullanılabilir.

Şema Tanımı

Önce GraphQL şemasında Subscription tipini tanımlamanız gerekir. Normal bir Query veya Mutation tanımından çok da farklı değil:

# schema.graphql

type Subscription {
  siparisGuncellendi(musteriId: ID!): SiparisDurumu
  yeniMesaj(sohbetId: ID!): Mesaj
  stokDegisti(urunId: ID!): StokBilgisi
}

type SiparisDurumu {
  siparisId: ID!
  durum: String!
  guncellenmeTarihi: String!
  aciklama: String
}

type Mesaj {
  id: ID!
  icerik: String!
  gonderen: Kullanici!
  gonderimZamani: String!
}

type StokBilgisi {
  urunId: ID!
  mevcutAdet: Int!
  rezerveAdet: Int!
}

Bu tanımda her Subscription alanı bir olayı temsil eder. siparisGuncellendi bir müşterinin siparişlerini dinlemek için kullanılır. yeniMesaj ise belirli bir sohbet odasındaki mesajları takip eder.

Sunucu Tarafı Implementasyonu

Node.js ve Apollo Server ile bir Subscription resolver yazalım. Önce gerekli paketleri kuralım:

npm install @apollo/server graphql-ws ws graphql-subscriptions

Şimdi sunucu kodunu yazalım:

# server.js - Apollo Server 4 ile WebSocket entegrasyonu

const { ApolloServer } = require('@apollo/server');
const { expressMiddleware } = require('@apollo/server/express4');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const { WebSocketServer } = require('ws');
const { useServer } = require('graphql-ws/lib/use/ws');
const { PubSub } = require('graphql-subscriptions');
const express = require('express');
const http = require('http');

const pubsub = new PubSub();

const SIPARIS_GUNCELLENDI = 'SIPARIS_GUNCELLENDI';
const YENI_MESAJ = 'YENI_MESAJ';

const resolvers = {
  Subscription: {
    siparisGuncellendi: {
      subscribe: (_, { musteriId }) => {
        return pubsub.asyncIterator(`${SIPARIS_GUNCELLENDI}_${musteriId}`);
      },
      resolve: (payload) => payload.siparisGuncellendi,
    },
    yeniMesaj: {
      subscribe: (_, { sohbetId }) => {
        return pubsub.asyncIterator(`${YENI_MESAJ}_${sohbetId}`);
      },
      resolve: (payload) => payload.yeniMesaj,
    },
  },
  Mutation: {
    siparisGuncelle: async (_, { siparisId, musteriId, durum }) => {
      const guncelSiparis = {
        siparisId,
        durum,
        guncellenmeTarihi: new Date().toISOString(),
        aciklama: `Sipariş ${durum} durumuna geçti`,
      };

      await pubsub.publish(
        `${SIPARIS_GUNCELLENDI}_${musteriId}`,
        { siparisGuncellendi: guncelSiparis }
      );

      return guncelSiparis;
    },
  },
};

async function baslatSunucu() {
  const schema = makeExecutableSchema({ typeDefs, resolvers });
  const app = express();
  const httpServer = http.createServer(app);

  const wsServer = new WebSocketServer({
    server: httpServer,
    path: '/graphql',
  });

  const serverCleanup = useServer({ schema }, wsServer);

  const server = new ApolloServer({
    schema,
    plugins: [
      {
        async serverWillStart() {
          return {
            async drainServer() {
              await serverCleanup.dispose();
            },
          };
        },
      },
    ],
  });

  await server.start();
  app.use('/graphql', expressMiddleware(server));

  httpServer.listen(4000, () => {
    console.log('GraphQL sunucusu 4000 portunda calisiyor');
    console.log('WebSocket baglantisi: ws://localhost:4000/graphql');
  });
}

baslatSunucu().catch(console.error);

Redis ile Ölçeklenebilir Pub/Sub

Yukarıdaki örnekte in-memory PubSub kullandık. Bu yöntem tek sunucu ortamı için iyidir. Ancak uygulamanız birden fazla sunucu instance’ında çalışıyorsa (yani horizontal scaling yapıyorsanız), in-memory pub/sub yetersiz kalır. Bir sunucuda yayınlanan olay diğer sunucudaki abonelere ulaşamaz. Bunun çözümü Redis’tir:

npm install graphql-redis-subscriptions ioredis
# redis-pubsub.js

const { RedisPubSub } = require('graphql-redis-subscriptions');
const Redis = require('ioredis');

const redisOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: process.env.REDIS_PORT || 6379,
  password: process.env.REDIS_PASSWORD,
  retryStrategy: (times) => {
    const gecikme = Math.min(times * 50, 2000);
    return gecikme;
  },
};

const pubsub = new RedisPubSub({
  publisher: new Redis(redisOptions),
  subscriber: new Redis(redisOptions),
});

module.exports = { pubsub };

Redis tabanlı pub/sub kullandığınızda, hangi sunucu instance’ı event yayınlarsa yayınlasın, Redis bu olayı tüm instance’lardaki abonelere iletir. Bu yapı production ortamı için kritiktir.

Kimlik Doğrulama ve Yetkilendirme

Subscription’larda authentication biraz farklı çalışır. HTTP başlıklarını WebSocket bağlantısında normal şekilde gönderemezsiniz. graphql-ws kütüphanesi bu durumu connectionParams aracılığıyla çözer:

# auth-subscription.js

const { useServer } = require('graphql-ws/lib/use/ws');
const { verify } = require('jsonwebtoken');

const serverCleanup = useServer(
  {
    schema,
    context: async (ctx) => {
      const token = ctx.connectionParams?.authorization?.replace('Bearer ', '');

      if (!token) {
        throw new Error('Yetkilendirme token bulunamadi');
      }

      try {
        const kullanici = verify(token, process.env.JWT_SECRET);
        return { kullanici };
      } catch (hata) {
        throw new Error('Gecersiz token');
      }
    },
    onConnect: async (ctx) => {
      const token = ctx.connectionParams?.authorization;
      if (!token) {
        console.log('Yetkisiz baglanti denemesi engellendi');
        return false;
      }
      console.log('Yeni WebSocket baglantisi kuruldu');
      return true;
    },
    onDisconnect: (ctx) => {
      console.log('WebSocket baglantisi kapatildi');
    },
  },
  wsServer
);

Resolver tarafında ise context içindeki kullanıcıyı kontrol edebilirsiniz:

# yetkilendirilmis-subscription-resolver.js

const resolvers = {
  Subscription: {
    siparisGuncellendi: {
      subscribe: (_, { musteriId }, context) => {
        if (!context.kullanici) {
          throw new Error('Bu islemi yapmak icin giris yapmaniz gerekiyor');
        }

        if (context.kullanici.id !== musteriId && context.kullanici.rol !== 'ADMIN') {
          throw new Error('Bu siparisleri izleme yetkiniz yok');
        }

        return pubsub.asyncIterator(`${SIPARIS_GUNCELLENDI}_${musteriId}`);
      },
      resolve: (payload) => payload.siparisGuncellendi,
    },
  },
};

İstemci Tarafı: React ile Kullanım

İstemci tarafında Apollo Client ile Subscription kullanımını inceleyelim. React uygulamanızda useSubscription hook’u bu işi kolaylaştırır:

# apollo-client-setup.js

import { ApolloClient, InMemoryCache, split, HttpLink } from '@apollo/client';
import { GraphQLWsLink } from '@apollo/client/link/subscriptions';
import { createClient } from 'graphql-ws';
import { getMainDefinition } from '@apollo/client/utilities';

const httpLink = new HttpLink({
  uri: 'http://localhost:4000/graphql',
});

const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    connectionParams: () => ({
      authorization: `Bearer ${localStorage.getItem('token')}`,
    }),
    on: {
      connected: () => console.log('WebSocket baglantisi kuruldu'),
      closed: () => console.log('WebSocket baglantisi kapandi'),
      error: (hata) => console.error('WebSocket hatasi:', hata),
    },
    retryAttempts: 5,
    shouldRetry: () => true,
  })
);

const splitLink = split(
  ({ query }) => {
    const tanim = getMainDefinition(query);
    return (
      tanim.kind === 'OperationDefinition' &&
      tanim.operation === 'subscription'
    );
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
});
# SiparisIzleme.jsx

import { gql, useSubscription } from '@apollo/client';

const SIPARIS_SUBSCRIPTION = gql`
  subscription SiparisIzle($musteriId: ID!) {
    siparisGuncellendi(musteriId: $musteriId) {
      siparisId
      durum
      guncellenmeTarihi
      aciklama
    }
  }
`;

function SiparisIzleme({ musteriId }) {
  const { data, loading, error } = useSubscription(
    SIPARIS_SUBSCRIPTION,
    {
      variables: { musteriId },
      onData: ({ data }) => {
        const siparis = data.data?.siparisGuncellendi;
        if (siparis) {
          console.log(`Siparis ${siparis.siparisId} guncellendi: ${siparis.durum}`);
        }
      },
      onError: (hata) => {
        console.error('Subscription hatasi:', hata);
      },
    }
  );

  if (loading) return <p>Baglanti kuruluyor...</p>;
  if (error) return <p>Hata: {error.message}</p>;

  return (
    <div>
      {data?.siparisGuncellendi && (
        <div className="bildirim">
          <strong>Siparis Durumu:</strong> {data.siparisGuncellendi.durum}
          <p>{data.siparisGuncellendi.aciklama}</p>
        </div>
      )}
    </div>
  );
}

Gerçek Dünya Senaryosu: Canlı Bildirim Sistemi

Bir iç iletişim platformu için bildirim sistemi tasarlayalım. Bu sistemde kullanıcılar kendilerine gelen bildirimleri anında alır:

# bildirim-subscription.js

const typeDefs = `
  type Bildirim {
    id: ID!
    tip: BildirimTipi!
    baslik: String!
    mesaj: String!
    okundu: Boolean!
    olusturmaTarihi: String!
    gonderen: Kullanici
  }

  enum BildirimTipi {
    MESAJ
    GOREV_ATAMASI
    SISTEM
    UYARI
  }

  type Subscription {
    yeniBildirim(kullaniciId: ID!): Bildirim
    gorevDurumu(gorevId: ID!): GorevGuncelleme
  }

  type Mutation {
    bildirimGonder(
      aliciId: ID!
      tip: BildirimTipi!
      baslik: String!
      mesaj: String!
    ): Bildirim
  }
`;

const resolvers = {
  Mutation: {
    bildirimGonder: async (_, args, { kullanici, db }) => {
      const bildirim = await db.bildirim.olustur({
        ...args,
        gonderId: kullanici.id,
        olusturmaTarihi: new Date().toISOString(),
        okundu: false,
      });

      await pubsub.publish(
        `YENI_BILDIRIM_${args.aliciId}`,
        { yeniBildirim: bildirim }
      );

      return bildirim;
    },
  },
  Subscription: {
    yeniBildirim: {
      subscribe: (_, { kullaniciId }, { kullanici }) => {
        if (kullanici.id !== kullaniciId) {
          throw new Error('Yetki hatasi');
        }
        return pubsub.asyncIterator(`YENI_BILDIRIM_${kullaniciId}`);
      },
      resolve: (payload) => payload.yeniBildirim,
    },
  },
};

Subscription Filtresi Kullanımı

Bazen tüm olayları değil, belirli kriterleri karşılayanları istemciye göndermek istersiniz. withFilter yardımcı fonksiyonu tam bunun için vardır:

# filtrelenmiş-subscription.js

const { withFilter } = require('graphql-subscriptions');

const resolvers = {
  Subscription: {
    stokDegisti: {
      subscribe: withFilter(
        () => pubsub.asyncIterator('STOK_DEGISTI'),
        (payload, variables) => {
          if (variables.urunId) {
            return payload.stokDegisti.urunId === variables.urunId;
          }

          if (variables.kritikSeviye) {
            return payload.stokDegisti.mevcutAdet < variables.kritikSeviye;
          }

          return true;
        }
      ),
      resolve: (payload) => payload.stokDegisti,
    },
  },
};

withFilter ikinci argüman olarak bir predicate fonksiyonu alır. Bu fonksiyon true döndürürse event istemciye iletilir, false döndürürse geçilir. Bu sayede gereksiz veri akışının önüne geçmiş olursunuz.

Bağlantı Yönetimi ve Hata Toleransı

Production ortamında bağlantı kopmaları, timeout’lar ve yeniden bağlanma senaryolarını düzgün yönetmeniz gerekir. Aşağıdaki yapı daha sağlam bir istemci konfigürasyonu sunar:

# robust-ws-client.js

import { createClient } from 'graphql-ws';

let aktifSoket = null;

const wsClient = createClient({
  url: process.env.REACT_APP_WS_URL,
  retryAttempts: Infinity,
  retryWait: async (deneme) => {
    const bekleme = Math.min(1000 * Math.pow(2, deneme), 30000);
    console.log(`${bekleme}ms sonra yeniden baglaniliyor... (Deneme: ${deneme})`);
    await new Promise((coz) => setTimeout(coz, bekleme));
  },
  keepAlive: 10000,
  connectionParams: async () => {
    const token = await tokenYenile();
    return { authorization: `Bearer ${token}` };
  },
  on: {
    connected: (soket) => {
      aktifSoket = soket;
      console.log('WebSocket baglantisi aktif');
    },
    closed: () => {
      aktifSoket = null;
      console.warn('WebSocket baglantisi kapandi, yeniden baglanma bekleniyor');
    },
    ping: (alinan) => {
      if (!alinan) {
        console.debug('Ping gonderildi');
      }
    },
    pong: (alinan) => {
      if (alinan) {
        console.debug('Pong alindi');
      }
    },
  },
});

Performans ve Dikkat Edilecek Noktalar

Subscription kullanırken göz önünde bulundurmanız gereken bazı kritik konular var:

  • Bağlantı sınırı: Her istemci bir WebSocket bağlantısı açar. 10.000 eş zamanlı kullanıcı için sunucu kapasitesini buna göre planlamalısınız
  • Bellek yönetimi: In-memory PubSub tek sunucu için iyidir ama event birikimi bellek sorununa yol açabilir, Redis gibi harici bir çözüm kullanın
  • Abonelik temizleme: İstemci bağlantıyı kopardığında sunucu tarafındaki aboneliğin de temizlendiğinden emin olun, bellek sızıntısına dikkat edin
  • Throttling: Çok sık tetiklenen eventler için debounce ya da throttle mekanizması ekleyin
  • Güvenlik: Subscription operasyonlarını da rate limiting kapsamına alın, kötü niyetli istemciler binlerce abonelik açmaya çalışabilir
  • Monitoring: WebSocket bağlantı sayısını, event throughput’unu ve hata oranlarını izlemek için Prometheus gibi araçları kullanın

Sonuç

GraphQL Subscription, gerçek zamanlı özellikler için güçlü ve standartlaşmış bir çözüm sunuyor. WebSocket altyapısını kendiniz yazmak yerine şema seviyesinde tanımlayabiliyor ve istemci ile sunucu arasındaki sözleşmeyi açıkça ifade edebiliyorsunuz. Küçük ölçekli uygulamalar için in-memory PubSub yeterli olsa da production ortamında Redis tabanlı bir pub/sub altyapısı zorunlu hale geliyor.

Authentication ve yetkilendirmeyi doğru implemente etmek, connection pooling ve retry mekanizmalarını düzgün yapılandırmak ise sistemin güvenilirliğini doğrudan etkiliyor. withFilter kullanımı gereksiz veri trafiğini azaltırken, sunucu kaynaklarını da daha verimli kullanmanızı sağlıyor.

Gerçek zamanlı özellik gerektiren her projede önce hangi olayların subscription gerektirdiğini netleştirin, ardından pub/sub altyapısını ölçeklenebilir şekilde tasarlayın. Subscription güçlü bir araç ama her ihtiyaç için doğru çözüm olmayabilir; basit, seyrek güncellenen veriler için polling hala makul bir tercih olabilir.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir