GCP Dataflow ile Veri İşleme Pipeline Kurulumu ve Yönetimi
Veri mühendisliği dünyasında “pipeline” kavramı artık her sistemin merkezinde yer alıyor. Özellikle büyük ölçekli veri işleme söz konusu olduğunda, GCP Dataflow gerçekten güçlü bir seçenek olarak öne çıkıyor. Apache Beam tabanlı bu yönetilen servis, hem batch hem de streaming veri işleme işlemlerini aynı kod tabanıyla yönetmene olanak tanıyor. Bu yazıda, Dataflow’u sıfırdan kurmaktan üretim ortamına taşımaya kadar tüm süreci ele alacağız.
GCP Dataflow Nedir ve Neden Kullanmalısın?
Dataflow, Google’ın tamamen yönetilen (fully managed) bir veri işleme servisidir. Altyapıyı kendin yönetmek zorunda kalmıyorsun; Dataflow otomatik olarak worker node’ları başlatıp durduruyor, ölçeklendirme yapıyor ve hataları yönetiyor.
Klasik bir senaryo düşün: Her gün milyonlarca log kaydı Pub/Sub’a düşüyor, bunları temizleyip BigQuery’ye yazmak gerekiyor. Bunu Compute Engine üzerinde elle yönetmeye çalışmak ciddi bir operasyonel yük demek. Dataflow bu yükü senden alıyor.
Temel avantajlar şunlar:
- Otomatik ölçeklendirme: İş yüküne göre worker sayısı dinamik olarak ayarlanıyor
- Unified model: Batch ve streaming için aynı kodu kullanabiliyorsun (Apache Beam)
- Fully managed: Sunucu bakımı, güncelleme, patch senin işin değil
- Maliyet verimliliği: Kullandığın kadar öde modeli ile gereksiz kaynak harcamazsın
- Monitoring: Cloud Monitoring ile entegre, pipeline görselleştirme arayüzü mevcut
Ortam Hazırlığı ve Kurulum
Başlamadan önce birkaç şeyin yerli yerinde olması gerekiyor. GCP projeni ve yerel geliştirme ortamını düzgünce ayarlamak, ileride başını çok kurtarıyor.
# gcloud CLI kurulumu ve kimlik doğrulama
gcloud auth login
gcloud config set project PROJE_ID
# Application Default Credentials ayarla
gcloud auth application-default login
# Gerekli API'leri etkinleştir
gcloud services enable dataflow.googleapis.com
gcloud services enable storage.googleapis.com
gcloud services enable bigquery.googleapis.com
# Python sanal ortam oluştur (Python 3.8+ önerilir)
python3 -m venv dataflow-env
source dataflow-env/bin/activate
# Apache Beam ve GCP extras kur
pip install apache-beam[gcp]==2.52.0
pip install google-cloud-bigquery
pip install google-cloud-pubsub
Servis hesabı konusunu atlamak büyük hata olur. Production ortamında her zaman dedicated bir servis hesabı kullan:
# Dataflow için servis hesabı oluştur
gcloud iam service-accounts create dataflow-sa
--display-name="Dataflow Service Account"
# Gerekli rolleri ata
gcloud projects add-iam-policy-binding PROJE_ID
--member="serviceAccount:dataflow-sa@PROJE_ID.iam.gserviceaccount.com"
--role="roles/dataflow.worker"
gcloud projects add-iam-policy-binding PROJE_ID
--member="serviceAccount:dataflow-sa@PROJE_ID.iam.gserviceaccount.com"
--role="roles/bigquery.dataEditor"
gcloud projects add-iam-policy-binding PROJE_ID
--member="serviceAccount:dataflow-sa@PROJE_ID.iam.gserviceaccount.com"
--role="roles/storage.objectAdmin"
# GCS bucket oluştur (temp ve staging için)
gsutil mb -l europe-west1 gs://PROJE_ID-dataflow-temp
İlk Pipeline: Batch Veri İşleme
Gerçek dünyadan bir senaryo ile başlayalım. E-ticaret platformunuzda her gün CSV formatında sipariş dosyaları GCS’e atılıyor. Bu dosyaları okuyup temizleyip BigQuery’ye yazman gerekiyor.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
import logging
import json
import csv
from io import StringIO
class ParseCSVLine(beam.DoFn):
"""CSV satırını parse ederek dict'e çevirir"""
def process(self, element):
try:
reader = csv.DictReader(StringIO(element),
fieldnames=['order_id', 'user_id',
'product_id', 'amount',
'created_at'])
for row in reader:
# Header satırını atla
if row['order_id'] == 'order_id':
return
yield row
except Exception as e:
logging.error(f"CSV parse hatasi: {e}, element: {element}")
class ValidateAndClean(beam.DoFn):
"""Veriyi doğrula ve temizle"""
def process(self, element):
try:
# Amount değerini float'a çevir
element['amount'] = float(element['amount'])
# Negatif amount'ları filtrele
if element['amount'] <= 0:
return
# String temizleme
element['order_id'] = element['order_id'].strip()
element['user_id'] = element['user_id'].strip()
yield element
except (ValueError, KeyError) as e:
logging.warning(f"Validation hatasi: {e}")
def run():
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'PROJE_ID'
google_cloud_options.region = 'europe-west1'
google_cloud_options.job_name = 'siparis-batch-pipeline'
google_cloud_options.staging_location = 'gs://PROJE_ID-dataflow-temp/staging'
google_cloud_options.temp_location = 'gs://PROJE_ID-dataflow-temp/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
# BigQuery schema tanımı
table_schema = {
'fields': [
{'name': 'order_id', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'user_id', 'type': 'STRING', 'mode': 'REQUIRED'},
{'name': 'product_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'amount', 'type': 'FLOAT', 'mode': 'REQUIRED'},
{'name': 'created_at', 'type': 'STRING', 'mode': 'NULLABLE'},
]
}
with beam.Pipeline(options=options) as p:
(
p
| 'Dosyayi Oku' >> beam.io.ReadFromText(
'gs://PROJE_ID-orders/raw/*.csv',
skip_header_lines=1
)
| 'CSV Parse' >> beam.ParDo(ParseCSVLine())
| 'Validate ve Clean' >> beam.ParDo(ValidateAndClean())
| 'BigQuery Yaz' >> beam.io.WriteToBigQuery(
'PROJE_ID:ecommerce.orders',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Streaming Pipeline: Pub/Sub’dan BigQuery’ye
Batch işleme güzel, ama gerçek zamanı yakalamanın heyecanı başka. Şimdi Pub/Sub’dan mesaj okuyup gerçek zamanlı olarak işleyen bir pipeline yazalım.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.window import FixedWindows
import json
import logging
from datetime import datetime
class ParsePubSubMessage(beam.DoFn):
"""Pub/Sub mesajını parse et"""
def process(self, element):
try:
# Pub/Sub mesajı bytes gelir, decode et
message = element.decode('utf-8')
data = json.loads(message)
# Timestamp ekle
data['processed_at'] = datetime.utcnow().isoformat()
yield data
except json.JSONDecodeError as e:
logging.error(f"JSON decode hatasi: {e}")
except Exception as e:
logging.error(f"Beklenmeyen hata: {e}")
class EnrichUserData(beam.DoFn):
"""Kullanici verisini zenginleştir"""
def process(self, element):
# Event tipine göre kategori ata
event_type = element.get('event_type', 'unknown')
category_map = {
'page_view': 'engagement',
'add_to_cart': 'conversion',
'purchase': 'conversion',
'login': 'auth',
'logout': 'auth'
}
element['category'] = category_map.get(event_type, 'other')
yield element
def run():
options = PipelineOptions(streaming=True)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'PROJE_ID'
google_cloud_options.region = 'europe-west1'
google_cloud_options.job_name = 'user-events-streaming'
google_cloud_options.staging_location = 'gs://PROJE_ID-dataflow-temp/staging'
google_cloud_options.temp_location = 'gs://PROJE_ID-dataflow-temp/temp'
table_schema = 'user_id:STRING,event_type:STRING,category:STRING,'
'session_id:STRING,timestamp:STRING,processed_at:STRING'
with beam.Pipeline(options=options) as p:
events = (
p
| 'PubSub Oku' >> beam.io.ReadFromPubSub(
subscription='projects/PROJE_ID/subscriptions/user-events-sub'
)
| 'Pencere Uygula' >> beam.WindowInto(FixedWindows(60)) # 60 saniyelik pencere
| 'Mesaj Parse' >> beam.ParDo(ParsePubSubMessage())
| 'Veri Zenginleştir' >> beam.ParDo(EnrichUserData())
| 'BigQuery Yaz' >> beam.io.WriteToBigQuery(
'PROJE_ID:analytics.user_events',
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
triggering_frequency=30 # Her 30 saniyede bir flush
)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Dead Letter Queue Implementasyonu
Production ortamında hatalı mesajları kaybetmek kabul edilemez. Dead Letter Queue (DLQ) pattern’i mutlaka uygulamalısın:
class ProcessWithDLQ(beam.DoFn):
"""Hata yönetimi ile veri işleme - DLQ pattern"""
OUTPUT_TAG_ERRORS = 'errors'
def process(self, element):
try:
data = json.loads(element.decode('utf-8'))
# Zorunlu alanları kontrol et
required_fields = ['user_id', 'event_type', 'timestamp']
for field in required_fields:
if field not in data:
raise ValueError(f"Eksik alan: {field}")
yield data
except Exception as e:
# Hatalı mesajı dead letter queue'ya yönlendir
error_record = {
'original_message': element.decode('utf-8', errors='replace'),
'error_message': str(e),
'error_time': datetime.utcnow().isoformat()
}
yield beam.pvalue.TaggedOutput(self.OUTPUT_TAG_ERRORS, error_record)
# Pipeline içinde kullanımı
with beam.Pipeline(options=options) as p:
results = (
p
| 'PubSub Oku' >> beam.io.ReadFromPubSub(
subscription='projects/PROJE_ID/subscriptions/events-sub'
)
| 'DLQ ile İşle' >> beam.ParDo(ProcessWithDLQ()).with_outputs(
ProcessWithDLQ.OUTPUT_TAG_ERRORS,
main='valid_records'
)
)
# Geçerli kayıtları BigQuery'ye yaz
results.valid_records | 'Geçerlileri Yaz' >> beam.io.WriteToBigQuery(
'PROJE_ID:analytics.events',
schema='user_id:STRING,event_type:STRING,timestamp:STRING'
)
# Hatalı kayıtları ayrı bir tabloya yaz
results.errors | 'Hataları Yaz' >> beam.io.WriteToBigQuery(
'PROJE_ID:analytics.events_dlq',
schema='original_message:STRING,error_message:STRING,error_time:STRING'
)
Pipeline’ı Gcloud ile Yönetmek
Dataflow job’larını komut satırından yönetmek günlük operasyonun parçası:
# Çalışan job'ları listele
gcloud dataflow jobs list
--region=europe-west1
--status=active
# Belirli bir job'un detaylarını gör
gcloud dataflow jobs describe JOB_ID
--region=europe-west1
# Job'u iptal et (drain: mevcut pencereleri tamamla)
gcloud dataflow jobs drain JOB_ID
--region=europe-west1
# Job'u zorla durdur (cancel: hemen durdur)
gcloud dataflow jobs cancel JOB_ID
--region=europe-west1
# Job log'larını görüntüle
gcloud logging read
"resource.type=dataflow_step AND resource.labels.job_id=JOB_ID"
--limit=50
--format="table(timestamp,severity,textPayload)"
Flex Templates ile Production Deployment
Production ortamında job’ları doğrudan Python dosyasından çalıştırmak yerine Flex Template kullanmak çok daha yönetilebilir bir yaklaşım:
# Dockerfile oluştur
cat > Dockerfile << 'EOF'
FROM apache/beam_python3.9_sdk:2.52.0
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY pipeline.py .
ENTRYPOINT ["python", "pipeline.py"]
EOF
# Container image build et ve push et
gcloud builds submit
--tag europe-west1-docker.pkg.dev/PROJE_ID/dataflow/siparis-pipeline:v1.0
.
# Flex Template oluştur
gcloud dataflow flex-template build
gs://PROJE_ID-dataflow-temp/templates/siparis-pipeline.json
--image=europe-west1-docker.pkg.dev/PROJE_ID/dataflow/siparis-pipeline:v1.0
--sdk-language=PYTHON
--metadata-file=metadata.json
# Template'den job başlat
gcloud dataflow flex-template run siparis-pipeline-$(date +%Y%m%d)
--template-file-gcs-location=gs://PROJE_ID-dataflow-temp/templates/siparis-pipeline.json
--region=europe-west1
--parameters input_bucket=gs://PROJE_ID-orders/raw/
--parameters output_table=PROJE_ID:ecommerce.orders
--service-account-email=dataflow-sa@PROJE_ID.iam.gserviceaccount.com
--worker-machine-type=n1-standard-4
--max-workers=10
Monitoring ve Alerting Kurulumu
Pipeline çalışıyor, ama bir şeyler ters gittiğinde nasıl haberdar olacaksın? Cloud Monitoring’i düzgünce yapılandırmak şart:
# Dataflow job hata için alert policy oluştur
gcloud alpha monitoring policies create
--policy-from-file=alert_policy.json
# Job'un system lag'ini kontrol et (streaming için kritik)
gcloud monitoring metrics list
--filter="metric.type=dataflow.googleapis.com/job/system_lag"
--format="table(metric.type,resource.labels)"
Alert policy JSON dosyasını da hazırla:
cat > alert_policy.json << 'EOF'
{
"displayName": "Dataflow Job Hata Alarmı",
"conditions": [
{
"displayName": "System lag yüksek",
"conditionThreshold": {
"filter": "resource.type = "dataflow_job" AND metric.type = "dataflow.googleapis.com/job/system_lag"",
"comparison": "COMPARISON_GT",
"thresholdValue": 300,
"duration": "120s",
"aggregations": [
{
"alignmentPeriod": "60s",
"perSeriesAligner": "ALIGN_MEAN"
}
]
}
}
],
"alertStrategy": {
"notificationRateLimit": {
"period": "300s"
}
},
"notificationChannels": ["projects/PROJE_ID/notificationChannels/CHANNEL_ID"]
}
EOF
Maliyet Optimizasyonu İpuçları
Dataflow faturası sürpriz yapabilir. Birkaç kritik noktayı bil:
Worker tipi seçimi:
- n1-standard-4: Genel amaçlı batch işler için iyi başlangıç noktası
- n1-highmem-4: Bellek yoğun transformations için tercih et
- n1-highcpu-8: CPU bound işlemler için ideal
Autoscaling parametreleri düzgün ayarla:
- –max-workers: Bunu gerçekçi bir sayıya çek, 100 yazmak faturayı patlatır
- –num-workers: Başlangıç worker sayısı, küçük başla
- –autoscaling-algorithm: THROUGHPUT_BASED genellikle en iyisi
Preemptible VM kullan:
# Batch job'larda preemptible worker kullan (%80 tasarruf)
gcloud dataflow jobs run batch-job
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_BigQuery
--region=europe-west1
--additional-experiments=enable_prime
--additional-experiments=use_runner_v2
--worker-machine-type=n1-standard-4
--additional-worker-zones=europe-west1-b,europe-west1-c,europe-west1-d
Shuffle service’i etkinleştir: Büyük GroupByKey ve CoGroupByKey operasyonlarında Dataflow Shuffle servisi hem performans hem maliyet açısından avantaj sağlar. --experiments=shuffle_mode=service parametresini ekle.
Yaygın Sorunlar ve Çözümleri
Sahada en çok karşılaşılan durumları buraya topladım:
Stuck pipeline: Streaming pipeline’ın watermark ilerlemiyorsa, kaynak tarafında bir sorun var demektir. Pub/Sub subscription’ında mesaj birikip birikmediğini kontrol et. gcloud pubsub subscriptions describe SUB_NAME ile backlog’a bak.
Out of memory hataları: beam.ParDo içinde büyük objeler tutmaktan kaçın. State ve Side Input kullanıyorsan, bunları mümkün olduğunca küçük tut. Worker machine type’ı highmem seriye geçirmeyi düşün.
BigQuery write hataları: Schema mismatch en yaygın sorun. WRITE_TRUNCATE yerine WRITE_APPEND kullanıyorsan, her zaman incoming verinin schema’ya uyduğunu doğrula. DLQ pattern bunu çözer.
Yavaş başlangıç: Flex Template kullanıyorsan, container image boyutunu küçük tut. Gereksiz dependency’leri requirements.txt’den çıkar. Bu, job başlangıç süresini ciddi ölçüde kısaltır.
CI/CD ile Pipeline Deployment Otomasyonu
Production’da elle deployment yapmak riskli. Basit bir Cloud Build pipeline’ı kur:
cat > cloudbuild.yaml << 'EOF'
steps:
# Unit testleri çalıştır
- name: 'python:3.9'
entrypoint: 'pip'
args: ['install', '-r', 'requirements-test.txt']
- name: 'python:3.9'
entrypoint: 'python'
args: ['-m', 'pytest', 'tests/', '-v']
# Docker image build et
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t',
'europe-west1-docker.pkg.dev/$PROJECT_ID/dataflow/pipeline:$SHORT_SHA',
'.']
# Image push et
- name: 'gcr.io/cloud-builders/docker'
args: ['push',
'europe-west1-docker.pkg.dev/$PROJECT_ID/dataflow/pipeline:$SHORT_SHA']
# Flex Template güncelle
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: 'gcloud'
args:
- 'dataflow'
- 'flex-template'
- 'build'
- 'gs://$PROJECT_ID-dataflow-temp/templates/pipeline-$SHORT_SHA.json'
- '--image=europe-west1-docker.pkg.dev/$PROJECT_ID/dataflow/pipeline:$SHORT_SHA'
- '--sdk-language=PYTHON'
timeout: '1800s'
EOF
Sonuç
GCP Dataflow, doğru kullanıldığında veri mühendisliği operasyonlarını inanılmaz derecede sadeleştiriyor. Altyapı yönetimi yerine iş mantığına odaklanabiliyorsun, bu büyük bir avantaj. Ancak her güçlü araç gibi, Dataflow’u da bilinçli kullanmak gerekiyor.
Batch işlemelerde her zaman DLQ pattern’i uygula, streaming’de watermark’ları yakından izle, maliyet konusunda preemptible VM ve max worker limitlerini ihmal etme. Flex Template kullanımı production ortamında tekrarlanabilirlik ve güvenilirlik açısından neredeyse zorunlu hale geldi.
Küçük başla, yerel olarak DirectRunner ile test et, sonra Dataflow’a geç. Pipeline kodunu versiyon kontrolünde tut ve Cloud Build ile otomatikleştir. Bu alışkanlıkları edindiğinde, Dataflow ile veri mühendisliği gerçekten keyifli bir hal alıyor.
Sorularınız veya farklı senaryolarınız varsa, yorum bölümünde paylaşın. Özellikle büyük ölçekli join operasyonları ve custom sources/sinks konularını gelecek yazılarda detaylı ele alabiliriz.
