Storm 論文を読んで Storm についてまとめた

今回はストリーム処理システムの Storm の論文を読みました。

対象にした論文は Storm@twitter [Ankit2014] です。

2014年の SIGMOD での発表論文です。論文を元に自分が理解したものについてまとめます。

2014年の論文なので Storm 0.x 系の話が中心です(とはいえ、コアとなる仕組みやアーキテクチャはほとんど変わっていないでしょう)。 2017年現在のStorm 1.x, 2.x 系などではさらに改良されています(HA Nimbus など)。


概要

Storm はフォールトトレラントを備えたリアルタイム分散ストリームデータ処理システムである。 この論文ではStorm のアーキテクチャといかにしてスケールアウト・フォールトトレラントを実現しているかを説明する。 また、クエリ(すなわち、topology)がどう実行されるかについても説明する。

Storm の特徴

  1. Scalable(スケーラブル)
  2. Resilent(回復力が高い)
  3. Extensible(拡張可能)
  4. Efficient(効率的)
  5. Easy to Administer(管理しやすい)

1. Scalable(スケーラブル)

既に存在するデータの流れを中断することなく、Storm クラスタにノードを追加したり外したりすることが可能。

2. Resilent(復帰力が高い)

フォールトトレランスは大規模クラスタにとって必須。Storm クラスタはパフォーマンスに与える影響を最小にして処理を続けなければならない。

3. Extensible(拡張可能)

Storm の topology は任意の外部機能(例えば、MySQL 上のデータ検索)を呼ぶこともあるので、拡張性のあるフレームワークが必要になる。

4. Efficient(効率的)

リアルタイムにデータを処理するためには、良い性能を持たなければならない。 ストレージやメモリ中の計算データを保持するための効率的なテクニックを使っている。

5. Easy to Administer(管理しやすい)

リアルタイム処理は止まると、ユーザが異変にすぐに気付く。 運用チームは早期に警告してくれるツールを必要とし、問題が起こる箇所をすぐに特定できなければならない。 なので、使いやすい運用ツールはあったら良いではなく、要件として必須である

Storm のデータモデルとアーキテクチャ

基本的な Storm のデータ処理のアーキテクチャは topologyの中 を流れる tuple ストリームから構成される。 topology は頂点が計算を表し、矢印が計算コンポーネント間のデータフローを表す有向グラフである。 頂点は spoutsbolts の2つに分けられる。

Spouts はその topology の tuple の入力元である。典型的に spout は kafka などのキューからデータを取得する。 Bolts はやってきたタプルを処理し、次の下流のボルトにタプルを流す。 なお、storm の topology は cycle を持つことができる。

Tweet のストリームで出現した単語をカウントし、5分おきにカウントを生成するシンプルな topology の例を図1に示す。 TweetSpout は、Twitter の Firehose API から tuple を引っ張ってきて、新しい tweet を継続的にこの topology に注ぎ込む。 ParseTweetBolt は tweet を単語に分解し、<word, count>の 2-ary タプルを送信する。 WordCountBolt は 2-ary タプルを受取り、各単語の出現数の総和を5分おきに出力する。 単語の出現数を出力した後は、内部のカウンタを初期化する。 f:id:kmikmy:20171223151657p:plain 図1.Tweet word count topology

Storm 全体像

f:id:kmikmy:20171223204816p:plain

  • Nimbus: マスターノード。クライアントはトポロジーNimbus に投入する。Nimbus は topology の実行の分散と協調の責任者。
  • Worker Node: 実際に働くノード。各ワーカ上に一つ以上のワーカープロセスが走っている。一つのワーカープロセスは単一の topology に属している。
  • Supervisor: 各ワーカーノード上に一プロセス存在し、 Nimbus と通信を行う。
  • Zookeeper: クラスタの状態を保持するために利用する。

各ワーカプロセスは一つの JVM を走らせており、一つ以上の Executor が存在する。Executor はワーカープロセスの中のスレッドであり、Executor は一つ以上のタスクから構成される(基本的には 1 Executor につき 1 task だが、1 Executor が複数 task を担当することも可能) 。

f:id:kmikmy:20171223213459p:plain

各 spout、各 bolt はクラスタ上をまたいで存在する executor の集合の中で動いているタスクの集合である(一つの spout/bolt は一つ以上のタスクからなる)。 複数の task は bolt 内 / spout 内の並列性を提供する。 生産者である spout/bolt から消費者である bolt へデータはシャッフルされる。Storm は次の分割戦略をサポートしている。

  1. Shuffle grouping: ランダムにタプルを分割する。
  2. Fields grouping: タプルの属性/フィールドの部分集合上のハッシュ値にもとづいて分割する。
  3. All grouping: 全ての消費者のタスクに一つのストリーム全てを複製して送る。
  4. Global grouping: 一つのストリームを全て、一つの bolt に送る。
  5. Local grouping: 同じ executor の中の消費者である bolt にタプルを送る。

大雑把に言うと、topology はデータベースシステムの観点からは論理的なクエリプランとして考えられる。topology の一部として、プログラマは各 spout / bolt が spawn される数を指定できる。 現在はプログラマが spout / bolt の数を指定する必要があるが、Future work の一部として、パフォーマンス目的などの高い抽象レベルの目的にもとづいて数の自動的な決定と動的な変化があげられる。

Storm の内部実装

Storm のキーコンポネントと、各コンポーネントがどのように相互影響しているかについての説明

Nimbus and Zookeeper

NimbusHadoop でいう "JobTracker" のような役割で、ユーザと Storm システムの間を取り持つタッチポイント。 NimbusApache Thrift のサービスであり、Storm topology の定義は Thrift オブジェクトである。Storm クラスタにジョブを送信するために、ユーザは Thrift オブジェクトとして topology を記述し、Nimbus にその object をオウル。この設計により、どんなプログラミング言語でも Storm topology を作成することができる。

topology を送る一部として、ユーザは JAR ファイルとして Nimbus にユーザコードを送る。Nimbus はローカルディスクと topology の状態について保持している Zookeeper を組み合わせて使う。 現在、ユーザコードは Nimbus マシンのローカルディスクに記録され、topology の Thrift オブジェクトは ZooKeeper に保存されている。

Supervisor は 定期的なハートビードプロトコルNimbus と連絡を取り、現在実行しているtopology と、他の topology を実行するために利用可能なリソースの空きを報告する。 Nimbus は 割当の必要のある topology を継続して追跡し、延期された topology と Supervisor の間をとりもつ(match-making する)。

Nimbus と Supervisor の間の全協調作業は Zookeeper を使ってなされる。さらに、Nimbus と Supervisor デーモンは fail-fast(とりあえずやってみてダメだったらやめる方針) で stateless であり、彼らの全ての状態は Zookeeper またはローカルディスクに保持されている。この設計は Storm の回復力の鍵となる。もし Nimbus サービスが死んでも、worker は処理を続けることができる。worker が死んだら supervisor が worker を再起動させる。

しかしながら、Nimbus がダウンしているとユーザは新しい topology を投入できない。また、実行中の topology で物理故障が起こったとしても、Nimbus が復活するまで異なるマシンに再割り当てすることができない。ここでの Future work は、この制限に対処して Storm をより resilent (回復力が高い)にすることである。(Storm 1.x 系で実装された Nimbus の HA化の話?)

Supervisor

Supervisor は 各 Storm ノード上で動いている。Nimbus から割当を受けると、その割当に基づいて worker を生成する。また、worker の健康をモニタし、必要であれば再生成する。 Supervisor は 3 つのスレッドを生成する。メインスレッドは Storm の設定を読み、Supervisor のグローバルマップ(?)を初期化し、ローカルファイルシステムに永続的な状態を作成し、タイマーイベントの繰り返しをスケジュールする。

タイマーイベントは次の3種類。

  1. heartbeat イベント: 15 秒おきに実行するようにスケジュールされ、メインスレッドのコンテキストで実行される。Nimbus に supervisor が生きていることを報告する。
  2. synchronize supervisor イベント: event manager thread で10秒おきに実行される。このスレッドは既存の割当の変化を管理するのに責任を持つ。もし、新しいトポロジーの追加などの変化があると、必要な JAR ファイルとライブラリをダウンロードし、即座に synchronize イベントをスケジュールする。
  3. synchronize process イベント: process event manager thread で3秒おきに実行される。このスレッドは topology の一部を実行するワーカープロセスの管理責任を持つ。ローカルの状態から worker のハートビートを読み、worker を valid, timed out, not started, or disallowed のいずれかに分類する。

"timed out" ワーカーはワーカーが特定のタイムフレームでハートビートを提供しなかったことを意味し、それは死んだとみなされる。 "not started" ワーカーは新しく送信された topology に割り当てられたか、既存の topology のワーカーがこの supervisor に移動してきたという理由で、worker がまだ開始していないことを意味する。 "disallowed" ワーカーはそのトポロジーが kill されたか、topology のワーカーが他のノードに移動したかという理由で、ワーカーが実行されるべきでないことを意味する。

Workers and Executors

各 Executor には一つ以上のタスクが割り当てられる。一つのタスクは spout または bolt の一インスタンスである。 割当は現在静的であるため、一つのタスクは厳密に一つの executor に固定される。

入ってくるタプルや出て行くタプルをルーティングするために、各 worker プロセスは worker receive threadworker send thread の2つの専用のスレッドをもっている。 worker receive thread は TCP/IP ポートを listen し、全ての入ってくるタプルの逆多重化ポイントとして機能します。これは tuple の宛先タスクの id を調べ、それに応じて、その executor に対応する適切な in queue に入ってくるタプルを突っ込む。

各 executor は user logic threadexecutor send thread の2つのスレッドから構成される。 User logic thread は in-queue から入ってきたタプルを受取、宛先タスクの id を調べ、そのタプルについて実際のタスク( spout または bolt )を実行し、出力タプルを生成する。 それから出力タプルは executor に紐づくout-queue に配置される。次に executor send thread は out-queue から複数のタプルを取り出し、それらを global transfer queue に入れる。 global transfer queue は複数の executor からの全ての出力タプルを含んでいる。

worker send thread は、global transfer queue の各タプルを調べ、タプルの宛先タスクの id にもとづいて、次の下流の worker にそれを送る。 同じワーカーの異なるタスクに向けられた出力タプルについては、 executor send thread は直接タプルを宛先タスクの in-queue に書き込む。

f:id:kmikmy:20171224115158p:plain

Processing Semantics

Storm は "at least once" と "at most once" の2つのセマンティクスを提供する。

"at least once" セマンティクスを提供するために、topology には、spout によって送信されるすべてのタプルの非循環グラフを追跡する "acker" bolt が追加される。 拡張された word count topology の例を以下の図に示す。

f:id:kmikmy:20171224123031p:plain

Storm はシステムを流れる各タプルにランダムに生成された 64-bit の "message id" を付加する。この id は各種入力ソースから初めにタプルを引っ張ってきた spout の中でタプルに付加される。 一つのタプルを処理した時に新しくタプルが生成されることもある;例えば、一つの Tweet を含むタプルは、bolt によってトレンドのトピックの集合に分割され、一つの入力タプルからトピックごとに一つのタプルが生成される。 そのような新しいタプルには新たな 64-bit の id が付加され、そのタプル id のリストも出力タプルに関連する一つの provenance tree(起源ツリー)に保持される。 一つのタプルが最終的に topology から離れる時、その出力タプルに貢献したタスクを確認するためにバックフローメカニズムが使用される。 このバックフローメカニズムは最終的にタプルの処理が始まった最初の場所である spout に到達し、その地点でその tuple をリタイアすることが可能である。

このメカニズムのナイーブな実装は各タプルについてリニアに追跡する必要がある。これは各タプルについて、タプルの処理が終わるまでその元となるタプルの id を保持しなければならないことを意味する。 このような実装は特に複雑な topology では大量にメモリを使ってしまうので、実際には XOR を使った実装がされている。 一つのタプルが spout によって処理された後、一つ以上のタプルが送信される。これらの送信されたタプルは新しい message id が割り当てられている。これらの message id は XOR され、起源のタプルの message id とタイムアウトのパラメータとともに acker bolt に送られる。これによって、acker bolt はこれらの全てのタプルを追跡する。 タプルの処理が完了か ack された時、元のタプルのメッセージ id とそのメッセージ id が acker bolt に送られる。acker bolt は元のタプルとその XOR チェックサムを見つけ出す。この XOR チェックサムは ack されたタプルの id でまた XZOR される。XOR チェックサムが 0 になった時、acker bolt はそのタプルを送信した spout に最終的な ack を送る。そうして、spout はこのタプルが完全に処理されたことを知る。

failure のために、XOR チェックサムが永遠に 0 にならない可能性もある。そのようなケースに対処するために、spout は初めに timeout パラメータを割り当てる。 acker bolt はこのタイムアウトパラメータで追跡を続け、XOR チェックサムタイムアウト前に0にならない場合は、タプルは failed になったとみなす

Storm での通信は信頼性のある TCP/IP を介して行われるので、タプルは一回よりも多く送信されることはない。故に XOR は冪等性でないけれど、XOR のメカニズムがうまく働く。

"at least once" のセマンティクスでは、データソースはタプルを保持しなければならない。 そのタプルについて spout が positive な ack を受けたら、データソースにタプルを削除するように伝えることができる。 もし、ack や fail が指定した時間内に到着しなかったら、データソースは "保持" を期限切れにし、そのタプルを再送するだろう。 Kestrel queue ではそのような挙動をする。一方、Kafka queue の場合は、処理したタプルは各 spout 毎に ZooKeeper 上にチェックポイントされている。 spout インスタンスが fail したりリスタートする時、ZooKeeper に記録されている最後のチェックポイントからタプルの処理を開始する。

"at most once" のセマンティクスはシステムに入ったタプルが少なくとも一回処理されるか、全く処理されないかのどちらかを意味している。 Storm は acking のメカニズムをオフにすることで most once セマンティクスを達成する。 ack をオフにすると、その topology の各ステージにおいてタプルの処理が成功 or 失敗に関する保証はなく、処理は前へ前へ続けられる。

感想

他のブログの記事などを読んでいると fail の原因は明示的に fail 関数を呼ぶか、タイムアウトしかないと書いてあったのが「?」だったのですが、論文を読んで ack/fail が実際にどのように判断されているかについて理解することができました。 他人のブログの記事読んで首捻ってる暇があったら、さっさと論文を読んだほうがよいですね。

この記事では Storm のアーキテクチャの説明をまとめましたが、論文では Twitter 社が Storm を実際にどのように使用しているかについても述べられています。 ZooKeeper に書き込む一部のデータを Key-Value ストアに書くようにして worker 数をスケールさせたり、Storm によるオーバーヘッド調査、max spout pending というパラメータの自動決定アルゴリズムなどについて書かれているので突っ込んだ話を知りたい人は論文の 3章以降を読むと良いかなと思います。

参考文献

[Ankit2014]: Ankit Toshniwal, Siddarth Taneja, Amit Shukla, Karthikeyan Ramasamy, Jignesh M. Patel, Sanjeev Kulkarni, Jason Jackson, Krishna Gade, Maosong Fu, Jake Donham, Nikunj Bhagat, Sailesh Mittal, Dmitriy V. Ryaboy: Storm@twitter. SIGMOD Conference 2014: 147-156