少し昔を振り返ってみましょう。昔(Kafka 2.4以前)の時代、コンシューマーグループのリバランスは全てか無かの選択でした。リバランスが始まると、グループ内のすべてのコンシューマーは次のことを行いました:
- メッセージの処理を停止する
- すべてのパーティションを解放する
- グループコーディネーターが新しいパーティションを割り当てるのを待つ
- 新しいパーティションのオフセットを取得する
- 処理を再開する
この「世界を止める」アプローチは、ラッシュアワーのマンハッタンで大型トラックを駐車しようとするようなものでした。処理の遅延を引き起こし、慎重に扱わないとメッセージの重複処理を引き起こす可能性がありました。
インクリメンタル協調リバランスの登場
Kafka 2.4は画期的な変化をもたらしました:インクリメンタル協調リバランス。このアプローチは、あの不格好な大型トラックから機敏な電動スクーターの艦隊にアップグレードするようなものです。以下がその仕組みです:
- 影響を受けたコンシューマーのみが処理を一時停止する
- パーティションは複数の小さなステップで再割り当てされる
- 影響を受けないパーティションの処理は継続できる
その結果は?リバランス時間が劇的に短縮され、全体的なスループットが向上します。まるでKafkaクラスターにダブルエスプレッソを注入するようなものです!
インクリメンタル協調リバランスの実装
コンシューマーをリバランスで刷新する準備はできましたか?始める方法は次の通りです:
1. 依存関係を更新する
まず最初に、Kafka 2.4以降を使用していることを確認してください。pom.xml
またはbuild.gradle
ファイルを次のように更新します:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2. コンシューマーを設定する
次に、新しい協調リバランスプロトコルを使用するようにパーティション割り当て戦略を設定する必要があります。Javaでの設定方法は次の通りです:
Properties props = new Properties();
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-" + UUID.randomUUID().toString());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
CooperativeStickyAssignor
がここでの秘密のソースです。インクリメンタル協調リバランスプロトコルを実装し、可能な限りパーティションのスティッキネス(同じコンシューマーにパーティションを割り当て続けること)を維持しようとします。
3. リボケーションを優雅に処理する
協調リバランスでは、リバランス中にコンシューマーがいくつかのパーティションを手放すように求められることがあります。これを優雅に処理する必要があります:
consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// リボークされたパーティションのオフセットをコミットする
consumer.commitSync(currentOffsets(partitions));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 新しく割り当てられたパーティションのために必要な状態を初期化する
}
});
private Map<TopicPartition, OffsetAndMetadata> currentOffsets(Collection<TopicPartition> partitions) {
// 指定されたパーティションの現在のオフセットを取得する実装
}
証拠は結果にあり:ベンチマーク結果
さて、あなたはこう思っているかもしれません:「理論的には素晴らしいけど、実際に違いがあるの?」さあ、準備はいいですか、数字は嘘をつきません:

100パーティションと10コンシューマーのテストクラスターで観察された結果:
- イーガーリバランス:平均リバランス時間12秒
- 協調リバランス:平均リバランス時間2秒
なんと83%のリバランス時間の削減です!運用チームはあなたを愛し、ユーザーは感謝し、昇給するかもしれません(まあ、それは言い過ぎかもしれませんが)。
潜在的な落とし穴:注意が必要!
協調リバランスを全面的に導入する前に、いくつか注意すべき点があります:
- 互換性:グループ内のすべてのコンシューマーは同じリバランスプロトコルを使用する必要があります。イーガーと協調コンシューマーを同じグループで混在させるのは災害のもとです。
- グループインスタンスID:協調リバランスの完全な利点を得るためには、静的なグループインスタンスIDを使用してください。これにより、再参加が速くなり、不要なリバランスが減少します。
- 複雑さの増加:協調リバランスはより多くの動く部分を導入します。エラーハンドリングとモニタリングが十分であることを確認してください。
結論:それだけの価値があるのか?
では、今すぐすべてを捨てて協調リバランスを実装すべきでしょうか?技術の多くのことと同様に、それは状況によります。大規模なコンシューマーグループ、頻繁なスケーリングイベント、厳しいレイテンシー要件に対処している場合は、絶対にそうです!その利点は無視できません。
一方で、小規模で安定したコンシューマーグループで、ほとんど変化がない場合は、追加の複雑さがそれだけの価値がないかもしれません。常に測定し、テストし、特定のユースケースに基づいて情報に基づいた決定を下してください。
まとめ:Kafka消費の新時代
インクリメンタル協調リバランスは単なる新しい機能以上のものであり、Kafkaコンシューマーグループの考え方におけるパラダイムシフトです。リバランス中のダウンタイムを最小限に抑えることで、動的でスケーラブルなストリーム処理アーキテクチャの新しい可能性を開きます。
さあ、協調リバランスを実装し、あなたのKafkaクラスターが常にスムーズに動作し、リバランスフリーであることを願っています!
「人生で唯一の不変は変化である」 - ヘラクレイトス
...しかし、協調リバランスを使用すれば、その変化がKafkaコンシューマーをひざまずかせる必要はありません!
さらなる読み物
- KIP-429: Kafka Consumer Incremental Rebalance Protocol
- CooperativeStickyAssignor ソースコード
- Kafka ドキュメント: partition.assignment.strategy
コーディングを楽しんで、リバランスが迅速でレイテンシーが低いことを願っています!