n8n ile Çoklu Adım İş Akışı Tasarımı ve Yönetimi

Birkaç yıl önce, sabahın dördünde bir monitoring alarmıyla uyandığımda ve bir sonraki iki saati manuel olarak log analizi yaparak, Slack’e mesaj atarak, ticket açarak geçirdiğimde, “bu işin daha iyi bir yolu olmalı” diye düşündüm. O günden bu yana iş akışı otomasyonu benim için bir tercih değil, bir zorunluluk haline geldi. n8n’i keşfettiğimde ise şunu fark ettim: Görsel iş akışı tasarımı, sadece kolaylık değil, aynı zamanda karmaşık süreçleri anlaşılır hale getirmenin en etkili yolu.

Çoklu Adım İş Akışının Anatomisi

n8n’de basit bir “trigger -> action” ikilisinin ötesine geçmek, gerçek dünya senaryolarında kaçınılmaz. Bir sunucudan log çekip analiz etmek, belirli kriterlere göre filtrelemek, farklı kanallardan bildirim göndermek ve sonuçları bir veritabanına yazmak istiyorsanız, artık çoklu adımlı bir iş akışından bahsediyoruz.

n8n’de bir iş akışı temelde üç katmandan oluşur:

  • Trigger katmanı: İş akışını başlatan olay (webhook, zamanlayıcı, harici servis eventi)
  • İşlem katmanı: Veriyi dönüştüren, filtreleyen, zenginleştiren node’lar
  • Aksiyon katmanı: Sonuçları ileten veya kaydeden node’lar

Basit görünüyor, ama 15-20 node’lu bir iş akışını yönetmeye başladığınızda, bu katmanlar arasındaki veri akışını ve hata yönetimini doğru kurgulamak ciddi bir mimari düşünce gerektiriyor.

Temel Veri Yapısı: Her Şey JSON

n8n’de node’lar arasında akan veri her zaman JSON formatındadır ve her item bir obje olarak taşınır. Bunu anlamak, özellikle Function ve Set node’larını kullanırken hayat kurtarır.

Bir önceki node’dan gelen veriyi JavaScript ile manipüle etmek istediğinizde Function node’u kullanırsınız:

// Gelen veriyi işle ve yeni alanlar ekle
const items = $input.all();

return items.map(item => {
  const timestamp = new Date().toISOString();
  const hostname = item.json.hostname || 'unknown';
  const cpuUsage = parseFloat(item.json.cpu_usage);
  
  return {
    json: {
      ...item.json,
      processed_at: timestamp,
      alert_level: cpuUsage > 90 ? 'critical' : cpuUsage > 75 ? 'warning' : 'normal',
      display_name: `${hostname} (${cpuUsage}%)`
    }
  };
});

Bu basit dönüşüm, sonraki node’larda koşullu routing yapmanıza zemin hazırlar.

Gerçek Senaryo: Sunucu Sağlık Monitörü

Şimdi gerçek bir senaryo üzerinden gidelim. Elimizde 20 sunucu var ve her birinin CPU, RAM, disk kullanımını izleyen bir sistem kuruyoruz. Sorun çıkınca otomatik olarak hem Slack’e hem de PagerDuty’ye bildirim gitmeli, aynı zamanda PostgreSQL’e log kaydedilmeli.

İş akışımızın adımları şöyle:

  • Cron trigger: Her 5 dakikada bir tetiklenir
  • HTTP Request: Prometheus’tan metrik çeker
  • Function: Metrikleri parse eder ve kritiklik seviyesine göre etiketler
  • IF node: Kritik mi, uyarı mı, normal mi diye ayırır
  • SplitInBatches: Büyük veri kümelerini parçalara böler
  • Paralel aksiyon: Slack, PagerDuty ve PostgreSQL’e eşzamanlı yazım

Prometheus’tan veri çekme adımı için HTTP Request node’unu şu şekilde yapılandırırsınız. Ama bunu n8n arayüzünde yapmak yerine, iş akışını kod olarak da dışa aktarabilirsiniz. İşte Prometheus sorgusu için kullanacağınız URL yapısı:

# Prometheus API üzerinden CPU kullanımını çeken sorgu
# n8n HTTP Request node'unda URL alanına bu formatı kullanın
http://prometheus.internal:9090/api/v1/query?query=100-(avg+by+(instance)(irate(node_cpu_seconds_total{mode="idle"}[5m]))*100)

# Disk kullanımı için
http://prometheus.internal:9090/api/v1/query?query=(node_filesystem_size_bytes-node_filesystem_free_bytes)/node_filesystem_size_bytes*100

# RAM kullanımı için
http://prometheus.internal:9090/api/v1/query?query=(1-(node_memory_MemAvailable_bytes/node_memory_MemTotal_bytes))*100

IF Node ile Koşullu Routing

n8n’deki IF node’u tek bir true/false ayrımı yapar. Ama gerçek dünyada üç veya dört farklı yola ihtiyaç duyarsınız. Bunun için Switch node’unu tercih etmelisiniz.

Switch node’unda Expression modunu kullanarak şu mantığı kurabilirsiniz:

// Switch node'unda "Value 1" alanına yazılacak expression
{{ $json.alert_level }}

// Output'lar:
// Output 0: "critical" eşleşirse -> PagerDuty + Slack #alerts-critical
// Output 1: "warning" eşleşirse -> Slack #alerts-warning
// Output 2: "normal" eşleşirse -> sadece DB'ye yaz

Bu noktada dikkat etmeniz gereken bir şey var: Switch node’undan çıkan her branch bağımsız çalışır ve birinin başarısız olması diğerlerini etkilemez. Bu aslında bir avantaj, ama hata yönetimini her branch için ayrı ayrı düşünmeniz gerektiği anlamına da gelir.

Merge Node ile Paralel Akışları Birleştirme

Bir iş akışında birden fazla kaynaktan veri toplamak ve bunları tek bir node’da işlemek sık karşılaşılan bir ihtiyaç. Diyelim ki hem CMDB’den sunucu bilgilerini, hem de Prometheus’tan metrikleri çekiyorsunuz ve bunları eşleştirmek istiyorsunuz.

Merge node’u bu iş için var. mergeByKey modunu kullanarak iki farklı akışı ortak bir alan üzerinden birleştirebilirsiniz:

// Merge node sonrasında Function node'u ile veriyi temizle
const items = $input.all();

return items
  .filter(item => item.json.hostname !== undefined)
  .map(item => ({
    json: {
      hostname: item.json.hostname,
      ip_address: item.json.ip_address,        // CMDB'den gelen
      environment: item.json.environment,       // CMDB'den gelen
      cpu_usage: item.json.cpu_usage,           // Prometheus'tan gelen
      ram_usage: item.json.ram_usage,           // Prometheus'tan gelen
      disk_usage: item.json.disk_usage,         // Prometheus'tan gelen
      merged_at: new Date().toISOString()
    }
  }));

PostgreSQL’e Veri Yazma

Metrikleri veritabanına kaydetmek için önce tabloyu hazırlamanız gerekiyor:

-- PostgreSQL'de metrik log tablosu
CREATE TABLE server_metrics (
    id SERIAL PRIMARY KEY,
    hostname VARCHAR(255) NOT NULL,
    ip_address INET,
    environment VARCHAR(50),
    cpu_usage DECIMAL(5,2),
    ram_usage DECIMAL(5,2),
    disk_usage DECIMAL(5,2),
    alert_level VARCHAR(20),
    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    workflow_run_id VARCHAR(100)
);

CREATE INDEX idx_server_metrics_hostname ON server_metrics(hostname);
CREATE INDEX idx_server_metrics_recorded_at ON server_metrics(recorded_at);
CREATE INDEX idx_server_metrics_alert_level ON server_metrics(alert_level);

n8n’deki PostgreSQL node’unda INSERT operasyonu için Query modunu kullanırsanız daha fazla kontrol sahibi olursunuz:

INSERT INTO server_metrics 
  (hostname, ip_address, environment, cpu_usage, ram_usage, disk_usage, alert_level, workflow_run_id)
VALUES 
  ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING
RETURNING id, recorded_at;

Hata Yönetimi: Görmezden Gelinmemesi Gereken Kısım

n8n’de hata yönetimi, çoğu kişinin geç öğrendiği ama erken öğrenmesi gereken bir konudur. Her node’un sağ üst köşesindeki ayarlardan Continue on Fail seçeneğini aktif edebilirsiniz. Ama bunu her node için düşünerek yapmalısınız.

Kritik bir aksiyon node’unda (örneğin PagerDuty’ye alarm gönderme) Continue on Fail açıksa ve node sessizce başarısız olursa, farkında bile olmayabilirsiniz. Bunun yerine bir Error Trigger iş akışı kurun:

// Error handler workflow'unda Function node
// $execution.error objesinden detaylı hata bilgisi alın
const error = $execution.error;
const workflowName = $workflow.name;
const timestamp = new Date().toISOString();

const slackMessage = {
  channel: '#n8n-errors',
  text: `*n8n İş Akışı Hatası*`,
  blocks: [
    {
      type: 'section',
      text: {
        type: 'mrkdwn',
        text: `*Workflow:* ${workflowName}n*Hata:* ${error.message}n*Node:* ${error.node?.name || 'Bilinmiyor'}n*Zaman:* ${timestamp}`
      }
    }
  ]
};

return [{ json: slackMessage }];

SubWorkflow Kullanımı: Modülerlik Şart

Büyük iş akışları zamanla kontrol edilemez hale gelir. Bir iş akışında 30-40 node görmeye başladıysanız, modüler yapıya geçme vakti gelmiştir. n8n’in Execute Workflow node’u tam da bu iş için var.

Örneğin, “Slack bildirimi gönder” mantığını ayrı bir iş akışına taşıyabilirsiniz:

// Ana workflow'dan SubWorkflow'a veri gönderme
// Execute Workflow node'unda "Input Data" alanına:
{
  "recipient_channel": "{{ $json.alert_channel }}",
  "message_title": "{{ $json.alert_title }}",
  "message_body": "{{ $json.alert_message }}",
  "severity": "{{ $json.alert_level }}",
  "metadata": {
    "hostname": "{{ $json.hostname }}",
    "environment": "{{ $json.environment }}",
    "triggered_by": "{{ $workflow.name }}"
  }
}

Bu yaklaşımın avantajları büyük:

  • Tekrar kullanılabilirlik: Aynı bildirim mantığını 10 farklı iş akışından çağırabilirsiniz
  • Bakım kolaylığı: Slack formatını değiştirmek istediğinizde tek bir yerde değişiklik yaparsınız
  • Test edilebilirlik: SubWorkflow’ları bağımsız olarak test edebilirsiniz
  • Versiyon yönetimi: Kritik SubWorkflow’ları Git ile takip edebilirsiniz

Zamanlama ve Concurrency Yönetimi

n8n’de Cron trigger ile çalışırken concurrency sorunlarına dikkat etmek gerekir. Eğer bir iş akışı 5 dakikada bir çalışıyorsa ama tamamlanması 6 dakika sürüyorsa, örtüşme başlar.

Bunu engellemek için n8n’in Workflow Settings kısmında Execution Order ayarını Single Execution olarak ayarlayabilirsiniz. Ama bu her zaman doğru çözüm değil. Bazen eski execution’ı durdurup yenisini başlatmak daha mantıklı.

Daha gelişmiş bir yaklaşım için Redis ile distributed lock kullanabilirsiniz:

# n8n'in çalıştığı sunucuda Redis'i kontrol et
redis-cli ping

# Lock mekanizması için kullanacağınız key yapısı
# n8n Function node'unda HTTP Request ile Redis'e yazabilirsiniz
# ya da doğrudan Redis node'unu kullanabilirsiniz

# Redis SET komutunun NX ve EX parametreleriyle
# SET workflow:server-health-check:lock "running" NX EX 300
# NX: Sadece key yoksa yaz
# EX 300: 300 saniye sonra otomatik sil (timeout güvencesi)

n8n içinde bu kontrolü Function node’uyla da yapabilirsiniz:

// Workflow başında lock kontrolü
const lockKey = 'workflow:server-monitor:lock';
const lockValue = $execution.id;
const lockTTL = 300; // 5 dakika

// Bu kısım için HTTP Request node'u ile Redis REST API kullanın
// ya da n8n'in Redis node'unu Execute Command olarak ayarlayın
return [{
  json: {
    lock_key: lockKey,
    lock_value: lockValue,
    lock_ttl: lockTTL,
    check_url: `http://redis-api.internal/set/${lockKey}?nx=true&ex=${lockTTL}`
  }
}];

Büyük Veri Setlerinde SplitInBatches

200 sunucuyu aynı anda işlemeye çalışırsanız, hem API rate limit sorunlarıyla karşılaşırsınız hem de hata ayıklamak zorlaşır. SplitInBatches node’u bu sorunu çözer.

// SplitInBatches sonrasında her batch'i işleyen Function node
const currentBatch = $input.all();
const batchSize = currentBatch.length;
const batchIndex = $itemIndex;

// Her batch için özet log
console.log(`Batch işleniyor: ${batchIndex + 1}, Item sayısı: ${batchSize}`);

return currentBatch.map(item => ({
  json: {
    ...item.json,
    batch_processed: true,
    batch_timestamp: new Date().toISOString()
  }
}));

Genellikle şu batch boyutlarını öneririm:

  • API çağrıları için: 10-20 item (rate limit’e göre ayarlayın)
  • Veritabanı yazma için: 50-100 item
  • Dosya işleme için: 5-10 item (memory kullanımına dikkat)
  • E-posta gönderme için: 5-10 item (spam filtrelerinden kaçınmak için)

İş Akışı Versiyonlama ve Backup

n8n iş akışlarını JSON olarak dışa aktarabilirsiniz. Bunu düzenli olarak yapıp Git’e commit etmek, üretim ortamında hayat kurtarır.

#!/bin/bash
# n8n workflow backup scripti
# /usr/local/bin/backup-n8n-workflows.sh

N8N_API_URL="http://localhost:5678/api/v1"
N8N_API_KEY="your-api-key-here"
BACKUP_DIR="/opt/n8n-backups/workflows"
GIT_REPO="/opt/n8n-backups"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p "${BACKUP_DIR}"

# Tüm workflow'ları çek
WORKFLOWS=$(curl -s -H "X-N8N-API-KEY: ${N8N_API_KEY}" 
  "${N8N_API_URL}/workflows" | jq -r '.data[].id')

for WF_ID in $WORKFLOWS; do
  WF_NAME=$(curl -s -H "X-N8N-API-KEY: ${N8N_API_KEY}" 
    "${N8N_API_URL}/workflows/${WF_ID}" | jq -r '.name' | tr ' ' '_')
  
  curl -s -H "X-N8N-API-KEY: ${N8N_API_KEY}" 
    "${N8N_API_URL}/workflows/${WF_ID}" 
    > "${BACKUP_DIR}/${WF_NAME}_${WF_ID}.json"
  
  echo "Backup alındı: ${WF_NAME} (ID: ${WF_ID})"
done

# Git'e commit
cd "${GIT_REPO}"
git add -A
git commit -m "Workflow backup: ${DATE}"
git push origin main

echo "Backup tamamlandı: ${DATE}"

Bu scripti crontab’a ekleyin:

# Her gece 02:00'de n8n workflow backup
0 2 * * * /usr/local/bin/backup-n8n-workflows.sh >> /var/log/n8n-backup.log 2>&1

Performans İpuçları

n8n’i production’da kullanırken öğrendiğim birkaç kritik ipucu:

  • Execution data’yı sınırlandırın: n8n ayarlarında Execution Data Pruning aktif etmezseniz, veritabanınız hızla şişer. Benim standart ayarım 30 gün.
  • Webhook endpoint’lerini koruyun: Herkese açık n8n instance’larında webhook URL’lerine authentication ekleyin. Header-based authentication en pratik yöntem.
  • Node içi logging: Console.log kullanmak execution log’larında görünür, production’da aşırı log atmaktan kaçının.
  • External secrets: API key’leri doğrudan workflow’a gömmeyin. n8n’in Credentials sistemini veya environment variable’ları kullanın.
# n8n'i environment variable'larla başlatma örneği
# docker-compose.yml içinde
environment:
  - N8N_ENCRYPTION_KEY=your-random-32-char-key
  - DB_TYPE=postgresdb
  - DB_POSTGRESDB_HOST=postgres
  - DB_POSTGRESDB_DATABASE=n8n
  - EXECUTIONS_DATA_PRUNE=true
  - EXECUTIONS_DATA_MAX_AGE=30
  - N8N_METRICS=true

Sonuç

n8n ile çoklu adımlı iş akışı tasarlamak başlangıçta kolay görünür, ama scale ettikçe mimarisel kararlarınızın ağırlığını hissedersiniz. SubWorkflow kullanımı, hata yönetimi stratejisi ve veri dönüşüm katmanlarını başından doğru kurmak, sonradan “şunu yeniden yazmalıyım” demenizi engeller.

Özellikle şu üç prensiple ilerlerseniz uzun vadede rahat edersiniz: Her karmaşık iş akışını modüllere bölün, hata yönetimini her branch için ayrı düşünün ve workflow’larınızı mutlaka version control altında tutun. Bu alışkanlıklar, sabahın dördünde alarm almakla sabahın dördünde her şeyin otomatik çalıştığını görmek arasındaki farktır.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir