by Zoltán Borók-Nagy, Ayush Saxena, Tamas Mate, and Simhadri Govindappa
この記事は、2023/04/03に公開された「Open Data Lakehouse powered by Iceberg for all your Data Warehouse needs」の翻訳です。
Cloudera Data Platform (CDP)上でのApache Iceberg一般提供を発表して以来、お客様がIceberg上で分析ワークロードをテストしているのを見るのは、大変嬉しいことです。そんな中で、Cloudera Data Warehousing (CDW)、Cloudera Data Engineering (CDE)、Cloudera Machine Learning (CML)、Cloudera Data Flow (CDF)、Cloudera Stream Processing (CSP) など、CDP の主要なデータサービスが Apache Iceberg テーブルフォーマットとどのように統合されるのか、また、どのように始めるのが最も簡単なのかについての詳細を共有してほしいという要望をいくつか受けています。このブログ記事では、Cloudera が Cloudera Data Warehouse 内で Apache Hive や Apache Impala を含む主要な計算エンジンを Icebergとどのように統合するかを詳しく紹介します。他のデータサービスについては、別途ご紹介できればと思います。
Icebergの基本
Icebergは大規模な分析ワークロードのために設計されたオープンなテーブルフォーマットです。Icebergの紹介で説明されているように、スキーマの進化、隠しパーティション化、パーティションレイアウトの進化、タイムトラベルをサポートしています。テーブルが変更されるたびにIcebergスナップショットが作成されるため、同時実行の問題を解決し、毎回安定したテーブル状態をスキャンすることができます。
Apache Icebergプロジェクトは、Javaライブラリの形で仕様の実装も開発しています。このライブラリは、Impala、Hive、Sparkなどの実行エンジンに統合されています。このブログ記事で紹介したい新しい機能は、Iceberg V2フォーマット(バージョン2)です。Icebergテーブル仕様書に記載の通り、V1フォーマットは大規模な分析データテーブルをサポートすることを目的としていましたが、V2は行レベルの削除と更新を追加することを目的としています。
もう少し詳しく説明すると、Iceberg V1ではテーブルの作成、更新、削除、挿入のサポートが追加されました。テーブルのメタデータはデータファイルの隣にあるメタデータディレクトリに保存され、複数のエンジンが同じテーブルを同時に使用できるようになったのです。
Iceberg V2
Iceberg V2では、データファイルを書き換えることなく行レベルの変更を行うことが可能です。具体的には、削除されたレコードに関する情報をいわゆる「削除ファイル」に格納することで可能になります。私たちは、クエリのパフォーマンスが最も高いとされる位置削除ファイルの使用を選択しました。これらのファイルには、削除されたレコードのファイルパスと位置が格納されています。クエリエンジンはクエリ実行中に、同じスナップショットに属するデータファイルと削除ファイルの両方をスキャンし、それらをマージ、つまり、出力から削除された行を取り除きます。
行の値の更新は、単一のトランザクションでDELETEとINSERTの操作を行うことで可能になります。
テーブルの圧縮(Compaction)は、変更/削除を実際のデータファイルにマージし、読み込みのパフォーマンスを向上させます。テーブルを圧縮するには、CDE Sparkを使用します。
デフォルトでは、HiveとImpalaはIceberg V1テーブルを作成します。V2テーブルを作成するには、テーブルプロパティ「format-version」を「2」に設定する必要があります。既存のIceberg V1テーブルの場合は、テーブルプロパティ「format-version」を「2」に設定するだけでV2テーブルにアップグレードできます。HiveとImpalaは、Icebergの両フォーマットバージョンと互換性があります。つまり、ユーザーは古い V1 テーブルを引き続き使用できます。V2 テーブルには、より多くの機能が追加されているのです。
ユースケース
EU一般データ保護規則 (GDPR) やカリフォルニア州消費者プライバシー法 (CCPA) などの規制を遵守するためには、データベースは顧客の要求に応じて個人データを削除しなければなりません。削除ファイルを使えば、特定の人に属するレコードに簡単にマークを付けることが可能です。そして、定期的な圧縮ジョブによって、削除されたレコードを物理的に消去することができるのです。
もうひとつの簡単なユースケースは、間違ったデータを修正したり、古い値を更新したりするために、既存のレコードを修正する必要がある場合です。
更新と削除の方法
現在のところ、行レベルの修正ができるのはHiveのみです。Impalaは更新されたテーブルを読み込むことができ、Iceberg V2のテーブルにデータをINSERTすることも可能です。
単一の顧客に属するすべてのデータを削除する:
DELETE FROM ice_tbl WHERE user_id = 1234;
特定のレコードのカラム値を更新する:
UPDATE ice_tbl SET col_v = col_v + 1 WHERE id = 4321;
MERGE INTO文を使用して、ステージングテーブルに基づいてIcebergテーブルを更新する:
MERGE INTO customer USING (SELECT * FROM new_customer_stage) sub ON sub.id = customer.id WHEN MATCHED THEN UPDATE SET name = sub.name, state = sub.new_state WHEN NOT MATCHED THEN INSERT VALUES (sub.id, sub.name, sub.state);
Icebergを使わない場合
IcebergテーブルはアトミックなDELETEとUPDATE操作を特徴とし、従来のRDBMSシステムと似ています。しかし、高頻度のトランザクションを扱うようには設計されていないため、OLTPワークロードには適していないことに注意する必要があります。その代わり、Icebergは大規模で頻繁に変更されないデータセットの管理を目的としているのです。
非常に大きなデータセットや頻繁な更新を扱えるソリューションを探しているのであれば、Apache Kuduを使うことをお勧めします。
CDWの基本
Cloudera Data Warehouse (CDW) Data Serviceは、動的にスケールし、独立してアップグレード可能な、高いパフォーマンスを持つ独立したセルフサービスのデータウェアハウスをクラウド上に作成するための、Kubernetesベースのアプリケーションです。CDWは、オープンスタンダード、オープンファイルフォーマット、オープンテーブルフォーマット、標準APIにより、合理化されたアプリケーション開発をサポートします。CDWは、Apache Iceberg、Apache Impala、Apache Hiveを活用し、幅広いカバレッジを提供することで、各ワークロードに最適化された機能セットを実現します。
CDWは、コンピュート (仮想ウェアハウス) とメタデータ (DBカタログ) を独立したKubernetesポッドで実行することで分離しています。Hive LLAPまたはImpala Virtual Warehouses形式のコンピュートは、オンデマンドでプロビジョニングされ、クエリ負荷に基づいてオートスケールされます。アイドル時にはリソースが無効化されるため、クラウドコストを削減し、高い並行性、HA、クエリ隔離により一貫した迅速な結果を提供することが可能です。これにより、データレイク全体のあらゆるエンタープライズデータについて、データ探索、ETL、分析的インサイトの導出が簡素化されます。
また、CDWはマルチテナントを安全かつ管理しやすくすることで、運用を効率化します。CDWでは、仮想ウェアハウスとデータベースカタログを独立してアップグレード可能です。テナントの分離により、CDWは互いに干渉しないワークロードを処理できるため、クラウドコストをコントロールしながら、誰もがレポートのタイムラインを守ることができるのです。
使用方法
以下のセクションでは、Iceberg V2テーブルの作成方法とその操作方法について、いくつかの例を紹介します。データの挿入、スキーマやパーティションレイアウトの変更、行の削除/更新、タイムトラベル、スナップショット管理などを行う方法を見てみましょう。
Hive:
Iceberg V2 テーブルの作成
Hive Iceberg V2 テーブルは、テーブルプロパティで format-version を 2 に指定することで作成できます。
例:
CREATE Iceberg V2 テーブル:
CREATE EXTERNAL TABLE TBL_ICEBERG_PART( ID INT, NAME STRING ) PARTITIONED BY (DEPT STRING) STORED BY ICEBERG STORED AS PARQUET TBLPROPERTIES( 'FORMAT-VERSION' = '2' ) ;
CREATE TABLE AS SELECT (CTAS):
CREATE EXTERNAL TABLE CTAS_ICEBERG_SOURCE STORED BY ICEBERG AS SELECT * FROM TBL_ICEBERG_PART ;
CREATE TABLE LIKE
(他のテーブルをベースとした空のテーブルを作成する)
CREATE EXTERNAL TABLE ICEBERG_CTLT_TARGET LIKE ICEBERG_CTLT_SOURCE STORED BY ICEBERG ;
データの取り込み
Iceberg V2 テーブルへのデータは、通常の Hive テーブルと同様に挿入できます。
例:
INSERT INTO:
INSERT INTO TABLE TBL_ICEBERG_PART VALUES (1,'ONE','MATH'), (2, 'ONE','PHYSICS'), (3,'ONE','CHEMISTRY'), (4,'TWO','MATH'), (5, 'TWO','PHYSICS'), (6,'TWO','CHEMISTRY');
INSERT OVERWRITE:
INSERT OVERWRITE TABLE CTLT_ICEBERG_SOURCE SELECT * FROM TBL_ICEBERG_PART ;
MERGE:
MERGE INTO TBL_ICEBERG_PART USING TBL_ICEBERG_PART_2 ON TBL_ICEBERG_PART.ID = TBL_ICEBERG_PART_2.ID WHEN NOT MATCHED THEN INSERT VALUES( TBL_ICEBERG_PART_2.ID, TBL_ICEBERG_PART_2.NAME, TBL_ICEBERG_PART_2.DEPT ) ;
Querying Iceberg tables
SELECT ステートメント
Hive は、Iceberg V2 テーブルのベクトル化された読み取りと非ベクトル化された読み取りの両方をサポートしています。通常、ベクトル化は次の構成を使用して有効にできます。
- set hive.llap.io.memory.mode=cache;
- set hive.llap.io.enabled=true;
- set hive.vectorized.execution.enabled=true
SELECT COUNT(*) FROM TBL_ICEBERG_PART;
タイムトラベルに関するステートメント
Hive を使用すると、特定のスナップショットバージョンのテーブルデータをクエリできます。
SELECT * FROM TBL_ICEBERG_PART FOR SYSTEM_VERSION AS OF 7521248990126549311 ;
スナップショット管理
Hive では、スナップショット管理に関する次のようないくつかの操作が可能です。
期限切れスナップショット:
ALTER TABLE TBL_ICEBERG_PART EXECUTE EXPIRE_SNAPSHOTS('2021-12-09 05:39:18.689000000');
現状のスナップショットを設定:
ALTER TABLE TBL_ICEBERG_PART EXECUTE SET_CURRENT_SNAPSHOT( 7521248990126549311 ) ;
スナップショットをロールバックする:
ALTER TABLE TBL_ICEBERG_PART EXECUTE ROLLBACK( 3088747670581784990 );
Iceberg テーブルの変更
スキーマの進化:
ALTER TABLE … ADD COLUMNS (...); (Add a column) ALTER TABLE … REPLACE COLUMNS (...);(Drop column by using REPLACE COLUMN to remove the old column) ALTER TABLE … CHANGE COLUMN … AFTER …; (Reorder columns)
パーティションの進化:
ALTER TABLE TBL_ICEBERG_PART SET PARTITION SPEC (NAME);
マテリアライズドビュー
マテリアライズドビューの作成:
CREATE MATERIALIZED VIEW MAT_ICEBERG AS SELECT ID, NAME FROM TBL_ICEBERG_PART ;
マテリアライズドビューの再構築:
ALTER MATERIALIZED VIEW MAT_ICEBERG REBUILD;
マテリアライズドビューのクエリ:
SELECT * FROM MAT_ICEBERG;
Impala
Apache Impalaはオープンソースの分散型、大規模並列SQLクエリエンジンであり、バックエンドのエグゼキューターはC++で書かれ、フロントエンド (アナライザー、プランナー) はJavaで書かれています。ImpalaはIceberg Javaライブラリを使い、クエリ分析とプランニングの際にIcebergテーブルの情報を取得します。一方、クエリの実行は高性能なC++エグゼキューターが担当します。つまり、Icebergテーブルに対するクエリは非常に高速なのです。
ImpalaはIcebergテーブル上で以下のステートメントをサポートしています。
テーブルの作成
Iceberg V2 テーブルのCREATE:
CREATE TABLE ice_t( id INT, name STRING, dept STRING ) PARTITIONED BY SPEC( bucket(19, id), dept ) STORED BY ICEBERG TBLPROPERTIES( 'format-version' = '2' ) ;
CREATE TABLE AS SELECT (CTAS):
CREATE TABLE ice_ctas PARTITIONED BY SPEC(truncate(1000, id)) STORED BY ICEBERG TBLPROPERTIES( 'format-version' = '2' ) AS SELECT id, int_col, string_col FROM source_table ;
CREATE TABLE LIKE:
(他のテーブルをベースとした空のテーブルを作成する)
CREATE TABLE new_ice_tbl LIKE orig_ice_tbl ;
Iceberg テーブルのクエリ
Impala は、位置削除による V2 テーブルの読み取りをサポートしています。
SELECT ステートメント:
Impala は、他のテーブルでサポートされるすべての種類のクエリを Iceberg テーブルでもサポートします。 例えば、結合、集計、分析クエリなどがすべてサポートされています。
SELECT * FROM ice_t; SELECT count(*) FROM ice_t i LEFT OUTER JOIN other_t b ON ( i.id = other_t.fid ) WHERE i.col = 42 ;
タイムトラベルに関するステートメント:
テーブルの以前のスナップショット (有効期限が切れるまで) をクエリすることができます。
SELECT * FROM ice_t FOR SYSTEM_TIME AS OF '2022-01-04 10:00:00'; SELECT * FROM ice_t FOR SYSTEM_TIME AS OF now() - interval 5 days; SELECT * FROM ice_t FOR SYSTEM_VERSION AS OF 123456;
DESCRIBE HISTORY ステートメントを使用すると、テーブルの以前のスナップショットを確認できます。
DESCRIBE HISTORY ice_t FROM '2022-01-04 10:00:00'; DESCRIBE HISTORY ice_t FROM now() - interval 5 days; DESCRIBE HISTORY ice_t BETWEEN '2022-01-04 10:00:00' AND '2022-01-05 10:00:00';
Iceberg テーブルにデータを挿入する
INSERT ステートメントは、V1 テーブルと V2 テーブルの両方で機能します。
INSERT INTO: INSERT INTO ice_t VALUES (1, 2); INSERT INTO ice_t SELECT col_a, col_b FROM other_t; INSERT OVERWRITE: INSERT OVERWRITE ice_t VALUES (1, 2); INSERT OVERWRITE ice_t SELECT col_a, col_b FROM other_t;
データを Iceberg テーブルにロードする
LOAD DATA INPATH: LOAD DATA INPATH '/tmp/some_db/parquet_files/' INTO TABLE iceberg_tbl;
Iceberg テーブルを変更する
スキーマの進化:
ALTER TABLE ... RENAME TO ... (renames the table) ALTER TABLE ... CHANGE COLUMN ... (change name and type of a column) ALTER TABLE ... ADD COLUMNS ... (adds columns to the end of the table) ALTER TABLE ... DROP COLUMN ...
パーティションの進化:
ALTER TABLE ice_p SET PARTITION SPEC (VOID(i), VOID(d), TRUNCATE(3, s), HOUR(t), i);
スナップショット管理
期限切れの古いスナップショット:
ALTER TABLE ice_tbl EXECUTE expire_snapshots('2022-01-04 10:00:00'); ALTER TABLE ice_tbl EXECUTE expire_snapshots(now() - interval 5 days);
Impala の DELETE ステートメントと UPDATE ステートメントは、今後のリリースで導入される予定です。上で述べたように、Impala は独自の C++ 実装を使用して Iceberg テーブルを処理しています。 これにより、他のエンジンと比較してパフォーマンスが大幅に向上します。
今後の取り組み(2023年4月時点)
Iceberg v2のサポートは先進的で信頼できるものであり、私たちは革新への努力を続けています。CDWのリリースごとに、Iceberg関連の新機能が追加される予定です。ご意見、ご感想がありましたらコメント欄でも受け付けております。
まとめ
Icebergは、新しく登場した非常に興味深いテーブルフォーマットです。毎月新機能が追加され、急速に開発が進んでいます。Cloudera Data Warehouseは最新リリースでIcebergの最新フォーマットバージョンのサポートを追加しました。ユーザーはHiveとImpalaの仮想ウェアハウスを実行し、SQLステートメントを使ってIcebergテーブルとやり取りすることが可能です。これらのエンジンも急速に進化しており、リリースごとに新機能の提供や最適化が施されています。今後リリースされる新機能や技術的な詳細についてもご紹介していきたいと思います。
さらに詳しく知る:
- ウェビナー「データの統合する:AIとアナリティクスをオープンなレイクハウスで」(英語) を見る
- 「データレイクハウスの未来:オープン性」を読む
- ミートアップ「Apache Iceberg:水面下を探る」(英語) を見る
ご興味のある方は、Cloudera Data Warehouse (CDW) の60日間トライアルに申し込むか、CDPのテストドライブにお申し込みください。CDPにおけるApache Icebergについての詳細は、アカウントチームにお知らせいただくか、直接お問い合わせください。