SIGMOD 2017 で興味のある論文

SIGMOD 2017 の accepted-papers の中で自分の興味のある論文をリスト化しておく

  • Cicada: Dependably Fast Multi-Core In-Memory Transactions
  • BatchDB: Efficient Isolated Execution of Hybrid OLTP+OLAP Workloads for Interactive Applications
  • Monkey: Optimal Navigable Key-Value Store
  • LittleTable: A Time-Series Database and Its Uses
  • Transaction Repair for Multi-Version Concurrency Control
  • Concerto: A High Concurrency Key-Value Store with Integrity
  • Fast Failure Recovery for Main-Memory DBMSs on Multicores
  • Spanner: Becoming a SQL System
  • Revisiting Reuse in Main Memory Database Systems
  • Optimizing Iceberg Queries with Complex Joins
  • From In-Place Updates to In-Place Appends: Revisiting Out-of-Place Updates on Flash

Storm 論文を読んで Storm についてまとめた

今回はストリーム処理システムの Storm の論文を読みました。

対象にした論文は Storm@twitter [Ankit2014] です。

2014年の SIGMOD での発表論文です。論文を元に自分が理解したものについてまとめます。

2014年の論文なので Storm 0.x 系の話が中心です(とはいえ、コアとなる仕組みやアーキテクチャはほとんど変わっていないでしょう)。 2017年現在のStorm 1.x, 2.x 系などではさらに改良されています(HA Nimbus など)。


概要

Storm はフォールトトレラントを備えたリアルタイム分散ストリームデータ処理システムである。 この論文ではStorm のアーキテクチャといかにしてスケールアウト・フォールトトレラントを実現しているかを説明する。 また、クエリ(すなわち、topology)がどう実行されるかについても説明する。

Storm の特徴

  1. Scalable(スケーラブル)
  2. Resilent(回復力が高い)
  3. Extensible(拡張可能)
  4. Efficient(効率的)
  5. Easy to Administer(管理しやすい)

1. Scalable(スケーラブル)

既に存在するデータの流れを中断することなく、Storm クラスタにノードを追加したり外したりすることが可能。

2. Resilent(復帰力が高い)

フォールトトレランスは大規模クラスタにとって必須。Storm クラスタはパフォーマンスに与える影響を最小にして処理を続けなければならない。

3. Extensible(拡張可能)

Storm の topology は任意の外部機能(例えば、MySQL 上のデータ検索)を呼ぶこともあるので、拡張性のあるフレームワークが必要になる。

4. Efficient(効率的)

リアルタイムにデータを処理するためには、良い性能を持たなければならない。 ストレージやメモリ中の計算データを保持するための効率的なテクニックを使っている。

5. Easy to Administer(管理しやすい)

リアルタイム処理は止まると、ユーザが異変にすぐに気付く。 運用チームは早期に警告してくれるツールを必要とし、問題が起こる箇所をすぐに特定できなければならない。 なので、使いやすい運用ツールはあったら良いではなく、要件として必須である

Storm のデータモデルとアーキテクチャ

基本的な Storm のデータ処理のアーキテクチャは topologyの中 を流れる tuple ストリームから構成される。 topology は頂点が計算を表し、矢印が計算コンポーネント間のデータフローを表す有向グラフである。 頂点は spoutsbolts の2つに分けられる。

Spouts はその topology の tuple の入力元である。典型的に spout は kafka などのキューからデータを取得する。 Bolts はやってきたタプルを処理し、次の下流のボルトにタプルを流す。 なお、storm の topology は cycle を持つことができる。

Tweet のストリームで出現した単語をカウントし、5分おきにカウントを生成するシンプルな topology の例を図1に示す。 TweetSpout は、Twitter の Firehose API から tuple を引っ張ってきて、新しい tweet を継続的にこの topology に注ぎ込む。 ParseTweetBolt は tweet を単語に分解し、<word, count>の 2-ary タプルを送信する。 WordCountBolt は 2-ary タプルを受取り、各単語の出現数の総和を5分おきに出力する。 単語の出現数を出力した後は、内部のカウンタを初期化する。 f:id:kmikmy:20171223151657p:plain 図1.Tweet word count topology

Storm 全体像

f:id:kmikmy:20171223204816p:plain

  • Nimbus: マスターノード。クライアントはトポロジーNimbus に投入する。Nimbus は topology の実行の分散と協調の責任者。
  • Worker Node: 実際に働くノード。各ワーカ上に一つ以上のワーカープロセスが走っている。一つのワーカープロセスは単一の topology に属している。
  • Supervisor: 各ワーカーノード上に一プロセス存在し、 Nimbus と通信を行う。
  • Zookeeper: クラスタの状態を保持するために利用する。

各ワーカプロセスは一つの JVM を走らせており、一つ以上の Executor が存在する。Executor はワーカープロセスの中のスレッドであり、Executor は一つ以上のタスクから構成される(基本的には 1 Executor につき 1 task だが、1 Executor が複数 task を担当することも可能) 。

f:id:kmikmy:20171223213459p:plain

各 spout、各 bolt はクラスタ上をまたいで存在する executor の集合の中で動いているタスクの集合である(一つの spout/bolt は一つ以上のタスクからなる)。 複数の task は bolt 内 / spout 内の並列性を提供する。 生産者である spout/bolt から消費者である bolt へデータはシャッフルされる。Storm は次の分割戦略をサポートしている。

  1. Shuffle grouping: ランダムにタプルを分割する。
  2. Fields grouping: タプルの属性/フィールドの部分集合上のハッシュ値にもとづいて分割する。
  3. All grouping: 全ての消費者のタスクに一つのストリーム全てを複製して送る。
  4. Global grouping: 一つのストリームを全て、一つの bolt に送る。
  5. Local grouping: 同じ executor の中の消費者である bolt にタプルを送る。

大雑把に言うと、topology はデータベースシステムの観点からは論理的なクエリプランとして考えられる。topology の一部として、プログラマは各 spout / bolt が spawn される数を指定できる。 現在はプログラマが spout / bolt の数を指定する必要があるが、Future work の一部として、パフォーマンス目的などの高い抽象レベルの目的にもとづいて数の自動的な決定と動的な変化があげられる。

Storm の内部実装

Storm のキーコンポネントと、各コンポーネントがどのように相互影響しているかについての説明

Nimbus and Zookeeper

NimbusHadoop でいう "JobTracker" のような役割で、ユーザと Storm システムの間を取り持つタッチポイント。 NimbusApache Thrift のサービスであり、Storm topology の定義は Thrift オブジェクトである。Storm クラスタにジョブを送信するために、ユーザは Thrift オブジェクトとして topology を記述し、Nimbus にその object をオウル。この設計により、どんなプログラミング言語でも Storm topology を作成することができる。

topology を送る一部として、ユーザは JAR ファイルとして Nimbus にユーザコードを送る。Nimbus はローカルディスクと topology の状態について保持している Zookeeper を組み合わせて使う。 現在、ユーザコードは Nimbus マシンのローカルディスクに記録され、topology の Thrift オブジェクトは ZooKeeper に保存されている。

Supervisor は 定期的なハートビードプロトコルNimbus と連絡を取り、現在実行しているtopology と、他の topology を実行するために利用可能なリソースの空きを報告する。 Nimbus は 割当の必要のある topology を継続して追跡し、延期された topology と Supervisor の間をとりもつ(match-making する)。

Nimbus と Supervisor の間の全協調作業は Zookeeper を使ってなされる。さらに、Nimbus と Supervisor デーモンは fail-fast(とりあえずやってみてダメだったらやめる方針) で stateless であり、彼らの全ての状態は Zookeeper またはローカルディスクに保持されている。この設計は Storm の回復力の鍵となる。もし Nimbus サービスが死んでも、worker は処理を続けることができる。worker が死んだら supervisor が worker を再起動させる。

しかしながら、Nimbus がダウンしているとユーザは新しい topology を投入できない。また、実行中の topology で物理故障が起こったとしても、Nimbus が復活するまで異なるマシンに再割り当てすることができない。ここでの Future work は、この制限に対処して Storm をより resilent (回復力が高い)にすることである。(Storm 1.x 系で実装された Nimbus の HA化の話?)

Supervisor

Supervisor は 各 Storm ノード上で動いている。Nimbus から割当を受けると、その割当に基づいて worker を生成する。また、worker の健康をモニタし、必要であれば再生成する。 Supervisor は 3 つのスレッドを生成する。メインスレッドは Storm の設定を読み、Supervisor のグローバルマップ(?)を初期化し、ローカルファイルシステムに永続的な状態を作成し、タイマーイベントの繰り返しをスケジュールする。

タイマーイベントは次の3種類。

  1. heartbeat イベント: 15 秒おきに実行するようにスケジュールされ、メインスレッドのコンテキストで実行される。Nimbus に supervisor が生きていることを報告する。
  2. synchronize supervisor イベント: event manager thread で10秒おきに実行される。このスレッドは既存の割当の変化を管理するのに責任を持つ。もし、新しいトポロジーの追加などの変化があると、必要な JAR ファイルとライブラリをダウンロードし、即座に synchronize イベントをスケジュールする。
  3. synchronize process イベント: process event manager thread で3秒おきに実行される。このスレッドは topology の一部を実行するワーカープロセスの管理責任を持つ。ローカルの状態から worker のハートビートを読み、worker を valid, timed out, not started, or disallowed のいずれかに分類する。

"timed out" ワーカーはワーカーが特定のタイムフレームでハートビートを提供しなかったことを意味し、それは死んだとみなされる。 "not started" ワーカーは新しく送信された topology に割り当てられたか、既存の topology のワーカーがこの supervisor に移動してきたという理由で、worker がまだ開始していないことを意味する。 "disallowed" ワーカーはそのトポロジーが kill されたか、topology のワーカーが他のノードに移動したかという理由で、ワーカーが実行されるべきでないことを意味する。

Workers and Executors

各 Executor には一つ以上のタスクが割り当てられる。一つのタスクは spout または bolt の一インスタンスである。 割当は現在静的であるため、一つのタスクは厳密に一つの executor に固定される。

入ってくるタプルや出て行くタプルをルーティングするために、各 worker プロセスは worker receive threadworker send thread の2つの専用のスレッドをもっている。 worker receive thread は TCP/IP ポートを listen し、全ての入ってくるタプルの逆多重化ポイントとして機能します。これは tuple の宛先タスクの id を調べ、それに応じて、その executor に対応する適切な in queue に入ってくるタプルを突っ込む。

各 executor は user logic threadexecutor send thread の2つのスレッドから構成される。 User logic thread は in-queue から入ってきたタプルを受取、宛先タスクの id を調べ、そのタプルについて実際のタスク( spout または bolt )を実行し、出力タプルを生成する。 それから出力タプルは executor に紐づくout-queue に配置される。次に executor send thread は out-queue から複数のタプルを取り出し、それらを global transfer queue に入れる。 global transfer queue は複数の executor からの全ての出力タプルを含んでいる。

worker send thread は、global transfer queue の各タプルを調べ、タプルの宛先タスクの id にもとづいて、次の下流の worker にそれを送る。 同じワーカーの異なるタスクに向けられた出力タプルについては、 executor send thread は直接タプルを宛先タスクの in-queue に書き込む。

f:id:kmikmy:20171224115158p:plain

Processing Semantics

Storm は "at least once" と "at most once" の2つのセマンティクスを提供する。

"at least once" セマンティクスを提供するために、topology には、spout によって送信されるすべてのタプルの非循環グラフを追跡する "acker" bolt が追加される。 拡張された word count topology の例を以下の図に示す。

f:id:kmikmy:20171224123031p:plain

Storm はシステムを流れる各タプルにランダムに生成された 64-bit の "message id" を付加する。この id は各種入力ソースから初めにタプルを引っ張ってきた spout の中でタプルに付加される。 一つのタプルを処理した時に新しくタプルが生成されることもある;例えば、一つの Tweet を含むタプルは、bolt によってトレンドのトピックの集合に分割され、一つの入力タプルからトピックごとに一つのタプルが生成される。 そのような新しいタプルには新たな 64-bit の id が付加され、そのタプル id のリストも出力タプルに関連する一つの provenance tree(起源ツリー)に保持される。 一つのタプルが最終的に topology から離れる時、その出力タプルに貢献したタスクを確認するためにバックフローメカニズムが使用される。 このバックフローメカニズムは最終的にタプルの処理が始まった最初の場所である spout に到達し、その地点でその tuple をリタイアすることが可能である。

このメカニズムのナイーブな実装は各タプルについてリニアに追跡する必要がある。これは各タプルについて、タプルの処理が終わるまでその元となるタプルの id を保持しなければならないことを意味する。 このような実装は特に複雑な topology では大量にメモリを使ってしまうので、実際には XOR を使った実装がされている。 一つのタプルが spout によって処理された後、一つ以上のタプルが送信される。これらの送信されたタプルは新しい message id が割り当てられている。これらの message id は XOR され、起源のタプルの message id とタイムアウトのパラメータとともに acker bolt に送られる。これによって、acker bolt はこれらの全てのタプルを追跡する。 タプルの処理が完了か ack された時、元のタプルのメッセージ id とそのメッセージ id が acker bolt に送られる。acker bolt は元のタプルとその XOR チェックサムを見つけ出す。この XOR チェックサムは ack されたタプルの id でまた XZOR される。XOR チェックサムが 0 になった時、acker bolt はそのタプルを送信した spout に最終的な ack を送る。そうして、spout はこのタプルが完全に処理されたことを知る。

failure のために、XOR チェックサムが永遠に 0 にならない可能性もある。そのようなケースに対処するために、spout は初めに timeout パラメータを割り当てる。 acker bolt はこのタイムアウトパラメータで追跡を続け、XOR チェックサムタイムアウト前に0にならない場合は、タプルは failed になったとみなす

Storm での通信は信頼性のある TCP/IP を介して行われるので、タプルは一回よりも多く送信されることはない。故に XOR は冪等性でないけれど、XOR のメカニズムがうまく働く。

"at least once" のセマンティクスでは、データソースはタプルを保持しなければならない。 そのタプルについて spout が positive な ack を受けたら、データソースにタプルを削除するように伝えることができる。 もし、ack や fail が指定した時間内に到着しなかったら、データソースは "保持" を期限切れにし、そのタプルを再送するだろう。 Kestrel queue ではそのような挙動をする。一方、Kafka queue の場合は、処理したタプルは各 spout 毎に ZooKeeper 上にチェックポイントされている。 spout インスタンスが fail したりリスタートする時、ZooKeeper に記録されている最後のチェックポイントからタプルの処理を開始する。

"at most once" のセマンティクスはシステムに入ったタプルが少なくとも一回処理されるか、全く処理されないかのどちらかを意味している。 Storm は acking のメカニズムをオフにすることで most once セマンティクスを達成する。 ack をオフにすると、その topology の各ステージにおいてタプルの処理が成功 or 失敗に関する保証はなく、処理は前へ前へ続けられる。

感想

他のブログの記事などを読んでいると fail の原因は明示的に fail 関数を呼ぶか、タイムアウトしかないと書いてあったのが「?」だったのですが、論文を読んで ack/fail が実際にどのように判断されているかについて理解することができました。 他人のブログの記事読んで首捻ってる暇があったら、さっさと論文を読んだほうがよいですね。

この記事では Storm のアーキテクチャの説明をまとめましたが、論文では Twitter 社が Storm を実際にどのように使用しているかについても述べられています。 ZooKeeper に書き込む一部のデータを Key-Value ストアに書くようにして worker 数をスケールさせたり、Storm によるオーバーヘッド調査、max spout pending というパラメータの自動決定アルゴリズムなどについて書かれているので突っ込んだ話を知りたい人は論文の 3章以降を読むと良いかなと思います。

参考文献

[Ankit2014]: Ankit Toshniwal, Siddarth Taneja, Amit Shukla, Karthikeyan Ramasamy, Jignesh M. Patel, Sanjeev Kulkarni, Jason Jackson, Krishna Gade, Maosong Fu, Jake Donham, Nikunj Bhagat, Sailesh Mittal, Dmitriy V. Ryaboy: Storm@twitter. SIGMOD Conference 2014: 147-156

Kafka 論文を読んで Kafka についてまとめた

対象とする論文は Jay Kreps, Neha Narkhede, and Jun Rao. Kafka: a distributed messaging system for log processing. ACM SIGMOD Workshop on Networking Meets Databases, page 6, 2011

データベースのトップカンファレンス SIGMOD 2011 で発表された論文。 2011 年は機械学習が学会以外の場所でも盛り上がり始めたぐらいの頃だろうか。

Kafka の概要については事前に色々なサイトで調べたりしてたんですが、やはり論文に勝る情報源はないですね。 どう使うか、どんな特徴があるかは当然重要ですが、システム開発者(システム開発者はシステム利用者に非ず)にとって重要なのはどう実装されているかだと思います。

主に Kafka のアーキテクチャについてまとめています。 (注意:2011 年の論文の内容のため、2017 年現在の実装とはいくつか乖離があるかもしれないです。コンセプト自体はそんなに変わってはいないでしょうが。)


概要

Kafka はログ処理に適したメッセージングシステム。 Kafka に貯めたログは複数のオフラインとオンラインアプリケーションから利用することが可能。 メッセージの送受信は効率的であり、スケーラブルな設計を持つ。

Kafka のコンセプト

特定のタイプのメッセージのストリームを topic と定義する。 producer はメッセージを topic に発行する。 それから、発行されたメッセージは broker と呼ばれるサーバの集合に保存される。 consumer は broker から一つ以上の topic を購読することができ、broker からデータを pull することによって読まれたメッセージは消費される。

Kafka は次の2つの配送モデルをサポートする

  • point-to-point モデル: 複数の consumer が共同で一つの topic の全メッセージを消費するモデル(topic を partition という単位にわけ、各 consumer がそれぞれ割り当てられた partition を消費する)
  • publish/subscribe モデル: 複数の consumer が自分専用の一つの topic のコピーを回収するモデル(複数の consumer グループ(後述)が同じデータを読むことができ、あるグループは他のグループに影響を与えない。)

ログの保存方法

一つの topic の各 partition は一つの論理的なログに対応する。 物理的には、一つのログはだいたい同じサイズ(例えば1GB)のセグメントファイルの集合として実現される。 producer がメッセージを partition に発行すると、broker は最後のセグメントファイルにそのメッセージを追記する。 より良い性能のため、設定された数(batch)のメッセージが発行された後か一定時間が経過した後のタイミングで、セグメントファイルに永続化(flush)する。 メッセージは永続化された後でしか consumer に公開されない

Kafka は明示的なメッセージの id を持たず、そのかわり各メッセージはログの論理的なオフセットで特定される。 これにより、メッセージ id と実際のメッセージの場所をマッピングするインデックス構造とそれを管理するオーバーヘッドが不要になる。 Kafka のメッセージ id は連続的に増加せず、次のメッセージ id を計算するためには、現在のメッセージの id に自身のメッセージの長さを足さなければならない。

consumer はパーティションからシーケンシャルにメッセージを消費する。 consumer があるオフセットから始まるメッセージを取得したら、そのパーティションのそのオフセットより前の全メッセージを受け取ったことを意味する。 consumer は非同期で pull リクエストを broker に発行する。各 pull リクエストで、消費が始まるメッセージのオフセットと読み込み可能なバイト数が渡される

各 broker はメモリ上にソートされたオフセットの一覧を持ち、ここに各セグメントファイルの最初のメッセージのオフセットが入っている。 broker はオフセット一覧からリクエストされたメッセージがあるセグメントファイルを特定し、メッセージデータを consumer に送る。 consumer がメッセージを受け取った後、consumer は次の pull リクエストのために、次のメッセージのオフセットを計算する(オフセットを計算するのは broker でないことに注意)

f:id:kmikmy:20171202233337p:plain

Kafka log (Kafka log と in-memory index の図) [Jay2011 Figure. 2 より]

メッセージの転送方法

producer は一回の send リクエストで複数のメッセージを送信できる。 consumer のエンド API は一度に一つのメッセージをイテレートするが、consumer からの各 pull リクエストは一定サイズ(典型的に数百キロバイト)で取得され、バッファリングされている。

consumer のネットワークアクセスの最適化も行っており、ローカルファイルをリモートソケットに送る際の典型的なアプローチは次の4回のデータコピーと2回のシステムコールからなる。

  1. ストレージから OS のページキャッシュにデータをコピー
  2. アプリケーションバッファにページキャッシュのデータをコピー
  3. アプリケーションバッファをカーネルバッファにコピー
  4. カーネルバッファをリモートソケットに送信

が、LinuxUnix で用意されている、ファイルチャネルのバイト列を直接ソケットチャネルに転送する sendfile API を使うことで 2, 3 でもたらされる2回のコピーと1回のシステムコールを避けている。

broker の設計

多くのメッセージングシステムとは異なり、Kafka では各 consumer がどこまで消費したのかは broker ではなく、consumer 自身が持っている。 この設計は多くの複雑性と broker のオーバーヘッドを減らす。 この設計だと consumer がどこまでメッセージを消費したかがわからないため、どこまでメッセージを削除して良いかが分からないという問題があるが、これは保持期間による 単純な時間ベースの SLA を使うことで解決する。 一定期間(典型的に7日)が過ぎたメッセージは自動的に削除されるようになっており、Kafka は長期間のデータ保持による大量のデータで性能劣化しない。 ほとんどの consumer はオフラインのものも含めて、一日毎、一時間ごと、またはリアルタイムでデータ処理を終える。

この設計による重要な利点として、consumer は意図的に古いオフセットに rewind して戻ることができ、データを再消費できる。 これはキューとしての共通規約には違反するがが、多くの consumer にとって重要な機能である。 例えば、consumer のアプリケーションロジックでエラーが起きた際に、アプリケーションはそのエラーをなおした後に特定のメッセージを再処理することができる。 (consumer の rewind は push モデルよりも pull モデルのほうがかなり簡単にサポートできるとされる。)

分散コーディネーション

各プロデューサーがメッセージを送るパーティションは、ランダムに選ばれるか、partion key と partition function によって意味的に決定される。

Kafka には consumer グループという概念があり、各 consumer グループはトピックの集合をともに消費する一つ以上の consumer から構成される。 各メッセージは一つの consumer グループ内では一つの consumer にのみ届けられる。 異なる consumer グループはそれぞれ独立してメッセージの全集合を消費する。 同じグループ内の複数の consumer は異なるプロセスまたは異なるマシンとして存在する。

Kafka の分散コーディネーションの目標は、consumer 間で大きな協調のオーバーヘッドをかけずに broekr に保存されたメッセージを公平に分割することである。

  • トピックの partition を並列処理の最小単位とする
    • 一つのパーティションの全メッセージは、各 consumer グループ内の単一の consumer によってのみ消費される。一つのパーティションから同時に複数の consumer の消費を許すと、ロックが必要になったり、状態の管理などオーバーヘッドがかかる協調動作が必要になる。
  • 中心となるマスターノードをもたず、分散した形で各々が協調する。
    • マスタの概念を追加すると、マスタの failure を考えなければならない。
    • この協調に Zookeeper を使う。

Kafka での Zookeeper の利用

Zookeeper は API のような単純なファイルシステムを提供する。また、次のような特徴がある。

(a) あるパスのウォッチャーを登録でき、そのパスの子やそのパスの値に変更があったときに通知を受けられる (b) ephemeral ( persistent の反対; 揮発性を持つ )としてパスを作ることもでき、これは作成したクライアントがいなくなったらそのパスは Zookeeper サーバから自動的に削除される。 (c) 複数のサーバにデータを複製でき、高い可用性を提供する。

Kafka は Zookeeper を次のタスクで用いている (1) broker と consumer の追加や削除の検知 (2) (1) が起こった時の各 consumer 上で rebalance プロセスのトリガー (3) どの partition がどの consumer から消費されているかの関係の記録と各 partition の消費されたオフセットの記録

正常処理時

各 broker や consumer が起動したら、その情報を Zookeeper 上の broker や consumer のレジストリに保存する。 broker レジストリは broker のホスト名やポート、その broker 上に保存されている topic や partition の集合を含む。 consumer レジストリは その consumer がどの consumer グループに所属しているのかと consumer が購読している topic の集合を含む。 各 consumer グループは ownership レジストリと offset レジストリに関連付けられる。 ownership レジストリは各 partition につき 1 パス持ち、パスの値は現在その partition から消費をおこなっている consumer の id である。 offset レジストリは購読されている partition が、最後に取得されたメッセージのオフセットを保存する。offset レジストリは定期的に更新される。

Zookeeper によって作られたパスは、broker, consumer, ownership レジストリの中では ephemeral で、offset レジストリは persistent である。

障害発生時

broker に障害が起きたら、その broker の全ての partition 情報が broker レジストリから取り除かれる。 consumer に障害が起きたら consumer レジストリの自身のエントリと、ownership レジストリにある自身が保持する全ての partition 情報が取り除かれる。 各 consumer は broker レジストリと consumer レジストリの両方の上に Zookeeper の watcher を登録することで、broker の集合や consumer グループに変化が起こるときはいつでも通知を受けとれる。

consumer の最初の起動時または、consumer が watcher を通じて broker/consumer の変化を通知された時、consumer は自身が消費すべき新しい partition のサブ集合を決定するためのリバランスプロセスを起動する。

リバランス処理

  1. consumer ははじめに、購読する各 topic T の利用可能な partition の集合 (P_T)と T を購読する consumer の集合 (C_T) を計算する。
  2. P_T を |C_T| 個 で範囲分割し、一つのチャンクを自身で取得する。
  3. その consumer が取得した各パーティションについて、consumer は ownership レジストリパーティションの新しい所有者として自身を書き込む。
  4. consumer は所持する各パーティションからデータを取得するスレッドを開始し、offset レジストリに保存されたオフセットから開始する。

一つのグループ内に複数の consumer がいる時、consumer はそれぞれ broker や consumer の変化の通知を受ける。しかし、その通知はわずかに異なるタイミングで発生する。ここで起こりうるのは、他の consumer がまだ所持しているパーティションの権限を取ろうとすることだ。これが起こる時は、初めの consumer が単に現在所持している全パーティションを開放し、少し待った後にリバランスプロセスが再開する。2,3回 のリトライがあった後にリバランスプロセスは安定化する。

なお、新しい consumer グループが作成された時、offset レジストリに有効なオフセットは存在しない。この場合、consumer は各購読 partition で利用可能な最小のオフセットか最大のオフセット(設定による)で始め、broker が提供する API を使用する。

メッセージ配送の保証

Kafka では at-least-once (最低一回) のみ保証する。 Exactly-once (確実に1回)は典型的に 2-phase commit を必要とするが、ログ処理アプリケーションでは必要ない。

consumer プロセスが急にクラッシュした場合、障害が起きた consumer が担当していた partition を肩代わりする consumer プロセスは zookeeper に記録された最後のオフセット以降で重複するメッセージを取得するかもしれない。 アプリケーションが重複を気にするなら、消費者に返すオフセットまたはメッセージ内の一意のキーを使用して独自の重複排除ロジックを追加する必要があるが、これは two-phase コミットよりコスト効率の良いアプローチである。

Kafka は一つの partition からのメッセージは一つの consumer に順番に送られる異なるパーティションから来るメッセージの順番は保証されない

ログ破壊を避けるために、Kafka はログにある各メッセージに CRC を保存する。 broker 上で I/O エラーがあると、Kafka は CRC に不整合のあるメッセージを削除するリカバリープロセスが走る。メッセージレベルの CRC を持つことで、メッセージが生産 or 消費された後のネットワークエラーをチェックすることができる。

broker がダウンした場合、そこに保存されまだ消費されていないメッセージは使用不可になる。 その broker のストレージシステムが致命的に破壊された場合、まだ消費されていないいくつかのメッセージは永遠にロストする。 将来的に、Kafka で各メッセージを複数の broker に重複して保存するための備え付けの replication 機能を追加する計画である(2017年現在既に追加されています)。

Future Work

参考

Ja2011: Jay Kreps, Neha Narkhede, and Jun Rao. Kafka: a distributed messaging system for log processing. ACM SIGMOD Workshop on Networking Meets Databases, page 6, 2011

react を使って SPA を構築した場合、静的リソースが存在しない URL を直接指定すると 404 Error になる

この記事の概要(忙しい方はここだけ見れば良いです)

react-router のルーティング設定だけでは、静的リソースの存在しないページを直接 URL で指定してアクセスすると 404 Error になります。 URL を直接指定しても目的のページを表示させるようにするには、サーバでフォールバックを設定しましょう。webpack-dev-server であるならば historyApiFallback: true を設定することで、404 Error を回避できます。

react-router のルーティングはあくまでクライアント上での(サーバーにリクエストを送らない)ページ遷移に用いられるもので、サーバの方で routing が設定されている OR 静的リソースが存在しないページへのアクセスは 404 Error になります。routing が設定されていない(静的リソースが存在しない)ページがサーバに要求された場合、トップレベルの URL (http://localhost/ ) のコンテンツをクライアントに返した後で、react-router による遷移がなされるような(フォールバック)設定をする必要があります。

詳細説明

環境

frontend: react + redux backend: nodejs (webpack-dev-server)

状況

原因の詳細

https://teratail.com/questions/26245:react-routerでURLパラメータを指定した際、URL直打ちだと404になります から引用

トップページから react-router でページを辿る場合以下の遷移になりますが、

http://hoge.com/
http://hoge.com/index.html => 実際ブラウザが読み込んでいるのはこのファイル
http://hoge.com/detail/1 => react-router が対応するコンポーネントを読み込んで表示。このとき History API により URL が書き換わっている
ブラウザリロード、またはダイレクトにアクセスした場合、以下のようになります。

http://hoge.com/detail/1 => このリソースは存在しないので 404 になる
したがって、リソースが存在しない場合 /index.html に rewrite してあげないといけないはずです。rewrite の設定ができている場合、ダイレクトにアクセスした場合の挙動は以下のようになります。

http://hoge.com/detail/1
http://hoge.com/index.html => URLが rewrite された結果ブラウザが読み込んでいるのはこのファイルになる
http://hoge.com/detail/1 => react-router が対応するコンポーネントを読み込んで表示

解決方法

webpack.config.js の devServer に "historyApiFallback: true" を追加する

devServer: {
  historyApiFallback: true,
  contentBase: 'www',
  port: 4000,
  inline: true
}

 これで存在しないリソースに対するアクセスは、/index.html に フォールバックされるようになります。

参考

blog.mismithportfolio.com