AWS Step Functions ile İş Akışı Otomasyonu

Dağıtık sistemleri yönetirken en can sıkıcı şeylerden biri, birden fazla Lambda fonksiyonu, API çağrısı veya batch işin birbirine bağımlı olduğu iş akışlarını koordine etmektir. Biri başarısız olursa ne olacak? Retry mantığını nereye koyacaksın? Timeout’ları nasıl yöneteceksin? AWS Step Functions tam olarak bu sorunları çözmek için tasarlanmış bir orkestrasyon servisi. Bugün gerçek dünya senaryolarıyla birlikte Step Functions’ı nasıl etkili kullanacağını anlatacağım.

Step Functions Nedir ve Neden Kullanmalısın?

Step Functions, görsel olarak tanımlanabilen ve JSON tabanlı Amazon States Language (ASL) ile yazılan iş akışlarını çalıştırmanı sağlayan bir orkestrasyon servisi. Lambda fonksiyonlarını, ECS görevlerini, SNS/SQS mesajlarını, DynamoDB işlemlerini ve daha onlarca AWS servisini tek bir akış altında yönetebiliyorsun.

En büyük avantajı şu: Her adımın durumunu kendisi takip ediyor. Bir adım başarısız olduğunda ne yapılacağını tanımlayabiliyorsun, her adımın ne kadar sürdüğünü görebiliyorsun ve herhangi bir noktada akışın hangi durumda olduğunu AWS Console üzerinden gerçek zamanlı izleyebiliyorsun.

İki farklı workflow tipi var:

  • Standard Workflows: Uzun süreli iş akışları için, maksimum 1 yıl çalışabilir, her adım kaydedilir ve tam olarak bir kez çalışma garantisi verir
  • Express Workflows: Yüksek hacimli, kısa süreli işler için, maksimum 5 dakika, saniyede binlerce execution destekler

Temel Kavramlar

Bir Step Functions state machine tanımı şu temel bileşenlerden oluşur:

  • States: İş akışındaki her adım bir state
  • Task: Bir AWS servisi veya Lambda fonksiyonu çağıran state tipi
  • Choice: Koşullu dallanma sağlayan state tipi
  • Wait: Belirli süre veya zamana kadar bekleyen state tipi
  • Parallel: Birden fazla dalı eş zamanlı çalıştıran state tipi
  • Map: Bir liste üzerinde iterasyon yapan state tipi
  • Pass: Girdiyi çıktıya aktaran, genellikle test için kullanılan state tipi
  • Succeed / Fail: Akışı başarı veya hata ile sonlandıran state tipleri

AWS CLI ile İlk State Machine Oluşturma

Öncelikle gerekli IAM rolünü oluşturalım:

# Step Functions için IAM rol oluşturma
aws iam create-role 
  --role-name StepFunctionsExecutionRole 
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": {
          "Service": "states.amazonaws.com"
        },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

# Lambda çağırma iznini ekle
aws iam attach-role-policy 
  --role-name StepFunctionsExecutionRole 
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaRole

Şimdi basit bir state machine tanımı oluşturalım. Bu örnek, bir e-ticaret sipariş işleme akışını simüle ediyor:

# State machine tanımını dosyaya kaydet
cat > order-workflow.json << 'EOF'
{
  "Comment": "E-ticaret sipariş işleme akışı",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:validate-order",
      "Next": "CheckInventory",
      "Retry": [
        {
          "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "Catch": [
        {
          "ErrorEquals": ["ValidationError"],
          "Next": "OrderFailed",
          "ResultPath": "$.error"
        }
      ]
    },
    "CheckInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:check-inventory",
      "Next": "InventoryDecision"
    },
    "InventoryDecision": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.inventoryAvailable",
          "BooleanEquals": true,
          "Next": "ProcessPayment"
        }
      ],
      "Default": "BackorderProcess"
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-payment",
      "Next": "ShipOrder"
    },
    "BackorderProcess": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:handle-backorder",
      "Next": "NotifyCustomer"
    },
    "ShipOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:ship-order",
      "Next": "NotifyCustomer"
    },
    "NotifyCustomer": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "arn:aws:sns:eu-west-1:123456789:order-notifications",
        "Message.$": "States.Format('Siparişiniz işlendi. ID: {}', $.orderId)"
      },
      "End": true
    },
    "OrderFailed": {
      "Type": "Fail",
      "Error": "OrderProcessingFailed",
      "Cause": "Sipariş doğrulama başarısız"
    }
  }
}
EOF

# State machine oluştur
aws stepfunctions create-state-machine 
  --name "OrderProcessingWorkflow" 
  --definition file://order-workflow.json 
  --role-arn arn:aws:iam::123456789:role/StepFunctionsExecutionRole 
  --type STANDARD

Gerçek Dünya Senaryosu: Veri İşleme Pipeline’ı

Şimdi daha gerçekçi bir senaryo ele alalım. Her gece çalışan bir veri işleme pipeline’ı düşün: S3’ten ham veri çekilecek, temizlenecek, dönüştürülecek, veritabanına yazılacak ve bir rapor üretilecek.

# Pipeline execution başlatma scripti
cat > trigger-pipeline.sh << 'EOF'
#!/bin/bash

STATE_MACHINE_ARN="arn:aws:states:eu-west-1:123456789:stateMachine:DataProcessingPipeline"
DATE=$(date +%Y-%m-%d)
EXECUTION_NAME="pipeline-${DATE}-$(date +%H%M%S)"

INPUT_PAYLOAD=$(cat <<PAYLOAD
{
  "executionDate": "${DATE}",
  "sourceS3Bucket": "raw-data-bucket",
  "sourceS3Prefix": "incoming/${DATE}/",
  "targetDatabase": "analytics_db",
  "notificationEmail": "[email protected]"
}
PAYLOAD
)

echo "Pipeline başlatılıyor: ${EXECUTION_NAME}"

EXECUTION_ARN=$(aws stepfunctions start-execution 
  --state-machine-arn "${STATE_MACHINE_ARN}" 
  --name "${EXECUTION_NAME}" 
  --input "${INPUT_PAYLOAD}" 
  --query 'executionArn' 
  --output text)

echo "Execution ARN: ${EXECUTION_ARN}"

# Execution durumunu izle
while true; do
  STATUS=$(aws stepfunctions describe-execution 
    --execution-arn "${EXECUTION_ARN}" 
    --query 'status' 
    --output text)
  
  echo "$(date): Durum - ${STATUS}"
  
  if [[ "${STATUS}" == "SUCCEEDED" || "${STATUS}" == "FAILED" || "${STATUS}" == "TIMED_OUT" || "${STATUS}" == "ABORTED" ]]; then
    echo "Pipeline tamamlandı: ${STATUS}"
    break
  fi
  
  sleep 30
done
EOF

chmod +x trigger-pipeline.sh

Parallel State ile Eş Zamanlı İşleme

Büyük veri setlerini işlerken paralel çalışma hayat kurtarır. Aşağıdaki örnekte üç farklı bölgenin verisini aynı anda işliyoruz:

cat > parallel-processing.json << 'EOF'
{
  "Comment": "Paralel bölge veri işleme",
  "StartAt": "ProcessRegionsParallel",
  "States": {
    "ProcessRegionsParallel": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ProcessEurope",
          "States": {
            "ProcessEurope": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-region",
              "Parameters": {
                "region": "europe",
                "data.$": "$.europeData"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "ProcessAmerica",
          "States": {
            "ProcessAmerica": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-region",
              "Parameters": {
                "region": "america",
                "data.$": "$.americaData"
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "ProcessAsia",
          "States": {
            "ProcessAsia": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-region",
              "Parameters": {
                "region": "asia",
                "data.$": "$.asiaData"
              },
              "End": true
            }
          }
        }
      ],
      "Next": "AggregateResults"
    },
    "AggregateResults": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:aggregate-results",
      "End": true
    }
  }
}
EOF

Map State ile Dinamik Liste İşleme

Bir S3 bucket’ında kaç tane dosya olduğunu bilmiyorsun ve hepsini işlemen gerekiyor. Map state burada devreye giriyor:

cat > map-processing.json << 'EOF'
{
  "Comment": "S3 dosya listesi üzerinde Map ile işleme",
  "StartAt": "ListFiles",
  "States": {
    "ListFiles": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:list-s3-files",
      "Next": "ProcessFiles"
    },
    "ProcessFiles": {
      "Type": "Map",
      "ItemsPath": "$.files",
      "MaxConcurrency": 10,
      "Iterator": {
        "StartAt": "ProcessSingleFile",
        "States": {
          "ProcessSingleFile": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-file",
            "Retry": [
              {
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 5,
                "MaxAttempts": 2,
                "BackoffRate": 1.5
              }
            ],
            "End": true
          }
        }
      },
      "Next": "SendSummaryReport"
    },
    "SendSummaryReport": {
      "Type": "Task",
      "Resource": "arn:aws:states:::ses:sendEmail",
      "Parameters": {
        "Source": "[email protected]",
        "Destination": {
          "ToAddresses": ["[email protected]"]
        },
        "Message": {
          "Subject": {
            "Data": "Dosya işleme tamamlandı"
          },
          "Body": {
            "Text": {
              "Data.$": "States.Format('{} dosya başarıyla işlendi.', States.ArrayLength($.processedFiles))"
            }
          }
        }
      },
      "End": true
    }
  }
}
EOF

Execution Geçmişini İnceleme ve Hata Ayıklama

Step Functions’ın en güzel özelliklerinden biri, her execution’ın detaylı geçmişini saklaması. Üretim ortamında bir şeyler ters gittiğinde şu komutları çok sık kullanırsın:

#!/bin/bash
# Başarısız execution'ları bul ve hata detaylarını göster

STATE_MACHINE_ARN="arn:aws:states:eu-west-1:123456789:stateMachine:OrderProcessingWorkflow"

echo "=== Son 24 saatteki başarısız execution'lar ==="
aws stepfunctions list-executions 
  --state-machine-arn "${STATE_MACHINE_ARN}" 
  --status-filter FAILED 
  --query 'executions[*].[name,startDate,stopDate]' 
  --output table

# Belirli bir execution'ın hata detaylarını al
FAILED_EXECUTION_ARN=$1

if [ -z "${FAILED_EXECUTION_ARN}" ]; then
  echo "Kullanım: $0 <execution-arn>"
  exit 1
fi

echo ""
echo "=== Execution Detayları ==="
aws stepfunctions describe-execution 
  --execution-arn "${FAILED_EXECUTION_ARN}" 
  --query '{status: status, startDate: startDate, stopDate: stopDate, input: input, output: output}'

echo ""
echo "=== Hata Oluşan Adım ==="
aws stepfunctions get-execution-history 
  --execution-arn "${FAILED_EXECUTION_ARN}" 
  --query 'events[?type==`TaskFailed`].{timestamp: timestamp, details: taskFailedEventDetails}' 
  --output json

EventBridge ile Zamanlı Tetikleme

Çoğu pipeline senaryosunda iş akışını belirli zamanlarda otomatik başlatman gerekir. EventBridge ile bunu kolayca yapabilirsin:

# Her gece saat 02:00'de çalışacak zamanlayıcı oluştur
aws events put-rule 
  --name "NightlyDataPipeline" 
  --schedule-expression "cron(0 2 * * ? *)" 
  --state ENABLED 
  --description "Gecelik veri işleme pipeline tetikleyicisi"

# EventBridge'e Step Functions'ı tetikleme izni ver
aws iam create-role 
  --role-name EventBridgeStepFunctionsRole 
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Principal": {"Service": "events.amazonaws.com"},
      "Action": "sts:AssumeRole"
    }]
  }'

aws iam put-role-policy 
  --role-name EventBridgeStepFunctionsRole 
  --policy-name StartStepFunctionsExecution 
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": "states:StartExecution",
      "Resource": "arn:aws:states:eu-west-1:123456789:stateMachine:DataProcessingPipeline"
    }]
  }'

# Target olarak Step Functions ekle
aws events put-targets 
  --rule "NightlyDataPipeline" 
  --targets '[{
    "Id": "StepFunctionsTarget",
    "Arn": "arn:aws:states:eu-west-1:123456789:stateMachine:DataProcessingPipeline",
    "RoleArn": "arn:aws:iam::123456789:role/EventBridgeStepFunctionsRole",
    "Input": "{"trigger": "scheduled", "environment": "production"}"
  }]'

echo "Zamanlayıcı başarıyla oluşturuldu."

CloudWatch ile İzleme ve Alarm Kurma

Step Functions metrikleri otomatik olarak CloudWatch’a gönderilir. Kritik bir pipeline varsa alarm kurmak şart:

# Başarısız execution sayısı için alarm
aws cloudwatch put-metric-alarm 
  --alarm-name "StepFunctions-OrderPipeline-Failures" 
  --alarm-description "Sipariş pipeline'ında başarısız execution alarmı" 
  --metric-name ExecutionsFailed 
  --namespace AWS/States 
  --dimensions Name=StateMachineArn,Value=arn:aws:states:eu-west-1:123456789:stateMachine:OrderProcessingWorkflow 
  --statistic Sum 
  --period 300 
  --threshold 1 
  --comparison-operator GreaterThanOrEqualToThreshold 
  --evaluation-periods 1 
  --alarm-actions arn:aws:sns:eu-west-1:123456789:ops-alerts 
  --treat-missing-data notBreaching

# Execution süresi için alarm (30 dakikayı geçen işler)
aws cloudwatch put-metric-alarm 
  --alarm-name "StepFunctions-OrderPipeline-Duration" 
  --alarm-description "Pipeline beklenen süreden uzun çalışıyor" 
  --metric-name ExecutionTime 
  --namespace AWS/States 
  --dimensions Name=StateMachineArn,Value=arn:aws:states:eu-west-1:123456789:stateMachine:OrderProcessingWorkflow 
  --statistic Average 
  --period 300 
  --threshold 1800000 
  --comparison-operator GreaterThanThreshold 
  --evaluation-periods 2 
  --alarm-actions arn:aws:sns:eu-west-1:123456789:ops-alerts

echo "CloudWatch alarmları oluşturuldu."

Maliyet Optimizasyonu İpuçları

Step Functions kullanırken maliyetleri kontrol altında tutmak için dikkat etmen gereken birkaç nokta var:

  • Express vs Standard seçimi: Saniyede 100’den fazla execution başlatıyorsan ve işin 5 dakikadan kısa sürüyorsa Express kullan. Standard Workflow’da her state geçişi için ücret ödersin, Express’te ise süre ve bellek bazlı ücretlendirme var
  • Gereksiz Wait state’lerinden kaçın: Polling yerine callback token pattern’ı kullan. Lambda’dan işi başlatıp callback gelene kadar bekle, bu şekilde boş bekleme state’lerini ortadan kaldırırsın
  • Log level optimizasyonu: Üretim ortamında ALL log level yerine ERROR kullan, CloudWatch Logs maliyetleri hızla artabilir
  • Execution geçmişini temizle: Standard Workflow’larda execution geçmişi 90 gün saklanır, bu maliyete etki etmez ama gereksiz execution’ları başlatmaktan kaçın

Callback token kullanımına basit bir örnek:

# Lambda fonksiyonu içinde callback token ile external işlemi tetikleme
# Bu pattern polling yerine event-driven bekleme sağlar

cat > callback-example.json << 'EOF'
{
  "StartAt": "StartExternalJob",
  "States": {
    "StartExternalJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "arn:aws:lambda:eu-west-1:123456789:function:start-external-job",
        "Payload": {
          "taskToken.$": "$$.Task.Token",
          "jobParameters.$": "$.jobParameters"
        }
      },
      "TimeoutSeconds": 3600,
      "HeartbeatSeconds": 300,
      "Next": "ProcessJobResult"
    },
    "ProcessJobResult": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789:function:process-job-result",
      "End": true
    }
  }
}
EOF

# External iş tamamlandığında callback gönderme (Lambda veya başka bir servisten)
# aws stepfunctions send-task-success 
#   --task-token "TASK_TOKEN_BURAYA" 
#   --task-output '{"status": "completed", "resultLocation": "s3://bucket/result.json"}'

Yaygın Sorunlar ve Çözümleri

Üretimde Step Functions kullanırken en sık karşılaşılan sorunları ve çözüm yollarını bilmek çok işe yarıyor:

  • States.DataLimitExceeded hatası: State’ler arası geçen payload 256 KB ile sınırlı. Büyük veri setleri için S3’e yaz, Step Functions’a sadece S3 referansını geç
  • Execution throttling: Aynı anda çok fazla execution başlatıldığında throttling olabilir. Retry mekanizmasını doğru konfigüre et ve exponential backoff kullan
  • Lambda timeout ile Step Functions timeout uyumsuzluğu: Step Functions’daki TimeoutSeconds, Lambda’nın kendi timeout süresinden büyük olmalı, aksi halde Lambda timeout olur ama Step Functions hala bekler
  • IAM izin hataları: Her Task state için Step Functions rolünün ilgili AWS servisini çağırma yetkisi olduğundan emin ol

Terraform ile Infrastructure as Code

Step Functions’ı Terraform ile yönetmek, ortamlar arası tutarlılığı sağlamanın en iyi yolu:

# Terraform state machine kaynağı - main.tf
cat > stepfunctions.tf << 'EOF'
resource "aws_sfn_state_machine" "order_processing" {
  name     = "OrderProcessingWorkflow-${var.environment}"
  role_arn = aws_iam_role.step_functions_execution.arn
  type     = "STANDARD"

  definition = templatefile("${path.module}/workflows/order-processing.json", {
    validate_order_lambda_arn  = aws_lambda_function.validate_order.arn
    check_inventory_lambda_arn = aws_lambda_function.check_inventory.arn
    process_payment_lambda_arn = aws_lambda_function.process_payment.arn
    notification_topic_arn     = aws_sns_topic.order_notifications.arn
    environment                = var.environment
  })

  logging_configuration {
    log_destination        = "${aws_cloudwatch_log_group.step_functions.arn}:*"
    include_execution_data = true
    level                  = var.environment == "production" ? "ERROR" : "ALL"
  }

  tracing_configuration {
    enabled = true
  }

  tags = {
    Environment = var.environment
    ManagedBy   = "Terraform"
    Team        = "Platform"
  }
}

resource "aws_cloudwatch_log_group" "step_functions" {
  name              = "/aws/states/${var.environment}/order-processing"
  retention_in_days = 30
}
EOF

echo "Terraform konfigürasyonu oluşturuldu."
# terraform init && terraform plan

Sonuç

AWS Step Functions, dağıtık sistemlerdeki iş akışı koordinasyon sorununu gerçekten iyi çözen bir servis. Özellikle hata yönetimi, retry mekanizması ve görsel izleme konularında Lambda’ları kendin zincirlemekten çok daha güvenilir bir yapı sunuyor.

Kendi deneyimimden en önemli tavsiye şu: Step Functions’a geçerken mevcut “callback cehennemini” ya da “if-else Lambda zincirlerini” doğrudan taşıma. Önce iş akışını kağıt üzerinde durumlar halinde modelle, sonra ASL tanımına dönüştür. Bu yaklaşım hem akışı daha iyi anlamanı sağlıyor hem de gereksiz karmaşıklıktan kaçınmana yardımcı oluyor.

Express Workflow’ların yüksek hacimli senaryolarda ciddi maliyet avantajı sağladığını, Standard Workflow’ların ise audit trail ve uzun süreli iş akışları için vazgeçilmez olduğunu unutma. Uygun workflow tipini seçmek, hem maliyet hem de güvenilirlik açısından büyük fark yaratıyor.

Son olarak, her Step Functions deployment’ında CloudWatch alarmlarını ve X-Ray tracing’i açık tutmayı alışkanlık haline getir. Üretimde bir sorun çıktığında execution geçmişine bakıp tam olarak neyin nerede bozulduğunu saniyeler içinde görmen, saatlik debug seanslarından çok daha iyi hissettiriyor.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir