by Srinivas Rishindra Pothireddi
この記事は、2024/2/14 に公開された「Optimization Strategies for Iceberg Tables」の翻訳です。
はじめに
Apache Iceberg は、データレイクにデータウェアハウスのような機能を追加し、構造化および非構造化データの分析を容易にするため、最近人気が高まっています。スキーマの進化、隠れたパーティショニング、タイムトラベルなど、データエンジニアやデータアナリストの生産性を向上させるいくつかのメリットがあります。しかし、Iceberg テーブルを定期的にメンテナンスして健全な状態に保ち、読み取りクエリを高速に実行できるようにする必要があります。このブログでは、Iceberg テーブルで遭遇する可能性のあるいくつかの問題について説明し、それぞれのシナリオで Iceberg テーブルを最適化する方法を提案します。提供されている戦略を組み合わせて、特定のユースケースに適応させることができます。
スナップショットが多すぎる問題
Iceberg テーブルで書き込み操作が発生するたびに、新しいスナップショットが作成されます。このような状態が長期間続くと、テーブルの metadata.json ファイルが肥大化し、データストアに存在する古くて不要な可能性のあるデータ/削除ファイルの数が増え、ストレージコストが増加します。metadata.json ファイルが肥大化すると、大きなメタデータファイルを毎回、読み書きする必要があるため、読み書き時間が長くなる可能性があります。不要になったデータファイルを削除し、テーブルメタデータのサイズを小さく保つために、定期的にスナップショットを失効させることをお勧めします。スナップショットの有効期限切れは、比較的コストのかからない操作で、メタデータを使用して新しくアクセスできなくなったファイルを判断します。
解決策:スナップショットの期限切れ
古いスナップショットは、expire_snapshots を使って失効させることができます。
最適でないマニフェストの問題
時間が経つと、スナップショットは多数のマニフェストファイルを参照する可能性があります。これにより、クエリ計画が遅くなり、メタデータクエリの実行時間が長くなる可能性があります。さらに、最初に作成されたとき、マニフェストはパーティションプルーニングに適していない場合があります。一方、マニフェストがパーティションの離散的な境界にうまく編成されている場合、パーティションプルーニングはデータファイルのサブツリー全体を削除することができます。
解決策:マニフェストを書き換える
rewrite_manifests でマニフェストファイルが多すぎる問題を解決し、データファイルのバランスの取れた階層ツリーを得られる可能性があります。
ファイルの削除に関する問題
背景
merge-on-read 対 copy-on-write
Iceberg V2 以降、既存のデータを削除、更新、またはmergeステートメントによって更新する必要がある場合、copy-on-write と merge-on-read という 2 つのオプションが利用できます。copy-on-write オプションでは、delete、update、merge 操作に対応するデータファイルが読み込まれ、必要な書き込み変更を加えたまったく新しいデータファイルが書き込まれます。Iceberg は古いデータ ファイルを削除しません。そのため、変更が適用される前のテーブルを照会したい場合は、Icebergのタイムトラベル機能を使用することができます。タイムトラベル機能の利用方法については、別ブログにおいて説明します。古いデータファイルが不要になったと判断した場合は、上記で説明したように、古いスナップショットを期限切れにすることで、古いデータファイルを取り除くことができます。
merge-on-read オプションを使用すると、書き込み時にデータファイル全体を書き換える代わりに、単に削除ファイルが書き込まれます。これは等価削除ファイルでも位置削除ファイルでもかまいません。この記事を書いている時点では、Spark は等価削除を書きませんが、読み込むことはできます。このオプションを使用するメリットは、データファイル全体を書き換える必要がないため、書き込みが非常に速くなることです。例えば、GDPR の要件によりテーブル内の特定のユーザーデータを削除したい場合、Iceberg は単純にユーザーデータが存在する対応するデータファイル内のユーザーデータの場所を指定して、削除ファイルを書き込みます。そのため、テーブルを読み込むたびに、Iceberg はこれらの削除を動的に適用し、対応するレコードが物理データファイルに残っていても、ユーザーデータが削除された論理テーブルを表示します。
当社では、デフォルトで merge-on-read オプションを有効にしています。お客様の要件に応じて以下のプロパティを設定することで、有効または無効にすることができます。書き込みプロパティを参照してください。
シリアライズ可能とスナップショット分離
削除、更新、merge 操作に提供されるデフォルトの分離保証はシリアライズ可能な分離です。分離レベルをスナップショット分離に変更することもできます。シリアライズ可能な分離保証とスナップショット分離保証の両方が、データの読み取り一貫性のあるビューを提供します。シリアライズ可能な分離の方がより強力な保証です。例えば、従業員の給与を管理する従業員テーブルがあるとします。ここで、給与が$100,000を超える従業員に対応する全てのレコードを削除したいとします。この給与テーブルには5つのデータファイルがあり、そのうちの3つに$100,000以上の給与を持つ従業員のレコードがあるとします。「delete_mode」が merge-on-read の場合、これらの3つのデータファイル内の削除する位置を指す削除ファイルが書き込まれます。「delete_mode」が copy-on-write の場合、3つのデータファイルは単に書き換えられます。
delete_mode に関係なく、削除処理が行われている間に、別のユーザによって$100,000を超える給与を持つ新しいデータファイルが書き込まれたとします。選択した分離保証がスナップショットであれば、削除操作は成功し、元の3つのデータファイルに対応する給与レコードだけがテーブルから削除されます。削除操作の進行中に新しく書き込まれたデータファイルのレコードはそのまま残ります。一方、分離保証がシリアライズ可能であった場合、削除操作は失敗し、最初から削除をやり直す必要があります。使用するケースによっては、分離レベルを「スナップショット」に下げることをお勧めします。
問題点
削除ファイルが多すぎると、最終的には読み取りパフォーマンスが低下します。これは、Iceberg V2 仕様では、データファイルが読み込まれるたびに、対応するすべての削除ファイルも読み込む必要があるからです。(Iceberg コミュニティは現在、将来的に「delete vector」という概念を導入することを検討しており、現在の仕様とは異なる動作をする可能性があります)。これは非常にコストがかかります。位置削除ファイルには、現在のスナップショットのいずれにも存在しないデータへの参照が含まれていることがあり、それは、未解決の削除が含まれている可能性を持ちます。
解決策:位置削除の書き換え
位置削除ファイルの場合、ファイルをコンパクトにすることで、読み込む必要のある削除ファイルの数を減らし、削除データをより圧縮することでパフォーマンスを向上させることができます。さらに、この手順では未解決の削除も、削除されます。
ファイルの書き換え位置の削除
Iceberg は、Spark SQL の書き換え位置削除プロシージャを提供します。
しかし、削除ファイルが存在すると、パフォーマンス上の問題が生じます。また、規制上の要件により、論理的な削除ではなく、最終的に物理的にデータを削除しなければならない場合もあります。これは、大規模なコンパクションを行い、削除ファイルを完全に削除することで対処できます。
小さいファイルの問題
通常、読み取り中に触れるファイルの数は、最小限にしたいものです。ファイルを開くにはコストがかかります。Parquet のようなファイルフォーマットは、基礎となるファイルサイズが大きい場合にうまく機能します。同じファイルをさらに読み込む方が、新しいファイルを開くよりもコストがかかりません。Parquet では通常、ファイルサイズを512MB程度、行グループのサイズを128MB程度にします。書き込みフェーズでは、それぞれ「write.target-file-size-bytes」と「write.parquet.row-group-size-bytes」で制御します。Iceberg のデフォルトのままにしておくことをお勧めします。
例えば Spark の場合、データがディスクに書き込まれる際に Parquet/ORC で圧縮されるため、メモリ上のSpark タスクのサイズをデフォルトのサイズより大きくする必要があります。そのため、Spark タスクのサイズが十分に大きくない限り、ファイルを望ましいサイズにするのは簡単ではありません。
もう1つの問題はパーティションで発生します。適切に整列されない限り、Spark タスクは複数のパーティションにアクセスする可能性があります。例えば100個の Spark タスクがあり、それぞれが100個のパーティションに書き込む必要があるとします。この問題をパーティション増幅と呼びます。
解決策:書き込み時に distribution-mode を使用
増幅の問題は、書き込みプロパティで適切な書き込み分散モードを設定することで、書き込み時に対処することができます。挿入分散は「write.distribution-mode」で制御され、デフォルトでは none に設定されています。削除の分散は「write.delete.distribution-mode」で制御され、デフォルトでは hash に設定されます。更新の分散は「write.update.distribution-mode」で制御され、デフォルトでは none に設定されます。
この記事を書いている時点で Iceberg で利用可能な書き込み分散モードは、none、hash、rangeの3つです。モードが none の場合、データのシャッフルは行われません。このモードは、パーティション増幅の問題を気にしない場合や、ジョブ内の各タスクが特定のパーティションにしか書き込まないことがわかっている場合にのみ使用します。
モードが hash に設定されている場合、hash コードを生成するためにパーティションキーを使用してデータがシャッフルされ、各結果タスクが特定のパーティションにのみ書き込まれるようになります。分散モードが範囲に設定されている場合、データはパーティションキー、またはテーブルに SortOrder がある場合はソートキーによって並べ替えられるように分散されます。
Hash や範囲を使用すると、テーブルのパーティション数に基づいてデータを再分割することになるため、厄介なことになります。このため、シャッフル後の Spark タスクが小さすぎたり大きすぎたりする可能性があります。この問題は「spark.sql.adaptive.enabled=true」を設定することで、Spark のアダプティブクエリ実行を有効にすることで軽減できます (これは Spark 3.2 からデフォルトで有効)。Spark では、適応型クエリ実行の動作を調整するために、いくつかの設定項目が用意されています。何をしているのかを正確に把握していない限り、デフォルトのままにしておくのがおそらく最善の選択肢です。
パーティション増幅の問題は、ジョブに適した正しい書き込み分散モードを設定することで軽減される可能性がありますが、Spark タスクの書き込み量が少ないため、結果的にファイルが小さくなる可能性があります。ジョブが持っている以上のデータを書き込むことはできません。
解決策:データファイルの書き換え
小さいファイル問題や削除ファイル問題に対して、Iceberg はデータファイルを書き換える機能を提供しています。この機能は現在 Spark でのみ利用可能です。こちらは、ブログの後半で詳しく説明します。この機能を使うことで、データファイルをコンパクトにしたり、あるいは拡張したり、書き換えるデータファイルに対応する削除ファイルから削除を組み込んだり、より多くのデータを読み取り時に直接フィルタリングできるように、より良いデータ順序を提供したりすることができます。Iceberg が提供するツールボックスの中で最も強力なツールのひとつです。
RewriteDataFiles
Iceberg は Spark SQL でデータファイルを書き換えるプロシージャを提供します。
RewriteDatafiles JavaDoc を参照してください。
次に、戦略オプションの意味について説明します。データファイルの書き換えプロシージャをより活用するためには、戦略オプションを理解することが重要です。戦略オプションは3つあり、それは in Pack、Sort、Z Order です。Spark プロシージャを使用する場合は、sort_order を 「zorder(columns…)」に設定するだけで、Z Order ストラテジーが呼び出されることに注意してください。
戦略オプション
– ビンパック
- 一番安価で高速
- 小さすぎるファイルを結合し、ビンパッキングアプローチを使って結合し、出力ファイル数を減らす
- データの順序は変更されない
- データはシャッフルされない
– ソート
- ビンパックよりはるかに高価
- 完全な階層順序を提供
- 読み取りクエリは、クエリで使用されるカラムが順序付けされている場合にのみ恩恵を受ける
- 書き込みの前に、レンジパーティショニングを使用してデータをシャッフルする必要がある
– Z Order
- 3つのオプションの中で最も高価
- 使用されるカラムは、ある種の本質的なクラスタビリティを持つべきで、それでも各パーティションに十分な量のデータが必要である。もしそうであれば、クエリは読み取り時に多くのデータを削除することができる
- Z Order で複数のカラムを使用する場合にのみ意味がある。必要なカラムが1つだけの場合は、通常のソートの方がよい
- Z Order の詳細については、https://blog.cloudera.com/speeding-up-queries-with-z-order/ をご覧ください。
コミットコンフリクト
Iceberg は新しいスナップショットをコミットする際に楽観的同時実行制御を使用します。そのため、書き換えデータファイルを使ってデータを更新すると、新しいスナップショットが作成されます。しかし、そのスナップショットがコミットされる前に、コンフリクトがあるかどうかのチェックが行われます。コンフリクトが発生した場合、すべての作業が破棄される可能性があります。潜在的なコンフリクトを最小限に抑えるようにメンテナンス作業を計画することが重要です。コンフリクトの原因についていくつか説明します。
- 書き換えの開始からコミットの試行までの間に挿入のみが発生した場合、競合は発生しません。これは、挿入によって新しいデータファイルが生成され、新しいデータファイルを書き換えとコミットの再試行用のスナップショットに追加することができるためです。
- すべての削除ファイルは、1つ以上のデータファイルに関連付けられています。将来のスナップショット (B) で、書き換え中のデータファイルに対応する新しい削除ファイルが追加された場合、その削除ファイルはすでに書き換え中のデータファイルを参照しているため、競合が発生します。
紛争緩和
- もし可能であれば、テーブルへの書き込みが可能なジョブをメンテナンス作業中に一時停止してみてください。少なくとも、書き換え中のファイルには削除を書き込まないようにしてください。
- すべての新規書き込みと削除が新しいパーティションに書き込まれるように、テーブルをパーティション分割します。例えば、受信データが日付ごとにパーティショニングされている場合、新しいデータはすべて日付ごとのパーティションに書き込まれます。古い日付のパーティションに対して書き換え操作を実行できます。
- データファイルの書き換え Spark アクションのフィルターオプションを活用して、削除の競合が発生しないように、使用ケースに基づいて書き換えるファイルを最適に選択します。
- パーシャルプログレスを有効にすると、書き換え全体が完了する前にファイルグループをコミットすることで、作業を省力化できます。ファイルグループの1つが失敗しても、他のファイルグループが成功する可能性があります。
結論
Iceberg は最新のデータレイクに必要な機能をいくつか提供しています。少し注意し、計画を立て、Iceberg のアーキテクチャを少し理解すれば、Iceberg が提供する素晴らしい機能を最大限に活用することができます。
また、ウェビナーで Apache Iceberg の詳細をご覧いただき、デモで最新の機能をご確認ください。