Kafka 論文を読んで Kafka についてまとめた

対象とする論文は Jay Kreps, Neha Narkhede, and Jun Rao. Kafka: a distributed messaging system for log processing. ACM SIGMOD Workshop on Networking Meets Databases, page 6, 2011

データベースのトップカンファレンス SIGMOD 2011 で発表された論文。 2011 年は機械学習が学会以外の場所でも盛り上がり始めたぐらいの頃だろうか。

Kafka の概要については事前に色々なサイトで調べたりしてたんですが、やはり論文に勝る情報源はないですね。 どう使うか、どんな特徴があるかは当然重要ですが、システム開発者(システム開発者はシステム利用者に非ず)にとって重要なのはどう実装されているかだと思います。

主に Kafka のアーキテクチャについてまとめています。 (注意:2011 年の論文の内容のため、2017 年現在の実装とはいくつか乖離があるかもしれないです。コンセプト自体はそんなに変わってはいないでしょうが。)


概要

Kafka はログ処理に適したメッセージングシステム。 Kafka に貯めたログは複数のオフラインとオンラインアプリケーションから利用することが可能。 メッセージの送受信は効率的であり、スケーラブルな設計を持つ。

Kafka のコンセプト

特定のタイプのメッセージのストリームを topic と定義する。 producer はメッセージを topic に発行する。 それから、発行されたメッセージは broker と呼ばれるサーバの集合に保存される。 consumer は broker から一つ以上の topic を購読することができ、broker からデータを pull することによって読まれたメッセージは消費される。

Kafka は次の2つの配送モデルをサポートする

  • point-to-point モデル: 複数の consumer が共同で一つの topic の全メッセージを消費するモデル(topic を partition という単位にわけ、各 consumer がそれぞれ割り当てられた partition を消費する)
  • publish/subscribe モデル: 複数の consumer が自分専用の一つの topic のコピーを回収するモデル(複数の consumer グループ(後述)が同じデータを読むことができ、あるグループは他のグループに影響を与えない。)

ログの保存方法

一つの topic の各 partition は一つの論理的なログに対応する。 物理的には、一つのログはだいたい同じサイズ(例えば1GB)のセグメントファイルの集合として実現される。 producer がメッセージを partition に発行すると、broker は最後のセグメントファイルにそのメッセージを追記する。 より良い性能のため、設定された数(batch)のメッセージが発行された後か一定時間が経過した後のタイミングで、セグメントファイルに永続化(flush)する。 メッセージは永続化された後でしか consumer に公開されない

Kafka は明示的なメッセージの id を持たず、そのかわり各メッセージはログの論理的なオフセットで特定される。 これにより、メッセージ id と実際のメッセージの場所をマッピングするインデックス構造とそれを管理するオーバーヘッドが不要になる。 Kafka のメッセージ id は連続的に増加せず、次のメッセージ id を計算するためには、現在のメッセージの id に自身のメッセージの長さを足さなければならない。

consumer はパーティションからシーケンシャルにメッセージを消費する。 consumer があるオフセットから始まるメッセージを取得したら、そのパーティションのそのオフセットより前の全メッセージを受け取ったことを意味する。 consumer は非同期で pull リクエストを broker に発行する。各 pull リクエストで、消費が始まるメッセージのオフセットと読み込み可能なバイト数が渡される

各 broker はメモリ上にソートされたオフセットの一覧を持ち、ここに各セグメントファイルの最初のメッセージのオフセットが入っている。 broker はオフセット一覧からリクエストされたメッセージがあるセグメントファイルを特定し、メッセージデータを consumer に送る。 consumer がメッセージを受け取った後、consumer は次の pull リクエストのために、次のメッセージのオフセットを計算する(オフセットを計算するのは broker でないことに注意)

f:id:kmikmy:20171202233337p:plain

Kafka log (Kafka log と in-memory index の図) [Jay2011 Figure. 2 より]

メッセージの転送方法

producer は一回の send リクエストで複数のメッセージを送信できる。 consumer のエンド API は一度に一つのメッセージをイテレートするが、consumer からの各 pull リクエストは一定サイズ(典型的に数百キロバイト)で取得され、バッファリングされている。

consumer のネットワークアクセスの最適化も行っており、ローカルファイルをリモートソケットに送る際の典型的なアプローチは次の4回のデータコピーと2回のシステムコールからなる。

  1. ストレージから OS のページキャッシュにデータをコピー
  2. アプリケーションバッファにページキャッシュのデータをコピー
  3. アプリケーションバッファをカーネルバッファにコピー
  4. カーネルバッファをリモートソケットに送信

が、LinuxUnix で用意されている、ファイルチャネルのバイト列を直接ソケットチャネルに転送する sendfile API を使うことで 2, 3 でもたらされる2回のコピーと1回のシステムコールを避けている。

broker の設計

多くのメッセージングシステムとは異なり、Kafka では各 consumer がどこまで消費したのかは broker ではなく、consumer 自身が持っている。 この設計は多くの複雑性と broker のオーバーヘッドを減らす。 この設計だと consumer がどこまでメッセージを消費したかがわからないため、どこまでメッセージを削除して良いかが分からないという問題があるが、これは保持期間による 単純な時間ベースの SLA を使うことで解決する。 一定期間(典型的に7日)が過ぎたメッセージは自動的に削除されるようになっており、Kafka は長期間のデータ保持による大量のデータで性能劣化しない。 ほとんどの consumer はオフラインのものも含めて、一日毎、一時間ごと、またはリアルタイムでデータ処理を終える。

この設計による重要な利点として、consumer は意図的に古いオフセットに rewind して戻ることができ、データを再消費できる。 これはキューとしての共通規約には違反するがが、多くの consumer にとって重要な機能である。 例えば、consumer のアプリケーションロジックでエラーが起きた際に、アプリケーションはそのエラーをなおした後に特定のメッセージを再処理することができる。 (consumer の rewind は push モデルよりも pull モデルのほうがかなり簡単にサポートできるとされる。)

分散コーディネーション

各プロデューサーがメッセージを送るパーティションは、ランダムに選ばれるか、partion key と partition function によって意味的に決定される。

Kafka には consumer グループという概念があり、各 consumer グループはトピックの集合をともに消費する一つ以上の consumer から構成される。 各メッセージは一つの consumer グループ内では一つの consumer にのみ届けられる。 異なる consumer グループはそれぞれ独立してメッセージの全集合を消費する。 同じグループ内の複数の consumer は異なるプロセスまたは異なるマシンとして存在する。

Kafka の分散コーディネーションの目標は、consumer 間で大きな協調のオーバーヘッドをかけずに broekr に保存されたメッセージを公平に分割することである。

  • トピックの partition を並列処理の最小単位とする
    • 一つのパーティションの全メッセージは、各 consumer グループ内の単一の consumer によってのみ消費される。一つのパーティションから同時に複数の consumer の消費を許すと、ロックが必要になったり、状態の管理などオーバーヘッドがかかる協調動作が必要になる。
  • 中心となるマスターノードをもたず、分散した形で各々が協調する。
    • マスタの概念を追加すると、マスタの failure を考えなければならない。
    • この協調に Zookeeper を使う。

Kafka での Zookeeper の利用

Zookeeper は API のような単純なファイルシステムを提供する。また、次のような特徴がある。

(a) あるパスのウォッチャーを登録でき、そのパスの子やそのパスの値に変更があったときに通知を受けられる (b) ephemeral ( persistent の反対; 揮発性を持つ )としてパスを作ることもでき、これは作成したクライアントがいなくなったらそのパスは Zookeeper サーバから自動的に削除される。 (c) 複数のサーバにデータを複製でき、高い可用性を提供する。

Kafka は Zookeeper を次のタスクで用いている (1) broker と consumer の追加や削除の検知 (2) (1) が起こった時の各 consumer 上で rebalance プロセスのトリガー (3) どの partition がどの consumer から消費されているかの関係の記録と各 partition の消費されたオフセットの記録

正常処理時

各 broker や consumer が起動したら、その情報を Zookeeper 上の broker や consumer のレジストリに保存する。 broker レジストリは broker のホスト名やポート、その broker 上に保存されている topic や partition の集合を含む。 consumer レジストリは その consumer がどの consumer グループに所属しているのかと consumer が購読している topic の集合を含む。 各 consumer グループは ownership レジストリと offset レジストリに関連付けられる。 ownership レジストリは各 partition につき 1 パス持ち、パスの値は現在その partition から消費をおこなっている consumer の id である。 offset レジストリは購読されている partition が、最後に取得されたメッセージのオフセットを保存する。offset レジストリは定期的に更新される。

Zookeeper によって作られたパスは、broker, consumer, ownership レジストリの中では ephemeral で、offset レジストリは persistent である。

障害発生時

broker に障害が起きたら、その broker の全ての partition 情報が broker レジストリから取り除かれる。 consumer に障害が起きたら consumer レジストリの自身のエントリと、ownership レジストリにある自身が保持する全ての partition 情報が取り除かれる。 各 consumer は broker レジストリと consumer レジストリの両方の上に Zookeeper の watcher を登録することで、broker の集合や consumer グループに変化が起こるときはいつでも通知を受けとれる。

consumer の最初の起動時または、consumer が watcher を通じて broker/consumer の変化を通知された時、consumer は自身が消費すべき新しい partition のサブ集合を決定するためのリバランスプロセスを起動する。

リバランス処理

  1. consumer ははじめに、購読する各 topic T の利用可能な partition の集合 (P_T)と T を購読する consumer の集合 (C_T) を計算する。
  2. P_T を |C_T| 個 で範囲分割し、一つのチャンクを自身で取得する。
  3. その consumer が取得した各パーティションについて、consumer は ownership レジストリパーティションの新しい所有者として自身を書き込む。
  4. consumer は所持する各パーティションからデータを取得するスレッドを開始し、offset レジストリに保存されたオフセットから開始する。

一つのグループ内に複数の consumer がいる時、consumer はそれぞれ broker や consumer の変化の通知を受ける。しかし、その通知はわずかに異なるタイミングで発生する。ここで起こりうるのは、他の consumer がまだ所持しているパーティションの権限を取ろうとすることだ。これが起こる時は、初めの consumer が単に現在所持している全パーティションを開放し、少し待った後にリバランスプロセスが再開する。2,3回 のリトライがあった後にリバランスプロセスは安定化する。

なお、新しい consumer グループが作成された時、offset レジストリに有効なオフセットは存在しない。この場合、consumer は各購読 partition で利用可能な最小のオフセットか最大のオフセット(設定による)で始め、broker が提供する API を使用する。

メッセージ配送の保証

Kafka では at-least-once (最低一回) のみ保証する。 Exactly-once (確実に1回)は典型的に 2-phase commit を必要とするが、ログ処理アプリケーションでは必要ない。

consumer プロセスが急にクラッシュした場合、障害が起きた consumer が担当していた partition を肩代わりする consumer プロセスは zookeeper に記録された最後のオフセット以降で重複するメッセージを取得するかもしれない。 アプリケーションが重複を気にするなら、消費者に返すオフセットまたはメッセージ内の一意のキーを使用して独自の重複排除ロジックを追加する必要があるが、これは two-phase コミットよりコスト効率の良いアプローチである。

Kafka は一つの partition からのメッセージは一つの consumer に順番に送られる異なるパーティションから来るメッセージの順番は保証されない

ログ破壊を避けるために、Kafka はログにある各メッセージに CRC を保存する。 broker 上で I/O エラーがあると、Kafka は CRC に不整合のあるメッセージを削除するリカバリープロセスが走る。メッセージレベルの CRC を持つことで、メッセージが生産 or 消費された後のネットワークエラーをチェックすることができる。

broker がダウンした場合、そこに保存されまだ消費されていないメッセージは使用不可になる。 その broker のストレージシステムが致命的に破壊された場合、まだ消費されていないいくつかのメッセージは永遠にロストする。 将来的に、Kafka で各メッセージを複数の broker に重複して保存するための備え付けの replication 機能を追加する計画である(2017年現在既に追加されています)。

Future Work

参考

Ja2011: Jay Kreps, Neha Narkhede, and Jun Rao. Kafka: a distributed messaging system for log processing. ACM SIGMOD Workshop on Networking Meets Databases, page 6, 2011