著者: Shashank Naik, Bhagya Gummalla
訳注: 本稿は2019/5/9に公開されたブログ記事の翻訳です。
「スモール・ファイル」はApache Hadoopの世界では一般的な課題であり、注意して扱わないと、いくつかの複雑な問題を引き起こす可能性があります。Apache Hadoop分散ファイルシステム(HDFS)は、テラバイトからペタバイトの範囲にわたる、大きなデータセットを格納して処理するために開発されました。しかしHDFSはスモール・ファイルを効率的に格納しないため、スモール・ファイルによってNameNodeのメモリ使用率とRPC呼び出しの効率が悪くなり、スキャンのスループットが低下し、アプリケーション層のパフォーマンスが低下します。このブログ記事ではスモール・ファイルのストレージの問題とは何かを定義し、複雑さを避けつつ対処する方法を検討します。
「スモール・ファイル」とは何か
スモール・ファイルとは、Apache Hadoopの、HDFSのデフォルトブロックサイズ(CDHではデフォルトで128MB)よりもかなり小さなファイルです。HDFS上にいくつかのスモール・ファイルを置くことは想定されており、それは避けられないことに注意すべきです。スモール・ファイルとはライブラリのjarファイル、XMLの設定ファイル、一時的なステージングファイルのようなファイルです。しかし、スモール・ファイルがデータセットの重要な部分になると問題が起こります。このセクションでは、ファイルサイズをできる限りHDFSブロックサイズの倍数に近づけることが良い目標である理由を説明します。
Hadoopのストレージ層とアプリケーション層は、多数のスモール・ファイルで効率的に機能するようには設計されていません。この意味を理解する前に、HDFSがファイルをどのように保存するかを見てみましょう。
HDFSでは、データとメタデータは別々のエンティティです。ファイルはブロックに分割され、クラスター全体でDataNodeのローカルファイルシステムに格納され、複製されます。HDFSの名前空間ツリーとそれに関連するメタデータは、NameNodeのメモリ(およびディスクにバックアップされる)内のオブジェクトとして管理されています。これらは、経験則としてそれぞれ約150バイトを占めます。この配置についてはドキュメントで詳しく説明しています。
以下の2つのシナリオは、スモール・ファイルの問題を示しています。
シナリオ1(1つの大きなファイル。サイズは192MiB):
シナリオ2(192個のスモール・ファイル。それぞれは1MiB):
シナリオ1では、128MBと64MBのサイズの2ブロックに分割された192MBのファイルが1つあります。複製後、ファイルのメタデータを格納するために必要な合計メモリは、150バイト ×(1ファイル inode+(ブロック数×複製係数))です。
この計算によると、このファイルのメタデータをNameNodeに格納するのに必要な合計メモリは、 150 ×(1 +(2 × 3))= 1050 バイトです。
対照的に、シナリオ2には192個の1 MBのファイルがあります。これらのファイルはクラスタ全体に複製されます。これらのファイルのメタデータを格納するためにNameNodeが必要とする合計メモリは、 150 ×(192 +(192 × 3))= 115200バイトです。
したがって、1つの大きな192MBのファイルではなく複数のスモール・ファイルを格納するためには、NameNodeのヒープ上に100倍以上のメモリが必要であることがわかります。
ストレージ層への影響
NameNodeを再起動した時に、ローカルディスクからファイルシステムのメタデータをメモリにロードする必要があります。つまり、NameNodeのメタデータが大きい場合、再起動が遅くなります。NameNodeは、クラスター上のブロック位置の変更も追跡する必要があります。スモール・ファイルが多すぎると、DataNodeがディスク上のデータ領域を使い果たす前に、NameNodeがメモリー内のメタデータ領域を使い果たすこともあります。DataNodeは、また、ネットワークを介してNameNodeへのブロック変更を報告します。ブロック数が多いほど、ネットワークを介して報告する変更が増えます。
ファイルが多いほど、NameNodeで処理する必要がある読み取り要求が多くなるため、NameNodeの処理能力が低下する可能性があります。これにより、RPCキューと処理待ち時間が増加し、パフォーマンスと応答性が低下します。そうするとリクエスト数は40K〜50K RPC/sに達し、RPCの負荷が全体的に高い状態にあると言えるでしょう。
アプリケーション層への影響
一般的に、Impalaのような分析用SQLエンジンや、MapReduceやSparkのようなアプリケーションフレームワークを介して計算を実行する際、スモール・ファイルが多数あるとディスクのシークが増えます。
MapReduce / Spark
Hadoopでは、計算を実行できる最も細かい単位のデータがブロックです。そのためアプリケーションのスループットに影響します。MapReduceでは、読み込む必要があるブロックごとに個別のMapタスクが生成されます。従って、データが非常に少ないブロックではパフォーマンスが低下し、Application Masterのブックキーピング処理、タスクのスケジューリング、およびタスク作成のオーバーヘッドが増加する可能性(各タスクは独自のJVMプロセスを必要とするため)があります。
この概念はSparkでも似ており、エクゼキュータ内での各「マップ」同等のタスクは一度に1つのパーティションを読み取って処理します。各パーティションは、デフォルトで1つのHDFSブロックです。従って、単一の同時タスクを、SparkのRDD内のすべてのパーティションに対して実行できます。たくさんのスモール・ファイルがある場合、それぞれのファイルは異なるパーティションで読み込まれるため、CPUコアあたりのスループットが低下し、タスクスケジューリングのオーバーヘッドが大幅に増加することを意味します。
MapReduceジョブは、_SUCCESSや_FAILUREなどの0バイトのファイルも作成します。これらのファイルはHDFSのブロックを考慮に入れていませんが、前述のように、それぞれ150バイトを使用するNamenodeヒープ内の inodeエントリーとして登録されています。これらのファイルを消去する簡単で効果的な方法は、下記のHDFSコマンドを使用することです。
hdfs dfs -ls -R <path> | awk ‘$1 !~ /^d/ && $5 == “0” { print $8 }’ | xargs -n100 hdfs dfs –rm
ゴミ箱の保存ポリシーが有効になっていると、上記のコマンドにより、0バイトのファイルは自動的に .Trash ディレクトリ以下(ゴミ箱)に移動されます。
注:アプリケーションが、ジョブがいつ完了または失敗したかを知るのにこれらのファイルに依存している場合、削除するとアプリケーションが失敗する可能性があるため、ワークロードが指定されたパスで実行されている間はこの処理を行わないでください。
Impala — Catalog Server への影響
Impalaは高性能のクエリエンジンであり、高速なメタデータアクセスのために、Catalog ServerにHDFS名前空間の情報をキャッシュします。以下の図は、Impala Catalogがサービス全体で維持、配布する方法を詳細に示したアーキテクチャーです。
NameNodeのメタデータ管理が複雑であったように、ImpalaがCatalog Serverで維持する必要があるメタデータに関しても同様の問題が発生します。カタログのサイズは、Catalog Serverで管理されているオブジェクトの数とサイズの関数です。以下の表に、これらオブジェクトとその推定平均メモリ使用量を示します。
*1.4KB/列/パーティションまで可能
例:それぞれ200個のパーティションのある1000のテーブルで、1つのパーティションにつき10のファイルがある場合、Impalaのカタログサイズは次のようになります(テーブル統計とテーブルの幅を除く)。
#tables * 5KB + #partitions * 2kb + #files * 750B + #file_blocks * 300B = 5MB + 400MB + 1.5GB + 600MB = ~ 2.5GB
Impalaのカタログサイズが大きいほどメモリ使用量も大きくなります。Hive/Impala用のHMS(Hive MetaStore Server)内の大きなメタデータは、より多くのファイルを追跡する必要があるため、以下の理由によりお勧めできません。
- メタデータの読み込み時間が長くなる
- StateStoreのトピックの更新時間が長くなる
- DDLステートメント操作が遅くなる
- クエリ計画の配布時間が長くなる
メタデータに関連する問題に加え、Impalaではデフォルトで各ディスク読み取りがシングルスレッドになっているため、スモール・ファイルはI/Oに大きなオーバーヘッドが生じる可能性があります。さらに、テーブルがParquetファイル形式で格納されている場合、各物理ファイルは2回オープン/クローズする必要があります。つまりフッタの読み取り用に1回と、列データ用にもう1回です。
スモール・ファイルはどこから来るのか
知らないうちにスモール・ファイルが生み出される可能性がある、よくある誤りのいくつかについて説明しましょう。
ストリーミングデータ
データを少しずつ大量に取り込むと、一定期間内に多数のスモール・ファイルが作成される可能性があります。たくさんのデータを作成しない小さなウィンドウ(数分または数時間ごと)を使用するストリーミングデータのニアリアルタイムの要求では、この問題が発生します。以下は、典型的なHDFSへのストリーミングETL取り込みパイプラインです。
多くの数のMapper/Reducer
MapReduceジョブおよび多くのMapperまたはReducerを含むHiveクエリは、Mapper(Map-Onlyジョブの場合)またはReducer(MapReduceジョブの場合)の数に比例して、HDFS上に多数のファイルを生成する可能性があります。HDFSに書き込まれるデータが不十分な(訳注:書き込む量が少ない)多数のReducerはスモール・ファイルを生み出してしまいます。これは、各Reducerがそれぞれ1つのファイルに書き込むためです。同じ方針により、データの偏り(Skew)はほとんどのデータが1つまたは少数のReducerに渡され、他のReducerに書き込むデータがほとんどないため、スモール・ファイルになるという同様の結果をもたらす可能性があります。
過剰な数のパーティションがあるテーブル(表)
過剰なパーティションがあるテーブルとは、パーティションごとに少量のデータ(256 MB未満)を持つ、パーティション化されたHiveテーブルです。Hive Metastore Server(HMS)API呼び出しのオーバーヘッドは、テーブルで維持するパーティションの数とともに増加します。その結果パフォーマンスが低下します。このような場合はパーティションの設計を見直し、パーティションの粒度を粗くすることを検討してください。たとえば日単位のパーティションから、月単位のパーティションへと変更します。
過剰な並列化
Sparkジョブでは、書き込みタスクで指定されているパーティションの数に応じて、新しいファイルがパーティションごとに書き込まれます。これはMapReduceフレームワークでの、各Reduceタスクに応じて新しいファイルが作成されるのと似ています。Sparkのパーティションが多いほど多くのファイルが書き込まれます。スモール・ファイルの生成を抑えるためにパーティションの数を制御します。
ファイルフォーマットと圧縮
TextFileフォーマットなどの非効率的なファイルフォーマットを使用してデータを圧縮せずに保存すると、スモール・ファイルの問題がさらに悪化し、さまざまな形でパフォーマンスとスケーラビリティに影響を与えます。
- 非列形式(TextFile、SequenceFile、Avro)で格納されている非常に広いテーブル(多数の列を持つテーブル)からデータを読み取るには、たとえ少しの列しか必要ない場合でも、各レコードをディスクから完全に読み取る必要があります。Parquetのようなカラムナ(列指向)フォーマットは、ディスクから必要な列だけを読むことを可能にします。これはパフォーマンスをかなり改善できます。
- 効率の悪いフォーマット、特に非圧縮フォーマットを使用すると、HDFSの使用量とNameNodeで追跡する必要があるブロックの数が増加します。ファイル群のサイズが小さい場合、これはデータがより多くのファイルに分割され、保存する関連メタデータの量が増えることを意味します。
スモール・ファイルを識別するには
FSImageとfsck
NameNodeはファイルに関連するすべてのメタデータを格納するために、名前空間のイメージ全体をメモリ(RAM)に保持します。fsimageは、NameNodeのローカルネイティブファイルシステムのファイルに格納されているイメージの永続的なレコードです。従って、 fsimageまたはfsckの出力を分析することで、スモール・ファイルのパスを識別できます。
fsimageで利用可能なフィールド
Path, Replication, ModificationTime, AccessTime, PreferredBlockSize, BlocksCount, FileSize, NSQUOTA, DSQUOTA, Permission, UserName, GroupName
fsimageは、MapReduceやSparkのようなアプリケーションフレームワークで処理できます。また、容易なSQLアクセスをするためにHiveテーブルにロードすることもできます。
もう1つの方法はfsckの出力を使用して、解析するためにHiveテーブルにロードすることです。このアプローチにはいくつかの異なる方法があります。 このリンク先の公開プロジェクトでは、PySparkとHiveを使用して実現しています。それぞれのHDFSのパスでのブロックの総数、平均ブロックサイズ、およびファイルサイズの合計を集計して、HiveまたはImpalaでクエリできます。
Cloudera Navigator
Cloudera Navigatorは、監査、リネージ、メタデータ管理、データスチュワードシップ、およびポリシー強制を備えたデータガバナンス製品です。
Navigatorの「検索」タブと「分析」タブを使用して、小さいファイルを簡単に識別できます。左側のパネルにあるHDFS検索フィルタを使用すると、特定のサイズまたはサイズの範囲のファイルをフィルタできます。Cloudera Navigatorの新しいバージョン(2.14.x)には、以下に示すように小さなファイルを識別するための組み込みのDashboardウィジェットさえもあります。
スモール・ファイルに取り組む方法
予防
ストリーミング取り込みのユースケース
前述のようにストリーミングデータを取り込むと、通常スモール・ファイルが作成されます。取り込み速度、ウィンドウ、またはDSstreamのサイズ(Spark)を微調整することは、一部の問題の軽減に役立ちます。しかし、通常ニアリアルタイムの分析要求を満たすためには、中間コンパクションジョブ、複数のランディングディレクトリの維持、およびテーブルのデータのアクティブ/パッシブなバージョンに関して、HDFSの取り込みパイプラインにいくつかアーキテクチャーの変更を導入する必要があります。これについては、 Cloudera Engineeringブログ(英語)で詳しく説明しています。
ニアリアルタイムの分析処理では、データの種類(非構造化 vs 構造化)、追加と更新の頻度、およびデータ使用パターン(ランダム読み取り vs 集約)に基づいて、HBaseとKuduがストレージ層に適しています。
訳注: 現在ではデータの保存先として、HDFS以外の選択肢 (オブジェクトストアやNoSQL)も増えています。
Apache HBaseはGoogleのBigTableを参考に開発されたNoSQLであり、テーブルを複数のワーカーノードに分散して保存します。オンラインでのデータ更新が得意ですが、分析処理はそれほど得意ではありません。古いスライドですが、興味のある方はこちらのスライドをご覧ください。一方Kuduは列指向の分散ストレージエンジンで、HDFSとは異なりデータの削除、更新ができ、列指向により分析処理にも適しています。Kuduにご興味があればこちらのスライドもご覧ください。
バッチ取り込みのユースケース
バッチ取り込みパイプラインの場合、HDFSに到着後にファイルをコンパクト化する、定期的にスケジュールしたコンパクションジョブが適しています。後ほどこの処理に適しているファイルコンパクションツールを紹介します。
過剰な数のパーティションがあるテーブル
各パーティション内のファイルが大きくなるように、大量のデータを持つパーティションの作成を目指します。パーティションの粒度を決定しながら、パーティションごとに格納されるデータの量を検討します。たとえ、それが日単位の代わりに月単位を用いるような粒度の粗いパーティションを持つことを意味するとしても、大きなファイル(Parquetでは~256MB あるいはそれ以上)を持つパーティションを計画します。たとえば、テーブルが存続する期間中、パーティション数を10,000から30,000の範囲内で維持することは、従うべき良いガイドラインです。
データ量が少ない(数百MB)テーブルの場合は、パーティション化しないテーブルの作成を検討してください。複数のパーティションに散在するわずかなバイト数の数千のファイルを処理するよりも、1つのファイルに格納されているすべての(小さな)テーブルのデータをスキャンする方が効率的です。
テーブルにバケットを作成する(生成されるReducerと出力ファイルの数を固定する)ことによっても、小さなファイルの数を減らすこともできます。
Sparkの過剰な並列化
SparkでHDFSにデータを書き込むときは、ディスクに書き込む前にパーティションを再分割 (repartition) または結合 (coalesce) します。これらの関数で指定するパーティション数によって出力ファイルの数が決まります。Sparkジョブの出力を確認し、作成されたファイル数と達成されたスループットを確認することを強くお勧めします。
治療
HDFSファイルコンパクト化ツール
スモール・ファイル群に対する最も明白な解決策は、ファイル群をHDFSの大きなファイルに書き換える、ファイルコンパクト化のジョブを実行することです。これによく使われるツールはFileCrushです。Sparkコンパクションツールなど、他にも公開されているプロジェクトがあります。
Hiveでテーブルを再作成する
パフォーマンスと効率的なストレージのバランスを良くするため、PARQUETファイル形式を使用してテーブルを作成し、データを書き込むときにデータ圧縮が有効になっていることを確認してください。
多数のスモール・ファイルを含む既存のHiveテーブルがある場合、テーブルを書き換える前に以下の設定ができます。
set hive.exec.compress.output=true;
set hive.exec.parallel = true;
set parquet.compression=snappy;
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize = 134217728; --128M
set hive.merge.size.per.task = 268435456; --256M
set hive.optimize.sort.dynamic.partition = true;
set parquet.blocksize= 268435456; --256M
set dfs.block.size=268435456; --256M
注:ここで指定されている平均サイズとParquetブロックのサイズは代表的なものであり、用途やニーズに基づいて変更する必要があります。Hiveの設定プロパティの詳細は Apache Hiveの公式ページをご覧ください。
これを行うには2つの方法があります。
- ターゲット表がパーティション化されておらず、外部表でもなく、バケット化もされていない限り、CREATE TABLE AS SELECT(CTAS)ステートメントを実行してターゲット表を作成できます。
- 直接CTASを行う代わりにこれらの制限に対処するには、CREATE TABLE LIKE(CTL)ステートメントを実行して、ソース表のスキーマをコピーしてターゲット表を作成し、次にINSERT OVERWRITE SELECTステートメントを使用して、ソース表からデータをターゲット表にロードします。
注:静的パーティション名を定義せずにデータを挿入する場合、Hiveでnon-strict動的パーティションモードを有効にする必要があります。以下のプロパティで設定ができます:
hive.exec.dynamic.partition.mode=nonstrict
このコンテキストで動的パーティションが機能するには、パーティション列がSELECTステートメントの最後の列である必要があります。
次の簡単な例を見てください。
create external table target_tbl like source_tbl
stored as parquet
location <hdfs_path>’;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table target_tbl partition (partition_col)
select * from source_tbl;
Impalaでも同様のCTASを実行できますが、クエリが異なるノード上の複数のフラグメントで実行されると、フラグメントごとに1つのファイルを取得します。これを回避するにはImpalaでset num_nodes = 1を使用して、単一ノードでクエリを実行するように制限することができますが、このアプローチは並列性を失うことで挿入が遅くなり、パフォーマンスが低下し、大きなテーブルを書き込む場合にデーモンがメモリー不足になるために推奨されません。
さらに、 mapred.reduce.tasksの設定を使用してReducerの数を直接設定することもできます。作成されるファイルの数は使用されたReducerの数に等しくなります。最適なReducer値の設定は書き込まれるデータの量によって異なります。
まとめ
治療よりも予防が大切です。そのため、アプリケーションの設計を見直し、スモール・ファイルを作成しているユーザーを把握することが重要です。スモール・ファイルの数が妥当であれば許容できるかもしれませんが、その数が多すぎるとクラスターに悪影響を及ぼす可能性があります。最終的には、いらだち、悲しみ、長時間の労働につながります。Happy Cluster, Happy Life!
Shashank NaikはClouderaのシニアソリューションコンサルタントです。
Bhagya GummallaはClouderaのソリューションコンサルタントです。