AWS Glue ile Veri Entegrasyonu: Kapsamlı Bir Rehber

Veri entegrasyonu meselesi, büyük şirketlerin yıllardır baş ağrısı olmuştur. Onlarca farklı kaynaktan gelen veriyi temizlemek, dönüştürmek ve anlamlı bir yere taşımak için custom ETL scriptleri yazmak, bunları sunucularda ayakta tutmak, hata yönetimi yapmak… Tüm bunlar ciddi zaman ve emek ister. AWS Glue tam da bu noktada devreye girerek “serverless ETL” yaklaşımıyla bu yükü önemli ölçüde hafifletiyor. Ben de bu yazıda AWS Glue’yu sıfırdan ele alarak gerçek dünya senaryolarıyla nasıl kullanacağınızı anlatacağım.

AWS Glue Nedir ve Neden Kullanılır?

AWS Glue, tamamen yönetilen bir ETL (Extract, Transform, Load) servisidir. Altta Apache Spark çalışır, siz sadece iş mantığınıza odaklanırsınız. Sunucu provisioning yok, Spark cluster kurulumu yok, altyapı bakımı yok.

Temel bileşenlere bakalım:

  • Data Catalog: Metadata deposu. Tablolarınızın şemasını, kaynak bilgilerini burada saklarsınız.
  • Crawlers: Veri kaynaklarını tarayıp otomatik şema keşfi yapan araçlar.
  • Jobs: Asıl ETL işini yapan Spark/Python scriptleri.
  • Triggers: Job’ları zamanlamak veya olay bazlı tetiklemek için kullanılır.
  • Connections: JDBC, S3, Kafka gibi kaynaklara bağlantı tanımlamaları.
  • Workflows: Birden fazla job ve crawler’ı orchestrate etmek için.

Tipik kullanım senaryoları şunlardır:

  • S3’teki ham log verilerini temizleyip Redshift’e yüklemek
  • RDS’teki operasyonel veriyi data lake’e taşımak
  • Farklı formatlardaki (CSV, JSON, Parquet) verileri birleştirmek
  • Çeşitli SaaS kaynaklarından gelen veriyi normalize etmek

Ortam Hazırlığı ve IAM Ayarları

Glue job’ları çalıştırmak için önce doğru IAM rolünü oluşturmanız gerekiyor. Bu rol hem Glue servisine hem de erişeceğiniz kaynaklara uygun yetkiler içermelidir.

# IAM rol oluşturma (trust policy için önce json dosyası hazırlıyoruz)
cat > glue-trust-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

aws iam create-role 
  --role-name GlueETLRole 
  --assume-role-policy-document file://glue-trust-policy.json

# AWS managed policy ekliyoruz
aws iam attach-role-policy 
  --role-name GlueETLRole 
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

# S3 bucket'a erişim için inline policy
cat > s3-access-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::my-data-lake-bucket",
        "arn:aws:s3:::my-data-lake-bucket/*"
      ]
    }
  ]
}
EOF

aws iam put-role-policy 
  --role-name GlueETLRole 
  --policy-name S3DataLakeAccess 
  --policy-document file://s3-access-policy.json

Crawler ile Şema Keşfi

Diyelim ki S3’te düzensiz bir şekilde biriken e-ticaret sipariş loglarınız var. Günlük CSV dosyaları geliyor, şema zaman zaman değişiyor. Crawler’ı ayarlarsanız bu değişiklikleri otomatik olarak Data Catalog’a yansıtabilirsiniz.

# Glue crawler oluşturma
aws glue create-crawler 
  --name "orders-s3-crawler" 
  --role "arn:aws:iam::123456789012:role/GlueETLRole" 
  --database-name "ecommerce_raw" 
  --targets '{
    "S3Targets": [
      {
        "Path": "s3://my-data-lake-bucket/raw/orders/",
        "Exclusions": ["**/_temporary/**", "**/tmp/**"]
      }
    ]
  }' 
  --schema-change-policy '{
    "UpdateBehavior": "UPDATE_IN_DATABASE",
    "DeleteBehavior": "LOG"
  }' 
  --configuration '{
    "Version": 1.0,
    "CrawlerOutput": {
      "Partitions": {"AddOrUpdateBehavior": "InheritFromTable"},
      "Tables": {"AddOrUpdateBehavior": "MergeNewColumns"}
    }
  }'

# Crawler'ı başlatma
aws glue start-crawler --name "orders-s3-crawler"

# Crawler durumunu kontrol etme
aws glue get-crawler --name "orders-s3-crawler" 
  --query 'Crawler.State' --output text

Dikkat edilmesi gereken nokta: DeleteBehavior için DELETE_FROM_DATABASE yerine LOG tercih edin. Aksi takdirde S3’ten silinen dosyalar yüzünden tablo tanımlarınız kaybolabilir ve downstream job’lar patlar.

İlk Glue Job: CSV’den Parquet’e Dönüşüm

Şimdi asıl işe gelelim. Aşağıdaki senaryo birçok şirkette karşılaşılan klasik bir durum: Ham CSV verileri S3’e geliyor, bunları sorgulama için optimize edilmiş Parquet formatına dönüştürmeniz gerekiyor.

# glue_csv_to_parquet.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, trim, lower
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'source_path', 'target_path', 'database_name', 'table_name'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Data Catalog üzerinden kaynak veriyi okuma
datasource = glueContext.create_dynamic_frame.from_catalog(
    database=args['database_name'],
    table_name=args['table_name'],
    transformation_ctx="datasource"
)

logger.info(f"Kayıt sayısı: {datasource.count()}")
logger.info(f"Şema: {datasource.schema()}")

# DynamicFrame'i Spark DataFrame'e dönüştürme
df = datasource.toDF()

# Veri temizleme operasyonları
df_clean = df 
    .filter(col("order_id").isNotNull()) 
    .filter(col("order_date").isNotNull()) 
    .withColumn("customer_email", lower(trim(col("customer_email")))) 
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd")) 
    .withColumn("year", year(col("order_date"))) 
    .withColumn("month", month(col("order_date"))) 
    .withColumn("day", dayofmonth(col("order_date"))) 
    .dropDuplicates(["order_id"])

logger.info(f"Temizleme sonrası kayıt sayısı: {df_clean.count()}")

# Spark DataFrame'i tekrar DynamicFrame'e çevirme
dynamic_frame_clean = DynamicFrame.fromDF(df_clean, glueContext, "dynamic_frame_clean")

# Parquet formatında S3'e yazma (partition'lı)
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame_clean,
    connection_type="s3",
    connection_options={
        "path": args['target_path'],
        "partitionKeys": ["year", "month", "day"]
    },
    format="parquet",
    format_options={
        "compression": "snappy"
    },
    transformation_ctx="datasink"
)

job.commit()
logger.info("Job başarıyla tamamlandı.")

Bu scripti Glue’ya deploy etmek için:

# Script'i S3'e yükleme
aws s3 cp glue_csv_to_parquet.py s3://my-glue-scripts/etl/

# Glue job oluşturma
aws glue create-job 
  --name "csv-to-parquet-orders" 
  --role "arn:aws:iam::123456789012:role/GlueETLRole" 
  --command '{
    "Name": "glueetl",
    "ScriptLocation": "s3://my-glue-scripts/etl/glue_csv_to_parquet.py",
    "PythonVersion": "3"
  }' 
  --default-arguments '{
    "--job-language": "python",
    "--job-bookmark-option": "job-bookmark-enable",
    "--enable-metrics": "",
    "--enable-continuous-cloudwatch-log": "true",
    "--source_path": "s3://my-data-lake-bucket/raw/orders/",
    "--target_path": "s3://my-data-lake-bucket/processed/orders/",
    "--database_name": "ecommerce_raw",
    "--table_name": "orders"
  }' 
  --glue-version "4.0" 
  --number-of-workers 10 
  --worker-type "G.1X" 
  --timeout 60

# Job'ı çalıştırma
aws glue start-job-run --job-name "csv-to-parquet-orders"

Job Bookmark özelliğine dikkat edin. Bu özellik etkinleştirildiğinde Glue, hangi verilerin işlendiğini takip eder ve sonraki çalışmada sadece yeni verileri işler. Incremental load için hayat kurtarıcıdır.

RDS’ten S3’e Veri Taşıma

Üretim veritabanınızdaki veriyi data lake’e aktarmak istiyorsunuz. JDBC bağlantısı kullanarak RDS’ten veri çekip S3’e yazabilirsiniz.

# RDS için Glue Connection oluşturma
aws glue create-connection 
  --connection-input '{
    "Name": "rds-postgres-prod",
    "ConnectionType": "JDBC",
    "ConnectionProperties": {
      "JDBC_CONNECTION_URL": "jdbc:postgresql://mydb.cluster.us-east-1.rds.amazonaws.com:5432/ecommerce",
      "USERNAME": "glue_readonly_user",
      "PASSWORD": "{{resolve:secretsmanager:rds/glue-credentials:SecretString:password}}",
      "JDBC_ENFORCE_SSL": "true"
    },
    "PhysicalConnectionRequirements": {
      "SubnetId": "subnet-0a1b2c3d4e5f",
      "SecurityGroupIdList": ["sg-0123456789abcdef0"],
      "AvailabilityZone": "us-east-1a"
    }
  }'

Dikkat: Glue’nun RDS’e erişebilmesi için VPC içinde çalışması gerekiyor. Security group’ların RDS portuna (5432 PostgreSQL için) izin verdiğinden emin olun. Bu konfigürasyon hataları Glue job’larının çalışmama sebebinin büyük çoğunluğunu oluşturuyor.

# rds_to_s3_etl.py - Gerçek dünya senaryosu
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, current_timestamp, date_format

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# RDS'ten veri okuma - pushdown predicate ile sadece son 7 günü çekiyoruz
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="ecommerce_rds",
    table_name="public_users",
    push_down_predicate="updated_at >= current_date - interval '7 days'",
    additional_options={
        "sampleQuery": "SELECT * FROM public.users WHERE updated_at >= NOW() - INTERVAL '7 days'"
    },
    transformation_ctx="rds_source"
)

df = datasource.toDF()

# Hassas kolonları maskeleme (GDPR uyumu)
df_masked = df 
    .withColumn("email", 
        expr("concat(left(email, 3), '***@', split(email, '@')[1])")) 
    .withColumn("phone", 
        expr("concat('***-***-', right(regexp_replace(phone, '[^0-9]', ''), 4))")) 
    .withColumn("etl_timestamp", current_timestamp()) 
    .withColumn("partition_date", date_format(current_timestamp(), "yyyy-MM-dd"))

# S3'e yazma
from awsglue.dynamicframe import DynamicFrame
output_df = DynamicFrame.fromDF(df_masked, glueContext, "output")

glueContext.write_dynamic_frame.from_options(
    frame=output_df,
    connection_type="s3",
    connection_options={
        "path": "s3://my-data-lake-bucket/processed/users/",
        "partitionKeys": ["partition_date"]
    },
    format="parquet",
    transformation_ctx="s3_sink"
)

job.commit()

Workflow ile Job Orchestration

Gerçek projeler genellikle birden fazla adımdan oluşur: önce crawl, sonra transform, sonra validate, sonra load. Glue Workflows bu pipeline’ı yönetmenizi sağlar.

# Workflow oluşturma
aws glue create-workflow 
  --name "daily-etl-pipeline" 
  --description "Gunluk e-ticaret veri entegrasyonu"

# Workflow'a trigger ekleme (her gece 02:00'de)
aws glue create-trigger 
  --name "daily-start-trigger" 
  --workflow-name "daily-etl-pipeline" 
  --type SCHEDULED 
  --schedule "cron(0 2 * * ? *)" 
  --actions '[{"CrawlerName": "orders-s3-crawler"}]'

# Crawler tamamlandığında ETL job'ı başlatan trigger
aws glue create-trigger 
  --name "after-crawler-trigger" 
  --workflow-name "daily-etl-pipeline" 
  --type CONDITIONAL 
  --predicate '{
    "Logical": "AND",
    "Conditions": [
      {
        "LogicalOperator": "EQUALS",
        "CrawlerName": "orders-s3-crawler",
        "CrawlState": "SUCCEEDED"
      }
    ]
  }' 
  --actions '[{"JobName": "csv-to-parquet-orders"}]'

# ETL tamamlandığında validation job'ı başlatan trigger
aws glue create-trigger 
  --name "after-etl-trigger" 
  --workflow-name "daily-etl-pipeline" 
  --type CONDITIONAL 
  --predicate '{
    "Logical": "AND",
    "Conditions": [
      {
        "LogicalOperator": "EQUALS",
        "JobName": "csv-to-parquet-orders",
        "State": "SUCCEEDED"
      }
    ]
  }' 
  --actions '[{"JobName": "data-quality-validation"}]'

# Workflow'u başlatma
aws glue start-workflow-run --name "daily-etl-pipeline"

Hata Yönetimi ve Monitoring

Glue job’larınızın production’da güvenilir çalışması için iyi bir monitoring altyapısı şart. CloudWatch ile alarm kurabilirsiniz.

# Başarısız job için CloudWatch alarm
aws cloudwatch put-metric-alarm 
  --alarm-name "GlueJobFailure-csv-to-parquet" 
  --alarm-description "CSV to Parquet job basarisiz oldu" 
  --metric-name "glue.driver.aggregate.numFailedTasks" 
  --namespace "Glue" 
  --dimensions Name=JobName,Value=csv-to-parquet-orders 
  --statistic Sum 
  --period 300 
  --threshold 1 
  --comparison-operator GreaterThanOrEqualToThreshold 
  --evaluation-periods 1 
  --alarm-actions "arn:aws:sns:us-east-1:123456789012:ops-alerts"

# Job run geçmişini kontrol etme
aws glue get-job-runs 
  --job-name "csv-to-parquet-orders" 
  --max-results 10 
  --query 'JobRuns[*].{ID:Id,Status:JobRunState,Started:StartedOn,Duration:ExecutionTime,Error:ErrorMessage}'

Glue job’larında sık karşılaşılan sorunlar ve çözümleri:

  • OutOfMemoryError: --conf spark.driver.memory=4g ve --conf spark.executor.memory=4g parametrelerini artırın. Büyük datasetlerde G.2X worker type’a geçmeyi düşünün.
  • Job bookmark çalışmıyor: Bookmark’ın bağlandığı transformation_ctx değerlerinin tutarlı olduğundan emin olun. Context adı değişirse bookmark sıfırlanır.
  • JDBC timeout: connectionProperties içine {"queryTimeout": "1800", "fetchsize": "1000"} ekleyin.
  • S3 throttling: Çok fazla küçük dosya yazıyorsanız coalesce() veya repartition() ile dosya sayısını azaltın.
  • VPC bağlantı sorunları: Glue için özel bir ENI oluşturulur. Security group’ların self-referencing kuralına sahip olduğundan emin olun.

Glue Data Quality ile Veri Doğrulama

AWS Glue 4.0 ile gelen Data Quality özelliği, ETL pipeline’ınıza yerleşik veri kalitesi kontrolleri eklemenizi sağlıyor. Bu özellik production’da çok değerli çünkü bozuk veri downstream sistemlerinize kadar sızmadan önce yakalanıyor.

# data_quality_check.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsgluedq.transforms import EvaluateDataQuality

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# İşlenmiş veriyi okuma
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="ecommerce_processed",
    table_name="orders_parquet",
    transformation_ctx="datasource"
)

# Kalite kurallarını tanımlama
ruleset = """
    Rules = [
        IsComplete "order_id",
        IsUnique "order_id",
        IsComplete "customer_email",
        Completeness "order_amount" >= 0.99,
        ColumnValues "order_amount" > 0,
        ColumnValues "order_status" in ["pending", "processing", "shipped", "delivered", "cancelled"],
        RowCount > 1000
    ]
"""

# Kalite değerlendirmesi çalıştırma
dq_results = EvaluateDataQuality.apply(
    frame=datasource,
    ruleset=ruleset,
    publishing_options={
        "dataQualityEvaluationContext": "orders_quality_check",
        "enableDataQualityCloudWatchMetrics": True,
        "enableDataQualityResultsPublishing": True
    }
)

# Başarısız kayıtları ayrıştırma
passed_records = SelectFromCollection.apply(
    dfc=dq_results, 
    key="rowLevelOutcomes"
)

# Sadece geçen kayıtları downstream'e aktarma
passed_df = passed_records.toDF().filter("DataQualityEvaluationResult = 'Passed'")

print(f"Kalite kontrolünden geçen kayıt: {passed_df.count()}")

job.commit()

Maliyet Optimizasyonu

Glue ücretlendirmesi DPU (Data Processing Unit) bazlıdır ve yanlış konfigürasyonlarla fatura şişebilir. Dikkat etmeniz gereken noktalar:

  • Worker type seçimi: Küçük datasetler için G.025X veya G.1X yeterlidir. G.2X’i gerçekten büyük işlemler için saklayın. G.025X ile maliyeti 4’te 1’e düşürebilirsiniz.
  • Auto Scaling kullanın: --enable-auto-scaling parametresiyle job’ların dinamik olarak scale olmasını sağlayın. Sabit worker sayısı yerine min/max değer belirleyin.
  • Job timeout ayarlayın: Timeout vermezseniz takılı kalan job’lar saatlerce DPU tüketir. İşinizin normal süresinin 2-3 katını timeout olarak ayarlayın.
  • Glue Studio yerine script kullanın: Görsel editör başlangıç için kolayken, üretim ortamında versionlanmış Python scriptleri çok daha yönetilebilir olur.
  • Parquet ve Snappy: Hedef formatı her zaman Parquet ve Snappy compression olarak seçin. Hem depolama hem de okuma maliyetlerini dramatik biçimde düşürür.
  • Bookmark kullanın: Tüm veriyi her seferinde işlemek yerine incremental load yapın.
# Mevcut job'ların maliyetini görmek için
aws glue get-job-runs 
  --job-name "csv-to-parquet-orders" 
  --query 'JobRuns[*].{ID:Id,DPUSeconds:DPUSeconds,WorkerType:WorkerType}' 
  --output table

# Job'u auto scaling ile güncelleme
aws glue update-job 
  --job-name "csv-to-parquet-orders" 
  --job-update '{
    "NumberOfWorkers": 10,
    "WorkerType": "G.1X",
    "DefaultArguments": {
      "--enable-auto-scaling": "true"
    }
  }'

Sonuç

AWS Glue, doğru yapılandırıldığında veri entegrasyonu süreçlerini ciddi biçimde hızlandıran ve operasyonel yükü azaltan bir servis. Ancak “serverless” olması her şeyin otomatik çalışacağı anlamına gelmiyor. IAM rolleri, VPC konfigürasyonu, job bookmark yönetimi ve maliyet optimizasyonu konularında dikkatli olmak gerekiyor.

Pratikte önerim: Küçük bir pilot projeyle başlayın. Tek bir tabloyu CSV’den Parquet’e dönüştüren basit bir job kurun, monitoring’i ayarlayın, maliyetleri takip edin. Sonrasında workflow’lar ve data quality kontrolleri ekleyerek sistemi olgunlaştırın. Büyük bir veri migrasyonunu ilk denemenizde 20 worker ile çalıştırmak yerine önce 2 worker ile test edin, performansı ölçün ve buna göre ölçeklendirin.

Data lake mimarisi kuruyorsanız Glue Data Catalog’u merkezi metadata deposunuz olarak konumlandırmanızı kesinlikle tavsiye ederim. Athena, EMR ve Redshift Spectrum hepsi bu catalog’u kullanabiliyor ve bu entegrasyon gerçekten çok değer katıyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir