NiFiで毎秒10億のイベントを処理する

NiFiで毎秒10億のイベントを処理する

著者: Mark Payne Twitter: @dataflowmark
本ブログ記事は「https://blog.cloudera.com/benchmarking-nifi-performance-and-scalability/」(2020/4/9投稿)の日本語翻訳記事です。
(お知らせ)本ブログの一番最後に記載していますが、Clouderaでは日本語によるNiFiのトレーニングを開始します。

Apache NiFiがどれほど速いかを疑問に思ったことはありませんか?

NiFiがどれだけうまくスケールするのかを不思議に思ったことはありませんか?

単一のNiFiクラスターは、完全なデータ来歴とリネージを持って、1日あたり何兆ものイベントとペタバイトのデータを処理することができます。その方法は以下のとおりです。

これらは通常、お客様が本番環境でNiFiを使用することを検討している場合、最初に私たちが尋ねられる質問の一つです。お客様は、どのぐらいのハードウェアが必要か、NiFiがどの程度のデータレート(データ転送速度)に対応できるかどうかを知りたいと考えています。

これは驚くべきことではありません。今日の世界は、増え続けるデータ量で構成されています。ユーザーは、これらのデータレートに簡単に対応できるツールを必要としています。企業のスタックにあるツールが1つでも必要なデータレートに追いつけない場合、ボトルネックが発生し、残りのツールは必要なデータにアクセスできなくなってしまいます。

NiFiは多種多様なタスクを実行し、あらゆる種類やサイズのデータで動作します。そのため、ユースケースを完全に理解しないと、どの程度のハードウェアが必要になるのかを判断することは困難です。NiFiがFTPサーバーからHDFSにデータを移動するだけなら必要なリソースは少ないでしょう。NiFiが何百ものソースからデータを取り込み、フィルタリングし、ルーティングし、複雑な変換を実行し、最終的に複数の異なる宛先にデータの配信を担当している場合は、追加のリソースが必要になります。

幸いなことに、後者の質問に対する答え−「私が必要とする程度にNiFiはスケールできますか?」−は、はるかに簡単です。答えは、ほとんどの場合、「はい!」です。このブログでは、一般的なユースケースを定義して、実世界のデータ処理シナリオにおいてNiFiが高いスケーラビリティとパフォーマンスの両方を実現しているのかを実証します。

ユースケース

数字や統計に飛び込む前に、ユースケースを理解することが重要です。理想的なユースケースとは、現実的でありながら、簡潔に説明できるほどシンプルなものです。

以下のスクリーンショットは、そのようなユースケースを示しています。各プロセッサーは1から8までの数字で示されています。以下のユースケースのウォークスルーでは、データフローの各ステップがどのように実行されるかを説明するために、これらのプロセッサーの番号を参照しています。

ここで紹介するユースケースは次のとおりです。

  1. Google Compute Storage(GCS)にバケットが存在します。このバケットには、無視すべき他の無関係なデータに加えて、約1.5TB分のNiFiのログデータが含まれています。
  2. NiFiはこのバケットを監視します[プロセッサー1]。データがバケットに入った際、ファイル名に「nifi-app」が含まれている場合は、NiFiはデータを取り出します。[プロセッサー2、3]
  3. データは圧縮されている場合とされていない場合があります。これは、受信ログファイルごとに検出する必要があります[プロセッサー4]。圧縮されている場合は解凍する必要があります[プロセッサー5]。
  4. ログレベルが「WARN」または「ERROR」のメッセージ以外のログメッセージを除外します[プロセッサー6]。ログメッセージに例外が含まれていた場合、その例外も保持しておく必要があります。また、一部のログメッセージは複数行のログメッセージである可能性があることにも注意してください。
  5. ログメッセージをJSONに変換します[プロセッサー6]。
  6. JSONを圧縮します(元の受信データが圧縮されているかどうかにかかわらず)[プロセッサー7]。
  7. 最後に、WARNおよびERRORレベルのログメッセージ(圧縮されたJSON形式)をスタックトレースとともに2番目のGCSバケットに配信します[プロセッサー8]。GCSにデータのプッシュに失敗した場合、データは完了するまで再試行されます。

これは、NiFiの非常に一般的なユースケースです。新しいデータを監視し、利用可能な場合はそれを取得し、それに対してルーティングの決定を行い、データをフィルタリングし、変換し、最後にデータを最終的な目的地にプッシュします。

RouteOnAttributeプロセッサー[プロセッサー2]とFetchGCSObject プロセッサー[プロセッサー3]の間の接続のアイコンに注意してください。このアイコンは、データがクラスター全体で負荷分散されていることを示しています。GCSバケットはキューイングメカニズムを提供していないため、データをクラスターに合わせて引き出せるかどうかはNiFiにかかっています。このため、単一のノード(プライマリノード)のみでリストアップを実行します。次に、そのリストをクラスター全体に分散し、クラスター内のすべてのノードがGCSから同時に引き出せるようにします。これにより、膨大なスループットが得られ、クラスター内のノード間でデータをシャッフルする必要がなくなります。

また、ほとんどのデータフローは最初にデータの大部分をフィルタリングしないため、データにはINFOレベルのメッセージだけでなく、WARNメッセージとERRORメッセージがよく混ざっていることを確認したいことにも注意してください。この目的のため、私たちは意図的にいくつかのプロセッサーの設定を誤ることで、ログを生成するNiFiインスタンスに常にエラーが発生させました。その結果、ログメッセージの約20〜30%が警告やエラーであり、スタックトレースを含んでいました。平均メッセージサイズは約250バイトです。

ハードウェア

何らかのデータレートについて議論する前に、使用しているハードウェアの種類について議論することが重要です。今回の目的では、Google Kubernetes Engine(GKE)を「n1-highcpu-32」のインスタンスタイプで使用しています。これにより、ノードあたり32コアと28.8 GBのRAMが提供されます(ただし、NiFiのJVMは2 GBのヒープしか使用しないため、RAMはもっと少なくても大丈夫です)。NiFiのコンテナを26コアに制限し、DNSサービスやnginxのようなVM内で動作している他のサービスが責任を果たすのに十分なリソースを持っていることを保証します。

NiFiはデータをディスク上に保存しているため、使用するボリュームの種類も考慮する必要があります。Kubernetesで実行する場合、ノードが失われた場合、ノードが別のホストに移動されてもデータが失われないようにすることが重要です。そのため、Persistent SSDボリュームにデータを保存しています。GKEは、ある程度までは大容量ボリュームの方がより良いスループットを提供します。そこで、最高のパフォーマンスを確保するために、コンテンツリポジトリに1 TBボリュームを使用します(書き込みの場合は400 MB /秒、読み取りの場合は1,200 MB /秒)。FlowFileのリポジトリと Provenanceのリポジトリには 130 GB を使用しています。これらのボリュームは、同じアベイラビリティゾーンでビルトインの冗長性を提供しているため、良好なパフォーマンスと信頼性を手頃な価格帯で実現しています(現在は1GBあたり$0.17/月)。

パフォーマンス

NiFiが所定の時間内に処理できるデータ量は、ハードウェアに大きく依存しますが、設定されているデータフローにも大きく依存します。このフローでは、どのようなデータレートが達成されるのかを判断するために、いくつかの異なるサイズのクラスターで実行することにしました。結果を以下に示します。

データレートを本当に理解し、異なるクラスターサイズ間でレートを比較するためには、フローのどの時点で統計を観察したいのか、どの統計が最も関連性があるのかを検討する必要があります。フローの最後の方を見て、どのくらいの量のデータが流れたのかを確認することもできますが、既にフィルタリングで除外されたデータ(最終的にはWARNおよびERRORメッセージ以外のすべてのデータ)があるため、これは良い表現ではありません。GCSからデータをフェッチするフローの先頭付近を見ることもできますが、データの一部は圧縮され、一部は圧縮されていないので、どのぐらいのデータが処理されているのかを理解するのは難しいです。

もっと考えておくと便利なのは、「ログのフィルター、JSONへの変換」プロセッサー[プロセッサー6]への入力です。このプロセッサーが処理したデータ量は、クラスターが処理できたデータの総量を示します。さらに、このプロセッサのステータス履歴を見ることができます。これにより、処理している1秒あたりのレコード数がわかります。これらのメトリクスはどちらも重要なので、データレートを分析する際には両方のメトリクスを考慮します。

これらのメトリクスを見ると、いくつかの異なるサイズのクラスターで、NiFiがこのデータフロー上でどのようなパフォーマンスを発揮するかを見ることができます。では単一ノードを見てみましょう。

ここでは、単一ノードが56.41 GBの着信データを処理したことがわかります。これは5分間のタイムウィンドウで処理されています。この数字を300秒で割ると、0.18803 GB /秒、約192.5 MB /秒になります。ステータス履歴を見ると、1秒あたりのレコード(ログメッセージ)の数がわかります。

ここでは、単一ノードが平均して5分間あたり283,727,739レコード、または1秒あたり946,000レコード少々を処理したことがわかります。これは、1秒間に100万件のイベントを処理したことになります。単一ノードはショボいものではありません!

しかし、単一ノードが十分ではなく、より多くのノードにスケールアウトする必要がある場合はどうでしょうか。理想的にはノードを増やすことで、線形にスケーリングできることがわかります。単一ノードクラスターの代わりに5ノードクラスターを使用すると、次のような統計が得られます。

受信データレートは5分あたり264.42 GB(0.8814 GB /秒)になりました。1秒あたりのレコード数では、5分間で平均約14億9,300万レコード、つまり1秒あたり約497万レコードになります。

これをさらにスケールアウトすると、25ノードのクラスターで達成可能なパフォーマンスを観察することができます。

受信データレートは、5分間で1.71 TB、つまり5.8 GB /秒という驚異的なデータレートを示しています。1秒あたりのレコード数では次のようになります。

これは、5分あたり78億2,000万レコードを超えるデータレート、つまり1秒あたり2,600万件(または1日あたり2.25兆)のイベントが発生していることを示しています。25ノードのクラスターの場合、これはノードあたり1秒あたり100万レコード強に相当します。

洞察力のある読者は、ステータス履歴を見ると、時間の経過とともに読み取りレコードの数が大きく変化していることに気づくかもしれません。これはデータのばらつきによって最もよく説明できます。エラーが非常に少ないファイルを処理している時は、1秒あたりのレコード数が非常に大きくなります。スタックトレースを含むメッセージを処理している場合(これははるかに大きく、より多くの処理が必要です)、1秒あたりのレコード数は少なくなります。これは、これらの統計値と書き込まれたレコードの統計値と比較しても明らかです。

ここでは、読み取りレコード数が減少すると、書き込みレコード数が増加し、その逆も然りです。このため、統計を観察する際には、小さなメッセージと大きなメッセージの両方の処理を含む時間帯のみを考慮するようにしています。これを実現するために、読み取られたレコードの数が高いポイントと低いポイントに達する時間帯を選択します。次に、この時間帯の平均読み取りレコード数を考慮します。

毎秒2,600万件のイベントでは、ほとんどの組織が必要なデータレートに簡単に達しています。しかし、まだ到達していない組織にとっては、より大きなクラスターにした場合、NiFiは線形的にスケールし続けるのでしょうか?

それを確かめるために、クラスターを25ノードから100ノードに、そして150ノードへと増やしました。150ノードのクラスターで得られた結果を以下に示します。

ここでは、NiFiは5分間に9.56 TB(424億メッセージ)、または32.6 GB /秒(1億4130万イベント/秒)という驚異的な速度でデータを処理しています。これは、1日あたり2.75 PB(12.2兆イベント)に相当します。すべてのデータに発生した、すべてのイベントを追跡して表示する詳細な来歴情報を備えています。データがいつ、どこで受信されたのか、それがいつ、どこで何が他の場所に送られたのかを正確に知ることができます。

次の表は、比較のために、達成されたデータレートをまとめたものです。

Google Kubernetes Engineで上記で説明したフローを実行してキャプチャされたデータレートとイベントレート。各ノードは32コア、15 GB のRAM、2 GBのヒープがあります。コンテンツリポジトリは1 TBのPersistent SSD(書き込み400 MB /秒、読み取り1200 MB /秒)です。

スケーラビリティ

システムのパフォーマンス特性を理解することは重要ですが、単一ノードではデータレートが高すぎて追いつかないポイントがあります。その結果、複数のノードにスケールアウトする必要があります。つまり、システムがどの程度スケールアウトできるかを理解することも重要です。

前のセクションで、NiFiは少なくとも150ノードまで線形的にスケールアウトできることを見ましたが、その限界はどこにあるのでしょうか?250、500、1000ノードまでスケールできるのでしょうか。これらのノードが前述の32コアマシンよりもはるかに小さい場合はどうでしょうか?ここでは、答えを探ることにしました。

NiFiがどの程度スケールできるかどうかを調べるために、異なるサイズの仮想マシンで大規模なクラスターを作成してみました。すべてのケースで、15 GBのRAMを持つVMを使用しました。また、コンテンツリポジトリには130 GB、FlowFileのリポジトリには10 GB、Provenanceのリポジトリには20 GBのボリュームを使用し、前述の試行よりもはるかに小さいディスクを使用しました。小さいディスクサイズではIOPSの数とMB /秒が制限されるため、I/Oスループットが大幅に低下することを意味します。このように、同じノード数のクラスターでは、前述のセクションよりもはるかに小さいスループットが得られると予想されます。

4コア仮想マシン

最初に、NiFiが4コアのみの非常に小さなVMを使用してどのようにパフォーマンスを発揮するかを確認するためにスケールアウトを試みました。各 VM は NiFi だけでなく、Kubernetes DNS サービスや他の Kubernetes コアサービスもホストしなければならないので、NiFi コンテナを 2.5 コアに制限しなければなりませんでした。

150ノードのクラスタはそれなりにうまく機能しましたが、UIはかなりの遅延を示していました。500ノードにスケールすると、ほとんどのWebリクエストが完了するまでに5秒以上かかるなど、ユーザーエクスペリエンスが著しく低下しました。750ノードにスケールしようとすると、ノードがクラスタからドロップアウトし始め、クラスターが不安定になりました。NiFi のシステム診断ページによると、クラスターコーディネータは 1 分間の負荷平均が 30 以上で、利用可能なコアは 2.5 個しかありませんでした。これは、CPUが処理能力の約12倍以上の処理を求められていたことを意味します。この構成(1 VM あたり 4 コア)では、750 ノードのクラスタでは不十分と判断されました。

6コア仮想マシン

次に、6コアの仮想マシンのクラスターをスケールアウトしてみました。今回はコンテナを2.5コアではなく4.5コアに制限することができました。これにより、かなり良い結果が得られました。500ノードのクラスターでは若干の停滞が見られましたが、ほとんどのWebリクエストは3秒以内に完了しました。

750ノードにスケールアウトしても、UIの応答性という点ではほとんど違いはありませんでした。次に、1,000ノードのクラスタを試してみました。

実際、今回は6コアのVMを使用して1,000ノードまでスケールアウトすることができました。クラスタは安定していましたが、もちろん、このような小さなVMと限られたディスク容量では、パフォーマンスは各ノードで毎秒100万イベントの範囲内ではありませんでした。むしろ、各ノードで40,000~50,000イベント/秒の範囲内のパフォーマンスでした

この設定では、ほとんどのリクエストが2~3秒の範囲でかかっているため、UIはまだ少し遅くなっていました。

コアが少ないため、フローを実行するためにNiFiを提供するスレッドの数も減らしました。6コアのVMで1分間の負荷平均は2から4までの範囲で、ノードはあまり過負荷になっていないことがわかります。

この程度までスケールアウトしても線形的にスケールするかどうかは疑問が残りました。そこで次にこの点について調べてみました。

12コア仮想マシン

12コアの仮想マシンを使用して1,000ノードにスケールアウトすることでNiFiのスケーラビリティの調査を終了します。性能が線形的にスケールしているかどうかを判断するために、250ノード、500ノード、および1,000ノードの性能メトリクスを収集しました。ここでも、これらのノードのコア数は前の例の約3分の1に過ぎず、ディスクの速度もかなり遅いため、ここでのパフォーマンスは大規模なVMのパフォーマンスとは比較にならないはずです。

250ノードでは、これらのVMで処理されたイベントの数は約4500万イベント/秒(ノードあたり180,000イベント/秒)でした

500 ノードでは、イベント処理数が約 9,000 万イベント/秒(ノードあたり 180,000 イベント/秒)であることがわかりました。

これは、32コアシステムから見たパフォーマンスの約20%です。これは、ノードのコア数が1/3であり、コンテンツリポジトリが32コアシステムのスループットの約1/4のスループットを提供していることを考えると、非常に合理的です。これは、NiFiが垂直方向にスケーリングした場合にも、実際にはかなり線形にスケールすることを示しています。

最後に、12コアVMのクラスターを1,000ノードにスケールしました。興味深いことに、これは少し問題がありました。1,000 ノードのクラスタでは、1.5 TB のログデータが非常に高速に処理されるため、正確なパフォーマンス測定を行うのに十分な時間、キューを満杯にしておくのに苦労しました。これを回避するために、フローにDuplicateFlowFileプロセッサを追加しました。これにより、データをすぐに使い切らないようにすることができます。

しかし、これはちょっとしたズルです。データはすでにローカルに存在しているので、96%のデータについてはGCSからフェッチしていないことを意味します。しかし、NiFiはまだすべてのデータを処理しています。その結果、500ノードのクラスタの2倍以上のパフォーマンスが期待できます。

そして確かに、クラスタでは2億5,600万イベント/秒、ノードごとに256,000イベント/秒のオーダーという結果が出ました。

すべてをまとめる

NiFiでは、私たちの哲学は、ポイントAからポイントBへのデータの移動速度だけではなく、新しい機会をつかむために、いかに早く行動を変えるかということにあります。このため、これらのデータフローを構築するために、このようなリッチなユーザー体験を提供するように努めています。実際、このデータフローの構築にかかった時間はわずか15分程度で、いつでもその場で変更することができます。しかし、各ノードで毎秒100万レコードを超えているので、興奮しないわけにはいきません。

NiFiは少なくとも1,000ノードに線形にスケールアウトすることが可能であり、垂直方向のスケーリングも同様に線形であるという事実と組み合わせてください。1秒あたり100万イベントを1,000ノードと乗算します。次に、さらにスケールアウトできる可能性が高く、VMあたり96コアまで確実にスケールアウトできることを考えてみましょう。これは、1つのNiFiクラスターが、毎秒10億イベントを超える速度でこのデータフローを実行できることを意味します。

技術的なソリューションを構築する際には、すべてのツールが予測されるデータ量を処理できることを確認する必要があります。どんな複雑なソリューションでも追加のツールが必要になりますが、この記事では、適切なサイズに調整し、適切に設計されたフローを実行していれば、NiFiがボトルネックになる可能性は低いことを示しています。(しかし、データレートが毎秒10億イベントを超える場合には話し合う必要がありますね!)

補足情報

Clouderaでは、NiFiトレーニングコースの日本語開催を開始します。ハンズオン演習が含まれる3日間のトレーニングで、短期間でNiFiについて一通り学習することができます。内容、日程等の詳細は次のページをご覧ください。ご参加をお待ちしています!

Cloudera DataFlow: Apache NiFi を使ったフロー管理
https://jp.cloudera.com/about/training/courses/dataflow-flow-managment-with-nifi.html

 

E-book: IoTによる デジタル トランスフォーメーション

Cloudera Japan Marketing
この著者の他の記事

コメントする

あなたのメールアドレスは公開されません。また、コメントにリンクを貼ることはできません。