Veritabanı Değişikliklerini İzleme: Hasura Event Trigger Kullanımı

Veritabanında bir şey değiştiğinde haberdar olmak istiyorsunuz ama polling yapmaktan bıktınız mı? Her 5 saniyede bir “yeni kayıt var mı?” diye soran worker’lardan, gereksiz yere dönen CPU’lardan, log dosyalarını dolduran tekrar eden sorgulardan kurtulmak istiyorsanız, Hasura’nın Event Trigger özelliği tam size göre.

Hasura Event Trigger, PostgreSQL tablolarındaki INSERT, UPDATE ve DELETE işlemlerini yakalayıp belirlediğiniz bir HTTP endpoint’e otomatik olarak webhook gönderiyor. Yani veritabanında bir kayıt değiştiği anda sizin servisiniz tetikleniyor. Bu yaklaşım hem daha temiz hem de çok daha ölçeklenebilir.

Event Trigger Nedir ve Nasıl Çalışır?

Hasura, PostgreSQL’in trigger mekanizmasını kullanarak event’leri yakalar. Arka planda Hasura kendi event log tablosuna değişiklikleri yazar ve bu kayıtları işleyecek worker’larını devreye sokar. Sizin webhook endpoint’iniz başarılı yanıt döndüğünde event tamamlanmış sayılır, başarısız olursa retry mekanizması devreye girer.

Temel akış şu şekilde işler:

  • Kullanıcı bir GraphQL mutation çalıştırır veya direkt SQL ile bir kayıt değişir
  • PostgreSQL trigger’ı Hasura’nın event log tablosuna yeni bir satır ekler
  • Hasura bu satırı okuyup belirlenen HTTP endpoint’e POST isteği gönderir
  • Endpoint başarılı yanıt (2xx) döndürünce event “delivered” olarak işaretlenir
  • Başarısız olursa exponential backoff ile yeniden denenir

Bu mimaride güzel olan şey, webhook başarısız olsa bile event kaybolmuyor. Hasura’nın event log’u kalıcı ve retry sayısını, son hata mesajını vs. saklıyor.

Ortamı Hazırlamak

Öncelikle çalışan bir Hasura instance’ına ihtiyacınız var. Docker Compose ile hızlıca ayağa kaldıralım:

version: "3.6"
services:
  postgres:
    image: postgres:15
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
      POSTGRES_DB: myapp
    volumes:
      - db_data:/var/lib/postgresql/data

  graphql-engine:
    image: hasura/graphql-engine:v2.35.0
    ports:
      - "8080:8080"
    depends_on:
      - postgres
    restart: always
    environment:
      HASURA_GRAPHQL_DATABASE_URL: postgres://postgres:postgrespassword@postgres:5432/myapp
      HASURA_GRAPHQL_ENABLE_CONSOLE: "true"
      HASURA_GRAPHQL_DEV_MODE: "true"
      HASURA_GRAPHQL_ADMIN_SECRET: mysecretkey
      HASURA_GRAPHQL_ENABLED_LOG_TYPES: startup, http-log, webhook-log, websocket-log, query-log

volumes:
  db_data:
docker-compose up -d
docker-compose ps
# Her iki servisin de "Up" durumda olduğunu doğrulayın

Hasura Console’a http://localhost:8080/console adresinden erişebilirsiniz. Admin secret olarak mysecretkey kullanın.

Örnek Senaryo: Sipariş Sistemi

Gerçek dünya senaryosu olarak bir e-ticaret uygulamasının sipariş yönetimini ele alalım. Yeni sipariş oluşturulduğunda e-posta göndermek, sipariş durumu değiştiğinde müşteriyi bilgilendirmek ve iptal edilen siparişlerde stok güncelleme işlemi yapmak istiyoruz.

Önce tablolarımızı oluşturalım:

# PostgreSQL'e bağlanıp tabloları oluşturuyoruz
docker exec -it <postgres_container_id> psql -U postgres -d myapp

# Aşağıdaki SQL'i çalıştırın
CREATE TABLE orders (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    user_id UUID NOT NULL,
    status VARCHAR(50) DEFAULT 'pending',
    total_amount DECIMAL(10,2) NOT NULL,
    items JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE order_notifications (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    order_id UUID REFERENCES orders(id),
    notification_type VARCHAR(100),
    sent_at TIMESTAMPTZ,
    status VARCHAR(50) DEFAULT 'pending',
    error_message TEXT
);

Hasura Console Üzerinden Event Trigger Oluşturmak

Console üzerinden Event Trigger oluşturmak oldukça basit. Fakat production ortamlarında her şeyi metadata olarak yönetmek çok daha mantıklı. Önce console yolunu gösterelim, sonra metadata yöntemine geçelim.

Console’da Events sekmesine gidin, “Create” butonuna tıklayın ve şu ayarları yapın:

  • Trigger Name: order_created_trigger
  • Schema/Table: public / orders
  • Operations: INSERT işaretleyin
  • Webhook URL: http://your-webhook-service:3000/webhooks/order-created

Ama biz bunu Hasura CLI ile metadata olarak yönetelim çünkü bu yaklaşım versiyon kontrolüne giriyor ve ekip içinde tutarlılık sağlıyor.

Hasura CLI ile Metadata Yönetimi

# Hasura CLI kurulumu
curl -L https://github.com/hasura/graphql-engine/raw/stable/cli/get.sh | bash

# Proje başlatma
hasura init myapp-hasura --endpoint http://localhost:8080 --admin-secret mysecretkey

cd myapp-hasura

# Mevcut metadata'yı çekme
hasura metadata export

Event trigger’ı metadata dosyası olarak tanımlayalım. metadata/databases/default/tables/public_orders.yaml dosyasını düzenleyin:

table:
  name: orders
  schema: public
event_triggers:
  - name: order_created
    definition:
      enable_manual: false
      insert:
        columns: "*"
    retry_conf:
      interval_sec: 10
      num_retries: 3
      timeout_sec: 60
    webhook: "{{ORDER_WEBHOOK_BASE_URL}}/webhooks/order-created"
    headers:
      - name: x-webhook-secret
        value_from_env: WEBHOOK_SECRET
  - name: order_status_updated
    definition:
      enable_manual: true
      update:
        columns:
          - status
    retry_conf:
      interval_sec: 15
      num_retries: 5
      timeout_sec: 60
    webhook: "{{ORDER_WEBHOOK_BASE_URL}}/webhooks/order-status-changed"
    headers:
      - name: x-webhook-secret
        value_from_env: WEBHOOK_SECRET

Dikkat edin, update trigger’ında sadece status kolonunu izliyoruz. Bu önemli bir optimizasyon. Siparişteki herhangi bir alan değiştiğinde değil, sadece durum değiştiğinde tetiklensin istiyoruz.

Webhook Servisini Yazmak

Node.js ile basit ama production-ready bir webhook servisi yazalım:

# package.json bağımlılıklarını yükleyin
npm init -y
npm install express body-parser nodemailer winston

# server.js
const express = require('express');
const bodyParser = require('body-parser');
const winston = require('winston');

const app = express();
app.use(bodyParser.json());

const logger = winston.createLogger({
  level: 'info',
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json()
  ),
  transports: [
    new winston.transports.Console(),
    new winston.transports.File({ filename: 'webhook.log' })
  ]
});

// Webhook secret doğrulama middleware
const verifyWebhookSecret = (req, res, next) => {
  const secret = req.headers['x-webhook-secret'];
  if (secret !== process.env.WEBHOOK_SECRET) {
    logger.warn('Unauthorized webhook attempt', {
      ip: req.ip,
      path: req.path
    });
    return res.status(401).json({ error: 'Unauthorized' });
  }
  next();
};

// Yeni sipariş oluşturulduğunda
app.post('/webhooks/order-created', verifyWebhookSecret, async (req, res) => {
  const { event, table, trigger } = req.body;
  const newOrder = event.data.new;

  logger.info('New order received', {
    orderId: newOrder.id,
    userId: newOrder.user_id,
    amount: newOrder.total_amount,
    triggerName: trigger.name
  });

  try {
    // E-posta gönderme işlemi (async, ama webhook'u bloklamasın)
    sendOrderConfirmationEmail(newOrder).catch(err => {
      logger.error('Email sending failed', { orderId: newOrder.id, error: err.message });
    });

    // Stok rezervasyonu
    await reserveStock(newOrder.items, newOrder.id);

    // Her zaman 200 dönün, işlemler async halledilebilir
    res.status(200).json({ 
      success: true, 
      orderId: newOrder.id,
      message: 'Order processing initiated'
    });

  } catch (error) {
    logger.error('Order processing failed', {
      orderId: newOrder.id,
      error: error.message
    });
    // 5xx dönersek Hasura retry yapacak
    res.status(500).json({ 
      error: 'Processing failed',
      message: error.message 
    });
  }
});

// Sipariş durumu değiştiğinde
app.post('/webhooks/order-status-changed', verifyWebhookSecret, async (req, res) => {
  const { event } = req.body;
  const oldOrder = event.data.old;
  const newOrder = event.data.new;

  logger.info('Order status changed', {
    orderId: newOrder.id,
    from: oldOrder.status,
    to: newOrder.status
  });

  try {
    switch (newOrder.status) {
      case 'confirmed':
        await notifyCustomer(newOrder.user_id, 'Siparişiniz onaylandı!', newOrder.id);
        break;
      case 'shipped':
        await notifyCustomer(newOrder.user_id, 'Siparişiniz kargoya verildi!', newOrder.id);
        break;
      case 'cancelled':
        await releaseStock(newOrder.items, newOrder.id);
        await refundPayment(newOrder.id, newOrder.total_amount);
        break;
      default:
        logger.info('No action for status', { status: newOrder.status });
    }

    res.status(200).json({ success: true });

  } catch (error) {
    logger.error('Status change handling failed', { error: error.message });
    res.status(500).json({ error: error.message });
  }
});

async function reserveStock(items, orderId) {
  // Stok servisiyle konuş
  logger.info('Reserving stock for order', { orderId, itemCount: items?.length });
}

async function sendOrderConfirmationEmail(order) {
  logger.info('Sending confirmation email', { orderId: order.id });
}

async function notifyCustomer(userId, message, orderId) {
  logger.info('Notifying customer', { userId, orderId, message });
}

async function releaseStock(items, orderId) {
  logger.info('Releasing stock for cancelled order', { orderId });
}

async function refundPayment(orderId, amount) {
  logger.info('Processing refund', { orderId, amount });
}

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  logger.info(`Webhook service running on port ${PORT}`);
});

Event Trigger Payload Yapısı

Hasura’nın gönderdiği payload’ı iyi anlamak lazım. Şöyle bir yapı geliyor:

# Örnek INSERT payload
{
  "event": {
    "session_variables": {
      "x-hasura-role": "user",
      "x-hasura-user-id": "abc123"
    },
    "op": "INSERT",
    "data": {
      "old": null,
      "new": {
        "id": "uuid-here",
        "user_id": "user-uuid",
        "status": "pending",
        "total_amount": "150.00",
        "items": [{"product_id": "p1", "quantity": 2}],
        "created_at": "2024-01-15T10:30:00Z",
        "updated_at": "2024-01-15T10:30:00Z"
      }
    },
    "trace_context": {
      "trace_id": "trace-id-here",
      "span_id": "span-id-here"
    }
  },
  "created_at": "2024-01-15T10:30:00Z",
  "id": "event-uuid",
  "delivery_info": {
    "max_retries": 3,
    "current_retry": 0
  },
  "trigger": {
    "name": "order_created"
  },
  "table": {
    "schema": "public",
    "name": "orders"
  }
}

Önemli noktalara dikkat edin:

  • event.data.old: UPDATE ve DELETE’te önceki değer, INSERT’te null
  • event.data.new: INSERT ve UPDATE’te yeni değer, DELETE’te null
  • event.session_variables: Hasura üzerinden yapılan işlemlerde JWT’den gelen kullanıcı bilgileri
  • delivery_info.current_retry: Kaçıncı deneme olduğunu anlayabilirsiniz
  • event.op: INSERT, UPDATE, DELETE veya MANUAL

Manuel Trigger Kullanımı

Bazen mevcut kayıtlar için de trigger’ı el ile çalıştırmanız gerekebilir. Örneğin yeni bir bildirim servisi eklediniz ve tüm mevcut aktif siparişler için çalıştırmak istiyorsunuz. enable_manual: true ayarını yaptıysanız Hasura Console veya API üzerinden manuel tetikleyebilirsiniz:

# Hasura API üzerinden manuel trigger
curl -X POST 
  http://localhost:8080/v1/metadata 
  -H 'Content-Type: application/json' 
  -H 'x-hasura-admin-secret: mysecretkey' 
  -d '{
    "type": "pg_invoke_event_trigger",
    "args": {
      "source": "default",
      "name": "order_status_updated",
      "payload": {
        "id": "specific-order-uuid"
      }
    }
  }'

Bu özellik özellikle veri migrasyonu veya yeni servis entegrasyonlarında çok işe yarıyor.

Event Log Takibi ve Hata Yönetimi

Hasura’nın event log’larını takip etmek için doğrudan veritabanından sorgulayabilirsiniz:

# Event log'larını kontrol et
docker exec -it <postgres_container_id> psql -U postgres -d myapp

SELECT 
    id,
    trigger_name,
    payload->>'id' as event_payload_id,
    delivered,
    error,
    tries,
    created_at,
    locked_at,
    next_retry_at
FROM hdb_catalog.event_log
WHERE trigger_name = 'order_created'
AND delivered = false
ORDER BY created_at DESC
LIMIT 20;

# Hatalı event'leri görüntüle
SELECT 
    el.id,
    el.trigger_name,
    el.tries,
    el.error,
    eli.status,
    eli.response_status,
    eli.error as invocation_error
FROM hdb_catalog.event_log el
JOIN hdb_catalog.event_invocation_logs eli ON el.id = eli.event_id
WHERE el.error = true
ORDER BY el.created_at DESC;

Production’da bu sorguları bir monitoring script’ine ekleyip Grafana veya benzeri bir araçta görselleştirmenizi öneririm. Belirli bir eşiğin üzerinde başarısız event birikirse alarm devreye girsin.

Performans ve Ölçeklendirme İpuçları

Event Trigger kullanırken dikkat edilmesi gereken birkaç nokta var:

  • Webhook timeout’unu gerçekçi belirleyin: Default 60 saniye, ama işleminiz gerçekten uzun sürüyorsa async pattern kullanın. Webhook’ta işi bir kuyruğa alın ve hemen 200 dönün.
  • Sadece gerekli kolonları izleyin: columns: "*" yerine sadece değişmesini izlemek istediğiniz kolonları belirtin. Hem payload daha küçük olur hem de gereksiz trigger çalışmaz.
  • Retry sayısını ihtiyaca göre ayarlayın: Kritik işlemler için daha fazla retry, idempotency garantisi olan işlemler için makul bir sayı seçin.
  • Event log temizliği: Hdb_catalog.event_log tablosu zamanla şişer. Hasura’nın auto_cleanup özelliğini aktif edin ya da periyodik temizlik job’ı çalıştırın.
# Event log otomatik temizlik ayarı (metadata'da)
# hasura.yaml veya environment variable olarak
HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE: 100
HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE: 100
HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL: 1000
  • Connection pool boyutunu izleyin: Her event trigger worker bir DB connection tüketir. Pool size’ı buna göre ayarlayın.
  • Idempotency key kullanın: Aynı event birden fazla kez gelebilir (at-least-once delivery). Webhook servisinizde event ID’yi kontrol edin, aynı ID’yi ikinci kez işlememeye çalışın.

Transform ile Payload Şekillendirme

Hasura’nın Request Transform özelliğiyle webhook’a giden payload’ı şekillendirebilirsiniz. Farklı bir formат bekleven harici bir servisle entegrasyon kuruyorsanız bu çok kullanışlı:

# Metadata'da transform tanımı
request_transform:
  version: 2
  method: POST
  body:
    action: transform
    template: |
      {
        "order_id": {{$body.event.data.new.id}},
        "customer_id": {{$body.event.data.new.user_id}},
        "amount": {{$body.event.data.new.total_amount}},
        "event_type": "NEW_ORDER",
        "timestamp": {{$body.created_at}}
      }
  request_headers:
    add_headers:
      Content-Type: application/json
      X-Event-Source: hasura-event-trigger

Sonuç

Hasura Event Trigger, veritabanı değişikliklerini reaktif bir şekilde işlemenin en temiz yollarından biri. Polling’in yarattığı gereksiz yükten kurtuluyorsunuz, PostgreSQL’in kendi trigger mekanizması üzerine inşa edildiği için güvenilir ve hatasız çalışıyor. Retry mekanizması, event log, manuel tetikleme ve payload transform özellikleriyle production-ready bir çözüm sunuyor.

Sipariş bildirimleri, envanter yönetimi, audit log tutma, mikroservisler arası veri senkronizasyonu gibi pek çok senaryoda event trigger kullanabilirsiniz. Önemli olan webhook servisinizi idempotent yazmak ve hata durumlarını düzgün yönetmek. Endpoint’iniz 5xx döndürdüğünde Hasura retry yapacak, bu yüzden aynı event’i iki kez işleyebileceğinizi her zaman aklınızda tutun.

Metadata-as-code yaklaşımını benimseyip event trigger’larınızı Git’te yönetirseniz, farklı ortamlar arasında tutarlılığı kolayca sağlarsınız ve ekip içi kod review süreçlerine dahil edebilirsiniz. Bu, uzun vadede çok daha sürdürülebilir bir yapı oluşturmanızı sağlar.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir