要約

gRPCを使用して、マイクロサービス間の分散トランザクションを管理するための堅牢なSagaパターンを実装します。基本をカバーし、設定方法を示し、いくつかの便利なコード例も提供します。最後には、プロの指揮者がマイクロサービスの交響曲を指揮するように、分散トランザクションをオーケストレーションできるようになります。

Sagaパターンの簡単な紹介

詳細に入る前に、Sagaパターンが何であるかを簡単に振り返りましょう:

  • Sagaは一連のローカルトランザクションです
  • 各トランザクションは単一のサービス内でデータを更新します
  • ステップが失敗した場合、以前の変更を元に戻すための補償トランザクションが実行されます

分散システムのための高機能な「元に戻す」ボタンのようなものです。では、これをgRPCを使ってどのように実装できるか見てみましょう。

なぜSagaにgRPCを使うのか?

「なぜgRPC?RESTを使えばいいのでは?」と思うかもしれません。確かにRESTを使うこともできますが、gRPCにはいくつかの大きな利点があります:

  • 効率的なバイナリシリアライゼーション(Protocol Buffers)
  • 強い型付け
  • 双方向ストリーミング
  • 認証、負荷分散などの組み込みサポート

さらに、非常に高速です。スピードを愛さない人はいませんよね?

準備

まず、Protocol Buffersでサービスを定義しましょう。シンプルなOrderSagaサービスを作成します:

syntax = "proto3";

package ordersaga;

service OrderSaga {
  rpc StartSaga(SagaRequest) returns (SagaResponse) {}
  rpc CompensateSaga(CompensationRequest) returns (CompensationResponse) {}
}

message SagaRequest {
  string order_id = 1;
  double amount = 2;
}

message SagaResponse {
  bool success = 1;
  string message = 2;
}

message CompensationRequest {
  string order_id = 1;
}

message CompensationResponse {
  bool success = 1;
  string message = 2;
}

これで、基本的なサービスが設定され、物事がうまくいかない場合の補償用のRPCが2つ用意されました。

Sagaコーディネーターの実装

次に、分散トランザクションをオーケストレーションするSagaコーディネーターを作成します。この例ではGoを使用しますが、お好きな言語を使用してください。

package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "path/to/your/proto"
)

type server struct {
    pb.UnimplementedOrderSagaServer
}

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    // ここにSagaロジックを実装
    log.Printf("Starting saga for order: %s", req.OrderId)

    // 他のマイクロサービスを呼び出して分散トランザクションを実行
    if err := createOrder(req.OrderId); err != nil {
        return &pb.SagaResponse{Success: false, Message: "Failed to create order"}, nil
    }

    if err := processPayment(req.OrderId, req.Amount); err != nil {
        // 注文作成の補償
        cancelOrder(req.OrderId)
        return &pb.SagaResponse{Success: false, Message: "Failed to process payment"}, nil
    }

    if err := updateInventory(req.OrderId); err != nil {
        // 注文作成と支払いの補償
        cancelOrder(req.OrderId)
        refundPayment(req.OrderId, req.Amount)
        return &pb.SagaResponse{Success: false, Message: "Failed to update inventory"}, nil
    }

    return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
}

func (s *server) CompensateSaga(ctx context.Context, req *pb.CompensationRequest) (*pb.CompensationResponse, error) {
    // ここに補償ロジックを実装
    log.Printf("Compensating saga for order: %s", req.OrderId)

    // 各ステップの補償メソッドを呼び出す
    cancelOrder(req.OrderId)
    refundPayment(req.OrderId, 0) // 金額をどこかに保存しておくと良いかもしれません
    restoreInventory(req.OrderId)

    return &pb.CompensationResponse{Success: true, Message: "Compensation completed"}, nil
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterOrderSagaServer(s, &server{})
    log.Println("Server listening on :50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)
    }
}

// 他のマイクロサービスと連携するための関数を実装
func createOrder(orderId string) error { /* ... */ }
func processPayment(orderId string, amount float64) error { /* ... */ }
func updateInventory(orderId string) error { /* ... */ }
func cancelOrder(orderId string) error { /* ... */ }
func refundPayment(orderId string, amount float64) error { /* ... */ }
func restoreInventory(orderId string) error { /* ... */ }

この実装は、Sagaコーディネーターの基本構造を示しています。分散トランザクションのメインロジックを処理し、ステップが失敗した場合の補償メカニズムを提供します。

失敗とリトライの処理

分散システムでは、失敗は避けられません。Saga実装にいくらかの回復力を追加しましょう:

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    maxRetries := 3
    var err error

    for i := 0; i < maxRetries; i++ {
        err = s.executeSaga(ctx, req)
        if err == nil {
            return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
        }
        log.Printf("Attempt %d failed: %v. Retrying...", i+1, err)
    }

    // すべてのリトライが失敗した場合、補償を行いエラーを返す
    s.CompensateSaga(ctx, &pb.CompensationRequest{OrderId: req.OrderId})
    return &pb.SagaResponse{Success: false, Message: "Saga failed after multiple retries"}, err
}

func (s *server) executeSaga(ctx context.Context, req *pb.SagaRequest) error {
    // 実際のSagaロジックをここに実装
    // ...
}

このリトライメカニズムは、Sagaが成功するためのいくつかのチャンスを与え、失敗した場合には補償を開始します。

監視とログ

分散トランザクションを扱う際には、可視性が重要です。Sagaコーディネーターにログとメトリクスを追加しましょう:

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    sagaSuccessCounter = promauto.NewCounter(prometheus.CounterOpts{
        Name: "saga_success_total",
        Help: "The total number of successful sagas",
    })
    sagaFailureCounter = promauto.NewCounter(prometheus.CounterOpts{
        Name: "saga_failure_total",
        Help: "The total number of failed sagas",
    })
)

func (s *server) StartSaga(ctx context.Context, req *pb.SagaRequest) (*pb.SagaResponse, error) {
    log.Printf("Starting saga for order: %s", req.OrderId)
    defer func(start time.Time) {
        log.Printf("Saga for order %s completed in %v", req.OrderId, time.Since(start))
    }(time.Now())

    // ... (saga logic)

    if err != nil {
        sagaFailureCounter.Inc()
        log.Printf("Saga failed for order %s: %v", req.OrderId, err)
        return &pb.SagaResponse{Success: false, Message: "Saga failed"}, err
    }

    sagaSuccessCounter.Inc()
    return &pb.SagaResponse{Success: true, Message: "Saga completed successfully"}, nil
}

これらのメトリクスは、Prometheusのような監視システムと簡単に統合でき、Sagaのパフォーマンスに関するリアルタイムの洞察を提供します。

Sagaのテスト

分散トランザクションのテストは難しいですが、重要です。Sagaコーディネーターをテストする簡単な例を示します:

func TestStartSaga(t *testing.T) {
    // モックサーバーをセットアップ
    s := &server{}

    // テストリクエストを作成
    req := &pb.SagaRequest{
        OrderId: "test-order-123",
        Amount:  100.50,
    }

    // StartSagaメソッドを呼び出す
    resp, err := s.StartSaga(context.Background(), req)

    // 結果をアサート
    if err != nil {
        t.Errorf("StartSaga returned an error: %v", err)
    }
    if !resp.Success {
        t.Errorf("StartSaga failed: %s", resp.Message)
    }
}

失敗シナリオや補償ロジックのテストも忘れずに行いましょう!

まとめ

これで、gRPCを使用して分散トランザクションを管理する堅牢なSagaパターンを実装しました。学んだことを振り返りましょう:

  • Sagaパターンは、マイクロサービス間の分散トランザクションを管理するのに役立ちます
  • gRPCは、Sagaを実装するための効率的で強く型付けされた方法を提供します
  • 適切なエラーハンドリングとリトライは、回復力にとって重要です
  • 監視とログは、分散トランザクションの可視性を提供します
  • テストは難しいですが、信頼性のあるSagaには不可欠です

分散トランザクションは複雑なものです。この実装は出発点であり、特定のユースケースに合わせて適応する必要があるかもしれません。しかし、この知識を持っていれば、分散トランザクションのモンスターを手なずけるための準備が整っています。

考えるための材料

行く前に、考えてみるべき質問をいくつか紹介します:

  • gRPCのタイムアウト制限を超える可能性のある長時間実行されるSagaをどのように処理しますか?
  • Sagaコーディネーター自体をフォールトトレラントにするためにどのような戦略を採用しますか?
  • このSagaパターンを既存のイベント駆動型アーキテクチャとどのように統合しますか?

コーディングを楽しんでください。そして、あなたのトランザクションが常に一貫していることを願っています!