Delta Lake とは何か

はじめに

2022年6月に開催されたDATA+AI SUMMITで、今までDatabricksでしか使えなかった機能を含む全てのDelta Lakeの機能がオープンソース (OSS) 化されるという発表があったのは記憶に新しい。そこで今回は「Delta Lakeとは何か」についてまとめる。

環境情報

Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

www.databricks.com

ちなみに気づいたらDelta Lakeのマスコットがスポンジボブに出てきそうな可愛くないカニになっていたのですが、何があったのでしょうか…

Delta Lakeのマスコット

Delta Lake (デルタレイク) とは

オープンソースのデータフォーマットであるDeltaは、Parquetファイルとトランザクションログ(Delta Log)から成り立っている。Delta形式のフォーマットを用いて構築されたテーブルは、Deltaテーブルと呼ばれ、Deltaテーブルのデータストアを、Delta Lake (デルタレイク) と呼ぶ。多くの場合、データの実体はAWS S3などのオブジェクトストレージ上 (データレイク) に保管されている。

また2022年05月時点で月間ダウンロード数は700万を超えており、ストレージフォーマットとして広く認知されている。

Delta Lakeの月間ダウンロード数
*参照元: Day 1 Opening Keynote - DATA+AI SUMMIT 2022

*データレイクについてはこちらの記事をご参照ください。
データレイクについて理解する

Delta Lake の実体

Delta Lakeの実体は、Parquetで保存されたデータファイルと全てのトランザクションを記録したトランザクションログ (Delta Log) から成り立っている。

Delta Lake の構造

ParquetファイルからDeltaテーブルを作成して、Deltaの構造を確認する。

(1) ParquetファイルからDeltaテーブルを作成する

source_path = 'dbfs:/databricks-datasets/samples/lending_club/parquet/'
df_parquet = spark.read.parquet(source_path)

# マネージドテーブルとして保存: pathの指定なし
df_parquet.write.format("delta").mode("append").saveAsTable("demo_delta_table")

*マネージドテーブルについてはこちらの記事をご参照ください。
Delta テーブルを作成する

(2) 作成したDeltaテーブルの格納先を表示する

%sql
DESCRIBE DETAIL demo_delta_table

→ dbfs:/user/hive/warehouse/...にDeltaテーブルの実体が格納されていることがわかる

Deltaテーブルの格納先のパス

(3) Deltaテーブルに格納されているファイルを確認する

display(dbutils.fs.ls('dbfs:/user/hive/warehouse/delta_demo_ktksq.db/demo_delta_table'))

→ Deltaの実体がParquetで保存されたデータファイル_delta_logディレクトリであることがわかる

Deltaの構造

*_delta_logディレクトリの内部構造についてはこちらの記事をご参照ください。
Delta Logについて理解する

Parquet と Delta の相違点

ここからは、ParquetとDeltaのファイル構造の違いについて説明する。

Parquetとは何か

Parquetとは主にHadoopのエコシステムで使われるカラムナ(列指向)フォーマットのこと。カラムナフォーマットは圧縮性とパフォーマンスに優れているため、分析的なクエリに適している。
・高いデータの利用効率
CSVのような行指向のフォーマットと比べて、不必要なカラムを読まずに済む。加えて、内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っているため、効率的にクエリを実行することができる。
・高い圧縮効率
カラムナフォーマットは、同じ属性のデータが集まる列方向にデータを圧縮することでデータの圧縮効率を高めることができる。

Parquetにおける水平分割の仕組み

Parquetの構造

Parquetの構造を確認すると、_delta_logディレクトリが存在していないことがわかる。その代わりに_started_< id >と_committed_< id >で始まるメタデータファイルがParquetファイルと一緒に保存されている。これらのファイルは、DBIOトランザクションプロトコルによって作成されており、一般的に直接変更することは推奨されていない。
*DBIOトランザクションプロトコル: Databricksが実装した独自のトランザクションコミットのこと。Databricks環境以外でSparkアプリケーションを動作させる場合は、Sparkのデフォルトのコミットプロトコルが用いられる。

Parquetの構造: SparkアプリケーションをDatabricks環境で動作させた場合

Parquet と Delta の違い

Parquet と Delta の大きな違いは、Delta トランザクションログのセントラルレポジトリである_delta_logディレクトリの有無である。トランザクションログ (Delta Log) が存在することで、Delta でのACIDトランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。

Delta Lake が生まれた経緯: データレイクと Delta Lake の違い

データ分析の重要性の高まりとともに、安価なストレージでさまざまな形式の生データを扱うことができるデータレイクが登場した。データベースからデータレイクへの移行はコンピュートとストレージを独立して拡張できるという大きな利点があった。しかしデータベースからデータレイクへ移行することで、データの信頼性や品質が失われ、メタデータ管理の欠如によるデータスワンプ問題が発生した。
*Data Swamp (データスワンプ): メタデータを管理することなく、データをただ貯めることでデータ情報がブラックボックス化してしまっている状態のこと

データレイクのメリット

(1) コンピュートとストレージの分離
(2) 無限のストレージ容量
(3) 安価なストレージコスト
(4) あらゆる種類の生データを保存 (e.g. 非構造データ、構造化データ、ビデオ、オーディオ、テキスト)

データレイクの課題 *Parquetで構築した場合

(1) ACIDトランザクションが担保されていないため、部分的に完了したトランザクションによりデータが破損した状態で残り、複雑なリカバリが必要になる 
(2) データ品質が担保できないため、一貫性がなく使い物にならないデータが作成される
(3) 一貫性/独立性がないため、データの追加とデータの読込み、バッチとストリーミングを同時に実行させることが困難
(4) 多くの小さいサイズのファイルが存在するため、ファイルI/Oに時間がかかる
(5) クラウドストレージのスループットが低い (S3は20~50MB/scoreに対して、ローカルのNVMe SSDは300MB/score)

そのためデータレイクにデータの信頼性と品質、そしてパフォーマンスを付加するために、Delta Lakeが開発された。
Apache Sparkのオリジナルクリエイターによって開発されたDelta Lakeは、オンライン分析処理 (OLAPスタイル) のために、データベースのトランザクションの信頼性データレイクの水平スケーラビリティという両者の長所を組み合わせるように設計されている。

Delta Lake の特徴

Data Lake (データレイク) と比べて、Delta Lake (デルタレイク) には大きく3つのメリットがある。
・信頼性の向上
・データ品質の向上
・パフォーマンスの向上

本セクションではこのメリットを支えるDelta Lakeの機能について説明する。

Delta Lakeの3つのメリット

ACIDトランザクションの担保

Delta Lakeはデータレイク上の全ての操作をトランザクション化することで、データの信頼性整合性を最大限に高めている。トランザクションとは、以下で説明する4つの特性 (ACID特性) を備えた不可分な一連の処理単位のことを言う。またACID特性を備えたデータベース操作のことをACIDトランザクションと呼んでいる。

Delta Lakeはデータレイク上でのACIDトランザクションを可能にすることで、部分的にしか操作を完了できなかった場合でも、データの一貫性を保てるように設計されている。ACIDトランザクションでない場合は、処理の途中で障害が発生すると、データは一部分しか保存されない可能性がある。
*信頼性: 意図する処理と結果が一貫している特性のこと

Atomicity (原子性):
トランザクションは完全に終了 (COMMIT) するか、もしくは実行前の状態に戻す (ROLLBACK) かのどちらかの状態をとる必要がある。
トランザクション内の各ステートメント (データの読取り、書込み、更新、削除) は1つの実行単位として扱われ、ステートメント全体が実行されるか、あるいは何も実行されないかのどちらかの状態にしかならない。この特性により、たとえばストリーミングデータソースが途中で故障した場合でも、データの損失や破損が発生するのを防ぐことができる。

Consistency (一貫性):
トランザクション処理では実行前後でデータの整合性を持ち、一貫したデータを確保する必要がある。
トランザクションの一貫性により、データの破損やエラーがテーブルの整合性に意図しない結果をもたらすことがないようにしなければならない。

Isolation (独立性):
複数のトランザクションがそれぞれ独立して実行できる必要がある。
つまり複数のユーザが同じテーブルから一度に読み書きする場合、各トランザクションを分離することで、同時進行するトランザクションが互いに干渉したり影響を与えないようにしなければならない。

Durability (耐久性):
正常に実行されたトランザクションによって行われたデータの変更は、システム障害が発生した場合でも確実に保存される必要がある。

スケーラブルなメタデータ管理

Delta Lakeは、Delta Logと呼ばれるメタデータとデータファイルを一緒にデータレイク上に格納することでスケーラブルなメタデータ管理を可能にしている。
Delta Lakeは、他の多くのストレージフォーマットやクエリエンジンとは異なり、すべてのメタデータ処理Sparkを利用してスケールアウトするため、メタデータを効率的に処理することができる。
メタストアにすべてのメタデータが格納されていると、データが頻繁に変化する場合にはメタストアに保存する情報量そのものがビッグデータとなり、スケーラブルなメタデータ処理に課題があった。そのためDelta Lakeでは、メタストアに基本的なメタデータの詳細を残しつつ、重要な詳細のほとんどをDelta LogにParquet形式で保存している。

*Delta Logについてはこちらの記事をご参照ください。
Delta Logについて理解する

バッチとストリーミングワークロードの統合

Delta Lakeのテーブルは、バッチ処理ストリーミングのソースおよびシンクの両方で動作する機能を備えており、readStreamwriteStreamを通して、Spark Structured Streamingと密接に統合されている。
Delta Lakeは、以下のようなストリーミング時の課題を解決する:

低レイテンシーのインジェストで生成された小さなファイルの統合
複数ストリーム (または同時実行のバッチジョブ) による "Exactly Once"処理の実現
ファイルをストリームのソースとして使用する際に、新規作成ファイルを効率的に検出

タイムトラベル (バージョン管理)

タイムトラベルとは、Delta Lakeがデータレイクに保存されているデータのバージョン管理を自動的に行うことで、簡単に以前のバージョンのデータにアクセスし、元に戻すことができる機能である。
Deltaトランザクションログ (Delta Log) が、データに加えられたすべての変更に関する詳細を記録し、変更の完全な監査証跡を保持していることで、タイムトラベルを可能にしている。タイムトラベルは、ETLパイプラインやデータ品質の問題のトラブルシュート、または偶発的に発生したデータパイプラインの修正や機械学習時のモデルや実験の再現性を担保するのに用いられている。

*タイムトラベルで過去データを参照する方法についてはこちらの記事をご参照ください。
タイムトラベルで過去データを参照する

CONSTRAINT句のサポート

データの品質と整合性を自動的に検証するために、Delta Lakeは標準SQLのCONSTRAINT句をサポートしている。もし制約に違反した場合は、InvariantViolationExceptionをスローし、新しいデータを追加することはできない。
サポートしているCONSTRAINT句は以下の2種類である。

・NOT NULL制約: 特定の列の値にNULL値が入力されるのを禁止する
・CHECK制約: 条件を指定し、条件を満たさないデータの追加を禁止する

DML (データ操作言語) のフルサポート

多くの分散処理フレームワークでは、データレイクのアトミックなデータ変更操作をサポートしていない。
一方で、Delta LakeはMERGEUPDATEDELETE操作をサポートし、チェンジデータキャプチャ (CDC)、スローチェンジディメンション (SCD)、ストリーミングアップサートを含む複雑なユースケースを可能にしている。

*DML操作についてはこちらの記事をご参照ください。
Delta LakeでDML操作を行う

UPDATE

UPDATEオペレーションを使用することで、フィルタリング条件 (述語) にマッチする行を選択的に更新することができる。具体的には、述語に一致する各ファイルをメモリに読込み、関連する行を更新し、その結果を新しいデータファイルに書き出す。またDelta Lakeはデータスキッピングを用いてこのプロセスを高速化している。

DELETE

DELETEオペレーションを使用することで、フィルタリング条件 (述語) にマッチする行を選択的に削除することができる。 DELETEはUPDATEと同じように動作し、以下の2ステップでデータを削除している。
(1) 最初のスキャンで、述語の条件に一致する行を含むデータファイルを識別して抽出する
(2) 2回目のスキャンで、一致するデータファイルをメモリに読込み、関連する行を削除し、新しいデータファイルに書込む

MERGE

MERGEオペレーションを使用することで、UPDATEとINSERTを組合せた「UPSERT (アップサート) 」を実行することができる。 UPSERTの動作を理解するために、既存テーブル (ターゲットテーブル) と 新しいレコードと既存のレコードの更新が混在しているソーステーブルの例で考える。
(1) ソーステーブルのレコードが、ターゲットテーブルの既存レコードと一致する場合、該当のレコードを更新する
(2) 一致するレコードがない場合、新しいレコードを挿入する

Delta Lakeは2ステップでデータをMERGEしている。
(1) ターゲットテーブルとソーステーブルの間で内部結合 (Inner join) を行い、マッチするファイルをすべて選択する
(2) ターゲットテーブルとソーステーブルの選択されたファイル間で外部結合 (Outer join) を実行し、更新/削除/挿入されたデータを新しいデータファイルに書込む

柔軟なスキーマ管理

Delta Lakeでは、状況に応じで柔軟にスキーマ管理を行うことができる。
スキーマ エンフォースメントを用いることで、ユーザがミスやゴミデータで意図せずテーブルを汚してしまうことを防ぐことができる。一方で意図したスキーマの変更を行いたい場合は、スキーマ エボリューションを用いることで、自動的に追加することができる。

1. スキーマ エンフォースメント

スキーマエンフォースメント (スキーマ強制) とは、スキーマバリデーションとも呼ばれ、既存テーブルのスキーマと一致しないデータの挿入を自動的に防止することで、データの品質を確保することができる。
Delta Lakeは、書込み時にスキーマ検証を行う。つまり、あるテーブルへの新しい書込みはすべて、書込み時にターゲットテーブルのスキーマと互換性があるかどうかチェックされる。スキーマに互換性がない場合は、Delta Lakeはトランザクションを完全にキャンセルし、ユーザに不一致を知らせるために例外を発生させる (データは書き込まれない) 。
スキーマエンフォースメントは、メダリオンアーキテクチャのゴールドテーブルのように強い型付けが必要な場合に使用され、データ品質を担保している。

Delta Lakeは以下のルールに従い、テーブルへの書込みが互換性があるかどうかを判断する。
ターゲットテーブルのスキーマに存在しない追加のカラムを含むことはできない。
ターゲットテーブルのカラムのデータ型と異なるデータ型を含むことはできない。
大文字と小文字が異なるだけの列名を含むことはできない。(e.g. 同じテーブルに 'Foo' と 'foo' のようなカラムを定義することはできない)

2. スキーマ エボリューション

スキーマエボリューション (スキーマ拡張) とは、時間と共に変化するデータに対応するために、ユーザが簡単にテーブルの現在のスキーマを変更できるようにする機能である。一般的に、追加や上書きを行う際に使用され、1つまたは複数の新しいカラムを含めるためにスキーマを自動的に適応させる。

上述したように、デフォルトではDelta Lakeは不正なスキーマを自動的に防止する。
一方で必要に応じて、テーブルスキーマを拡張させる場合には以下の2通りの方法がある。
(1) .write または .writeStreamコマンドに .option('mergeSchema', 'true') を追加する
(2) Sparkの設定に spark.databricks.delta.schema.autoMerge = True を追加する (Sparkセッション全体に対して有効化)

*スキーマ エボリューションの方法についてはこちらの記事をご参照ください。
スキーマを更新する

ストレージレイヤーの最適化

Delta Lakeは、ストレージ内のデータのレイアウトを最適化することで、クエリパフォーマンスを向上させている。例えば、下図のようにデータの大きさに偏りがあったり、サイズの小さいファイルが多く存在する場合は、クエリパフォーマンスが低下する。そのためDelta Lakeには、ストレージレイヤーを最適化するためのさまざまな機能が存在している。

ストレージレイヤー最適化によるクエリパフォーマンスの向上

1. Data Compaction (Bin-packing)

Bin-packing (ビンパッキング) と呼ばれるデータのコンパクト化をすることで、多くの小さなファイルをより少ない大きなファイルにまとめ、ファイル数を削減している。 通常この処理にはコストがかかるため、ピーク時以外に実行するか、メインパイプラインとは別のクラスタで実行して、メインジョブの不要な遅延を回避するのを推奨している。また本処理はべき等であるため、新しいデータが到着していなければ、より頻繁に実行しても追加計算が発生することはない。
OPTIMIZEコマンドを実行することで、データをコンパクト化することができる。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "events")
# deltaTable = DeltaTable.forPath(spark, "/data/events") # pathを指定

deltaTable.optimize().executeCompaction()
# deltaTable.optimize().where("date='2021-11-18'").executeCompaction() # where句で条件を指定

OPTIMIZE処理は、テーブル、ファイルパス、またはwhere節を指定することでテーブルのより小さなサブセットに対して実行することができる。 デフォルトのターゲット・ファイルサイズは1GBとなっている。optimize.minFileSizeoptimize.maxFileSizeの2つのパラメータにバイト単位で値を指定することにより、デフォルトのファイルサイズを変更することができる。 またOPTIMIZEコマンドを実行するために並列実行されるスレッドの最大数のデフォルト値は、optimize.maxThreadsを調整することによって増やすことができる。

2. Auto-Optimize

Auto-Optimizeは、Deltaテーブルへの書込み時自動的にファイルサイズの最適化を行うオプションである。
Auto-Optimizeは、以下のシナリオで特に推奨されている。
数分オーダーのレイテンシーを許容するストリーミングの場合
MERGE INTOをDelta Lake への書込み方法として使用する場合
CREATE TABLE AS SELECTまたはINSERT INTOを用いる場合

Auto-Optimizeは、最適化された書込みと自動コンパクションという2つの機能から構成されている。

(1) 最適化された書込み
実際のデータに基づいてApache Sparkのパーティションサイズを動的に最適化し、各テーブルパーティションに対して128MBのファイルを書き出す。
*データセットの特性によって多少のサイズ変動が存在する。

(2) 自動コンパクション
データの書込みの後、ファイルをさらにコンパクト化できるかどうかをチェックし、小さなファイルが最も多いパーティションに対して、ファイルをさらにコンパクト化するためにOPTIMIZEジョブを実行する。
*標準のOPTIMIZEで使用する1GBファイルサイズではなく128MBファイルサイズで実行されることに注意する。

・新規テーブルに適用する場合

set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

・既存テーブルに適用する場合

ALTER TABLE [table_name | delta.`<table-path>`] SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)

・Spark Configに設定する場合

spark.databricks.delta.optimizeWrite.enabled
spark.databricks.delta.autoCompact.enabled

3. Data skipping (データスキッピング)

各カラムの最小値と最大値のようなファイルレベルの統計値をデータを書き込む際に自動的に取集することで、不要なファイルI/Oを回避し高速なクエリを提供することができる。Delta Lakeは、ファイルレベルの統計情報をDelta Logに一元化し、クエリ時にフィルタ値が各ファイルの最小値以下または最大値以上の物理ファイルを完全にスキップできるようになった。そのため実際に開いて読込む必要のあるファイルの総数を大幅に削減することができる。
データスキッピング自体はユーザ側で設定する必要はなく、デフォルトで有効になっている。ただし効果はデータのレイアウトに依存するため、Z-Orderを一緒に適用することを推奨している。

文字列やバイナリのような長い値を含む列の統計情報を収集することは、高価な処理となるため避ける必要がある。テーブル プロパティとしてdelta.dataSkippingNumIndexedColsを設定すると、不必要な列の統計情報が収集されるのを防ぐことができる。このプロパティは、テーブルのスキーマにおける列の位置インデックスを示しているため、delta.dataSkippingNumIndexedColsより小さい位置インデックスを持つ列は統計が収集される。つまり長い値を含む列の統計情報の収集を避けるには2種類の方法が使用できる。
*ネストされた列内の各フィールドは、個別の列として見なされる。

  1. テーブルのスキーマで長い値の列がこのインデックスの後になるようにdelta.dataSkippingNumIndexedColsプロパティを設定する
  2. ALTER TABLE ALTER COLUMN を使用して長い値を含む列をdelta.dataSkippingNumIndexedColsプロパティを超えるインデックス位置に移動させる

データスキッピングの仕組み (イメージ)
*参照元: Processing Petabytes of Data in Seconds with Databricks Delta - The Databricks Blog

*データスキッピングに使用される統計情報についてはこちらの記事をご参照ください。
メタデータについて理解する

4. Z-Order

Z-Order (多次元クラスタリング) は、同じファイルセットの中に関連する情報をまとめて配置するファイルレイアウト技術で、データスキッピングと共に使用することで読込みデータ量が大幅に削減される。 Z-Orderに使用するカラムには、WHERE句に頻繁に登場する、またはクエリのJOIN条件で使用される高カーディナリティ (多数の異なる値を持つ) のフィールドを指定する。一般的には、ユーザID、IPアドレス、プロダクトID、電子メールアドレス、電話番号などの識別子フィールドを指定する場合が多い。ファイルレイアウトでは、データがクラスタ化され、最小値と最大値の範囲が狭く、(理想的には)重複していない場合、データスキッピングの効果を最大化することができる。

ZORDER BYにカラムを指定すると、Z-Orderによるファイルレイアウトの最適化が実施される。Z-Orderにはカンマ区切りのリストとして複数のカラムを指定することができるが、カラムが増えるごとに局所性の効果は低下する (カラム数は3~4個が適当)。データスキッピングには列の統計情報が必要なため、統計が収集されていない列に対するZ-Orderは効果がない。

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

Z-Orderにかかる時間は、複数回実行しても減少することは保証されない。一方で前回の実行以降に新規データが追加されていなければ、ノーオペレーションとなる。 Z-Orderは、タプル数に関して均等にバランスの取れたデータファイルを作成することを目的としているが、ディスク上のデータサイズに関しては必ずしも均等にはならない。

Z-Orderを適用したファイルレイアウト (イメージ)

Z-Orderによるクエリの高速化を理解するために、各パーツファイルを読込んで、X = 2 またはY = 3を検索する場合を考える。その結果、Z-Orderによるファイルレイアウトの方が効率的にデータの読込みを行なっていることがわかる。

Z-Orderによるクエリの高速化の仕組み (イメージ)
*参照元: Lakehouse with Delta Lake Deep Dive

5. Delta Cache (デルタキャッシュ)

クラウドストレージのデータコピーをワーカーノードのSSDディスク上のローカルに作成することで、データの読込みを高速化する。 データはリモートロケーションからファイルを取得する必要があるたびに、自動的にキャッシュされる。Parquet / Delta形式がサポートされている。同じデータを連続して読込む場合はローカルで実行されるため、読込み速度が大幅に向上する。

ディスクキャッシュには、リモートデータのローカルコピーが含まれ、さまざまなクエリのパフォーマンスを向上させることができる。一方で任意のサブクエリの結果を保存することはできない。そのため任意のサブクエリデータの結果や、Parquet以外の形式 (CSV、JSON、ORCなど) で保存されたデータを保存する場合は、Sparkキャッシュを用いる。またSparkキャッシュとは異なり、ディスクキャッシュはシステムメモリを使用しない。

ディスクキャッシュを使用する場合は、クラスタを構成するときにSSDボリュームを持つワーカータイプを選択する必要がある。
ディスクキャッシュ (Delta Cache) を有効化/無効化するには、以下を実行する。

spark.conf.set("spark.databricks.io.cache.enabled", "[true | false]")

ディスク使用量は、以下のプロパティを設定することで変更できる。
spark.databricks.io.cache.maxDiskUsage: キャッシュデータ用に確保されたノードごとのディスク容量 (bytes)
spark.databricks.io.cache.maxMetaDataCache: キャッシュされたメタデータのために確保されたノードごとのディスク容量 (bytes)
spark.databricks.io.cache.compression.enabled: キャッシュされたデータを圧縮形式で保存するかどうか

設定例

spark.databricks.io.cache.maxDiskUsage 50g
spark.databricks.io.cache.maxMetaDataCache 1g
spark.databricks.io.cache.compression.enabled false

Delta Lake を実現する仕組み

アトミック操作を実現する仕組み

Delta Lakeがアトミック操作を実現するための仕組みをこちらにまとめました。
記事が3万字を超えたので、別記事に切り分けましたが、
Delta Lakeを理解する上で重要なパートなので、ぜひ読んでみてください。

ktksq.hatenablog.com

ACIDトランザクションを実現する仕組み

Delta Lakeは楽観的同時実行制御 (Optimistic Concurrency Control) と多版同時実行制御 (MVCC: MultiVersion Concurrency Control) を組合わせてACIDトランザクションを実現している。

楽観的同時実行制御

楽観的同時実行制御とは、データに対してのロックは行わずに,更新対象のデータが他のトランザクションとコンフリクトがなかったことを確認してからコミットを行う方法である。具体的には、トランザクションがコミットされる時点で、対象のコミット操作の結果が変更されるようなデータへの変更が行われていないことを確認することで、データの整合性を保証している。

多版同時実行制御

多版同時実行制御(MVCC: MultiVersion Concurrency Control)によって、同時実行される2つのトランザクションのうち,先発のトランザクションがデータを更新し,コミットする前に,後発のトランザクションが同じデータを参照すると,更新前の値を返すことを可能にしている。そのため、複数のユーザから同時にデータベースの更新要求を受け取った場合でも、同時並行性と一貫性の両方を保証できる。多版同時実行制御では、更新中のデータに対する参照要求に対してトランザクション開始時点のデータのコピー (スナップショット) を提供する。また参照中のデータに対する更新要求に対しても、トランザクション開始前のデータのコピーを提供する。これにより、読込みのロックと書込みのロックがコンフリクトすることがなくなり、トランザクションの同時並行性が向上する。

コンフリクトが発生した場合の対応

2つ以上のコミットが同時に行われた場合は、以下の相互排他のルールに従って解決する。このプロトコルにより、Delta LakeはACIDの原則である分離を実現し、複数の同時書込み後のテーブルの状態が、各書込みが互いに分離して連続的に行われた場合と同じ結果となることを保証する。つまりトランザクション分離レベルがSERIALIZABLE (直列化可能) であることを保証している。
*デフォルトの分離レベルはWriteSerializable

(1) テーブルの読込み:
テーブルの最新バージョンを参照し、変更すべきファイルを特定
(2) 書込み:
新しいデータファイルを書込むことによって、すべての変更を段階的に実施
(3) 検証→コミット:
変更に対して同時にコミットされた他の変更とのコンフリクトの有無を確認
コンフリクトがない場合→すべての段階的な変更がコミットされ、書込み操作は成功
コンフリクトがある場合→同時変更例外が発生して書込み操作が失敗

コンフリクト時の挙動を理解するために、2人のユーザによって同時にコミットが行われた場合を考える。

Delta Lakeは、変更を加える前に読み込んだテーブルの開始バージョン (バージョン0) を記録する。 ユーザ1と2が同時にテーブルにデータを追加する場合、どちらか1人のユーザだけが000001.jsonのコミットを成功させることができる。ここではユーザ1のコミットが通り、ユーザ2のコミットは拒否されている。
ユーザー2のコミットが拒否された後 (エラーは発生しない) 、コミットしようとしている操作の結果を変更するようなデータへの変更が行われたかどうかを確認する。コンフリクトしない場合は、新しく更新されたテーブルに対してユーザ2のコミットを再試行し000002.jsonとして正常にコミットする。

楽観的にコンフリクトを解決する

まとめ

今回はDelta Lakeについてまとめた。

また今回記事を作成するにあたり、参考にしたテキストは無料でダウンロードできますので、ぜひダウンロードしてみてください。

www.databricks.com

参考

Cheatsheet: Delta Lake での操作

本記事のカバー範囲 (*意気込み)

はじめに

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめる。
*このドキュメントは、随時更新していく予定です。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lake とは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Logについてはこちらをご参照ください。

ktksq.hatenablog.com

既存テーブルをDeltaテーブルに変更する

*既存テーブルがParquetテーブルの場合のみ実行可能。

CONVERT TO DELTA <database_name.table_name>;

テーブルに書き込む

*format=deltaはデフォルト設定

# パスを指定しないで書込む場合: マネージドテーブル
df.write.format("delta").mode("append").saveAsTable("<database_name.table_name>") 

# パスを指定して書込む場合: アンマネージドテーブル
df.write.format("delta").mode("append").save("<path>")

1. パスを指定しないでテーブルに書込む場合:

データファイルとメタデータは/user/hive/warehouse配下に保存される (Hiveメタストアの場合)。
パスを指定しなかった場合マネージド テーブルと呼ばれ、DROP TABLEをした場合には、データファイルを含む全てのファイルが削除される。

例: マネージド テーブルの挙動 (パスを指定しなかった場合)

df_csv.write.format("delta").mode("overwrite").saveAsTable("managed_table")
DESCRIBE EXTENDED managed_table;

マネージドテーブルの格納先

DROP TABLE managed_table;

全てのファイルが削除されていることを確認

2. パスを指定してテーブルに書込む場合:

データファイルとメタデータは指定したパスの配下に保存される。
パスを指定した場合アンマネージド テーブルと呼ばれる。マネージド テーブルとは異なり、アンマネージド テーブルのファイルはテーブルをDROPしても削除されない
アンマネージド テーブルは、メタストア内のテーブルが既存データのスキーマ、パーティショニング、テーブルのプロパティを自動的に継承する機能を利用して、データをメタストアに「インポート」する。

例: アンマネージド テーブルの挙動 (パスを指定した場合)

raw_data_path = "dbfs:/user/ktksq/unmanaged/"
df_parquet.write.format("delta").mode("overwrite").option("path", raw_data_path).saveAsTable("unmanaged_table")
DESCRIBE EXTENDED unmanaged_table;

アンマネージドテーブルの格納先

DROP TABLE unmanaged_table;

全てのファイルが削除されないことを確認

3. マネージドテーブルとアンマネージドテーブルの違い:

マネージド テーブルはメタストアが管理する (パスの指定不可) のに対して、アンマネージド テーブルはメタストアで管理されない (パスの指定必須) 。
そのため両者のテーブルではDROP TABLE時の挙動が異なる。

  • マネージド テーブル: DROP時は全てのファイルが削除される 
  • アンマネージド テーブル: DROP時にファイルは削除されない

テーブルを読み込む*

df = spark.table("<database_name.table_name>")    

# df = spark.read.format("delta").load("<path>")  # パスを指定して読込む場合

値を削除(DELETE)する*

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.delete(f.col("<col_name>") < "<value>")

*DELETEは、最新バージョンのDeltaテーブルからデータを削除するが、古いバージョンが明示的にバキュームされるまで物理ストレージから削除されるわけではないことに注意する。

値を更新(UPDATE)する*

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.update(
  condition = f.col("gender") == "M",
  set = { "gender": lit("Male") }
)

*DELETEとUPDATEは、述語にパーティション・カラムを指定すると大幅に高速化できる。

値をマージ(MERGE)する*

(
  deltaTable.alias("t")
  .merge(
    sourceDF.alias("s"),
    "t.key = s.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

具体例

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")

dfUpdates = deltaTablePeopleUpdates.toDF()

(
    deltaTablePeople.alias("people")
    .merge(dfUpdates.alias("updates"), "people.id = updates.id")
    .whenMatchedUpdate(
        set={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .whenNotMatchedInsert(
        values={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .execute()
)

変更履歴を表示する*

テーブルへの更新ごとに、操作やユーザーなどの実績情報を表示する。テーブルの履歴はデフォルトで30日間保持される。

DESCRIBE HISTORY table_name

テーブルのスキーマを更新する*

デフォルトではテーブルのデータを上書きしてもスキーマは上書きされないため、スキーマを更新したい場合はoverwriteSchemaをtrueにする。

df.write.option("overwriteSchema", "true")

テーブルをコピー (クローン) する*

Cloneコマンドを使用して、既存のDeltaテーブルのコピーを特定のバージョンで作成することができる。
Deep CloneまたはShallow Cloneのいずれに加えられた変更も、クローン自体にのみ影響し、ソーステーブルには影響しない。 そのため、PROD環境からSTAGING環境へデータを取得する場合や、規制上の理由から特定のバージョンをアーカイブする場合によく用いられる。
*クローンされたテーブルは、クローン元テーブルとは独立した更新履歴を持つため、ソーステーブルと同じタイムトラベルはできない。

テーブルのクローンには以下の2種類の方法が存在し、データファイルのコピーを含めるかが挙動の違いになっている:

1. Deep Clone:*

既存のテーブルのメタデータに加えて、データファイルもクローンターゲットにコピーする。CTASコマンド (CREATE TABLE... AS... SELECT...) によるコピーと機能的に近いが、CTASのようにパーティションや制約などの情報を再指定する必要がないため、よりシンプルに指定できる。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone_deep>
  DEEP CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリとデータファイルが存在することを確認する。

Deep Cloneの構造

2. Shallow Clone:*

Shallow Clone (Zero Copy Clone) は、クローンされるテーブルのメタデータのみをコピーし、テーブル自体のデータファイルはコピーしない。データのソースはクローン元のソースに依存する。つまりデータの物理的なコピーが作成されないため、ストレージのコストは最小限に抑えられる。
Shallow Cloneはクローン元テーブルへのポインタとなっているため、VACUUMを実行して元となるファイルが削除されるとShallow Cloneが壊れる可能性がある。そのためShallow Cloneは通常テストや実験などの短時間のユースケースに使用される。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone>
  SHALLOW CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリのみ存在することを確認する。

Shallow Cloneの構造

Shallow Cloneしたテーブルに変更を加えた場合は、_delta_logディレクトリと同じ階層にデータファイルが格納される。

タイムトラベルで過去データを参照する*

1. タイムトラベルの条件

タイムトラベルによって以前のバージョンのデータにアクセスするには、該当のバージョンのデルタログとデータファイルの両方を保持する必要がある。

Deltaテーブルを構成するデータファイルは、自動的に削除されることはない。データファイルは、VACUUMを実行した場合にのみ削除される。
ただしVACUUMを実行しても、デルタログファイルは削除されない。デルタログファイルは、チェックポイントが書き込まれた後に自動的にクリーンアップされる。

デフォルトでは、30日前までのDeltaテーブルにタイムトラベルすることができる。
ただし以下の場合は、30日より以前でもタイムトラベルできない場合があるので注意する。

  • DeltaテーブルでVACUUMを実行する。

  • テーブルプロパティを用いて、データファイルまたはデルタログファイルの保持期間を変更する。

(1)デルタログファイルの保持期間を変更
チェックポイントが書き込まれるたびに、Databricksは保持間隔より古いログエントリを自動的にクリーンアップする。そのため、設定値を十分に大きくした場合には多くのログエントリが保持される。

delta.logRetentionDuration = "interval <interval>": 

*デフォルト設定は30日

(2)データファイルの保持期間を変更(VACUUM対象ファイルの期間指定)
VACUUMコマンドの対象ファイルとなる期間を指定する。
VACUUMを実行しても30日分のタイムトラベルを可能にするには、30日に設定を変更する必要がある。
*ストレージコストがかかるため、コストを考慮しながら設定値を決める。

delta.deletedFileRetentionDuration = "interval <interval>":

*デフォルト設定は7日

2. タイムトラベルの実施

# タイムスタンプで指定
df1 = spark.read.format("delta").option("timestampAsOf", <timestamp_string>).load("<path>")

# バージョンで指定
df2 = spark.read.format("delta").option("versionAsOf", <version>).load("<path>")

@シンタックスで指定する場合:

# タイムスタンプで指定
spark.read.format("delta").load("/tmp/delta/people10m@20190101000000000") 

# バージョンで指定
spark.read.format("delta").load("/tmp/delta/people10m@v123")

ロールバックを行う*

-- タイムスタンプで指定
RESTORE TABLE employee TO TIMESTAMP AS OF "2022-08-02 00:00:00";

-- バージョンで指定
RESTORE TABLE employee TO VERSION AS OF 1;

まとめ

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめた。
本ドキュメントは今後随時拡充していきます。

参考

docs.databricks.com

docs.delta.io

Deep Dive: Delta Log について理解する

はじめに

Deltaは_delta_log ディレクトリとデータファイルであるParquetファイルのみから成り立っている。 _delta_log ディレクトリに格納されているファイルをDelta Logと呼び、この機能がDelta LakeでのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。
そこで今回はDeltaLog (デルタログ)についてまとめる。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lakeとは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

実際のDelta Lakeの操作についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Log とは何か

Delta トランザクションログ (Delta Log) とは、ユーザーがテーブルに加えたすべての変更を順序付きで自動で記録したログであり、Single Source of Truthのソースとして機能する。イメージとしてはGitで管理されたソースコードリポジトリの.gitディレクトリに近い。トランザクションログ (Delta Log) が存在することで、Delta でのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。

Delta Logの実体は、_delta_logディレクトリに格納されているファイルであり、以下で説明するファイルを総称してDelta トランザクションログまたは、Delta Logと呼んでいる。Delta Logには、ログを読み取るために任意のシステムで使用できるオープンプロトコル (Delta Transaction Protocol) が備わっている。

_delta_logディレクトリの構成

_delta_logディレクトリは以下の4種類のファイルから成り立っている。JSONとCRCファイルはテーブルへの変更操作ごとに作成され、1:1に対応している。Checkpoint ファイルは、10回のアトミックコミットごとに自動的に作成される。またLast Checkpoint ファイルには、直近のチェックポイントが格納されている。

_delta_log_ディレクトリの構成

1. JSONトランザクションログ:

Delta テーブルが作成されると、JSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

2. Checkpoint ファイル:

Checkpoint ファイルには、そのバージョンまでのすべてのアクションの完全な再生が含まれ、無効なアクションは削除される。Checkpoint ファイルが存在することで、スナップショットを再構築する際に、ある時点までのログを読む手間を省いたり、過去のJSONトランザクションログの一定期間後の削除を可能にしている。

3. Last Checkpoint ファイル:

Last Checkpoint ファイルには、直近のチェックポイントが格納されており、直近のログへのポインタを提供することで、テーブルの最新スナップショットを作成するコストの削減を可能にしている。

4. CRC (Cyclic Redundancy Check) ファイル:

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

JSONトランザクションログ

Delta テーブルが作成されると、そのテーブルのJSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

*アトミックコミットとは: 一連の個別の変更が単一の処理として実行される処理のこと。アトミックコミットが成功した場合は、すべての変更が適用される。一方でアトミックコミットを完了する前に障害が発生した場合は、すべての変更が取り消される。

*アトミックコミットとして記録される操作についてはこちらをご参照ください。
テーブルへの変更操作一覧

各コミットは00000000000000000000.json (ゼロパディングした20桁の数字) で始まるJSONファイルの中に書き出される。またユーザーがテーブルを変更する操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、アトミックコミット単位で、JSONファイルが生成される。
JSONファイルの数字が大きほど、最新バージョンに近いことを示しており、トランザクションログとテーブルのバージョンは1対1で対応している。

具体例をあげて説明していく。
まず1.parquet と 2.parquet から Delta テーブルを作成する。その後、追加したファイルを削除し、代わりにINSERT操作で新しいファイル (3.parquet) を追加する操作を行なった場合にどのようなJSONトランザクションログが生成されるか確認する。
この場合、テーブルを作成した際に、自動でJSONトランザクションログに000000...00.jsonが追加される。そして次の操作 (DELETE) で、00000...01.jsonが追加され、最後にINSERT操作で00000..02.jsonが追加される。

JSONトランザクションログの記録

JSONトランザクションログのスキーマ

JSONトランザクションログのデータスキーマについて確認する。以下はテーブル作成操作時(000000...00.json)のデータスキーマを表しているが、実施した操作によってJSONのスキーマは変化する。例えばコメント更新のみ行なった場合はcommitInfoとmetaDataのみ格納される。

root
 |-- add: struct (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- size: long (nullable = true)
 |    |-- stats: string (nullable = true)
 |    |-- tags: struct (nullable = true)
 |    |    |-- INSERTION_TIME: string (nullable = true)
 |    |    |-- OPTIMIZE_TARGET_SIZE: string (nullable = true)
 |-- commitInfo: struct (nullable = true)
 |    |-- clusterId: string (nullable = true)
 |    |-- engineInfo: string (nullable = true)
 |    |-- isBlindAppend: boolean (nullable = true)
 |    |-- isolationLevel: string (nullable = true)
 |    |-- notebook: struct (nullable = true)
 |    |    |-- notebookId: string (nullable = true)
 |    |-- operation: string (nullable = true)
 |    |-- operationMetrics: struct (nullable = true)
 |    |    |-- numFiles: string (nullable = true)
 |    |    |-- numOutputBytes: string (nullable = true)
 |    |    |-- numOutputRows: string (nullable = true)
 |    |-- operationParameters: struct (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- isManaged: string (nullable = true)
 |    |    |-- partitionBy: string (nullable = true)
 |    |    |-- properties: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- txnId: string (nullable = true)
 |    |-- userId: string (nullable = true)
 |    |-- userName: string (nullable = true)
 |-- metaData: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)

アトミックコミットを構成するアクション

アトミックコミット (JSONトランザクションログ) を構成するアクションについて理解するため、000000...00.json (テーブル作成操作時のJSONトランザクションログ) に格納されているデータを確認する。

JSONトランザクションログ自体は アトミックコミット単位 = テーブルの変更操作 で記録されている。
一方でユーザーがテーブルの変更操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、Delta Lakeはその操作を以下の1つまたは複数のアクションからなる独立したステップに分割している。JSONトランザクションログの各行はアトミックコミットを構成するアクションである。各アクションは、順序化されながら、アトミックコミットの構成要素としてJSONトランザクションログに記録される。
今回の例ではCREATE TABLEの操作を行い、赤枠で囲われているようなJSONトランザクションログの各行がアクションを示している。

JSONトランザクションログに格納されているデータ

metaData: メタデータの更新

テーブルのメタデータを更新する。(e.g. テーブルの名前、スキーマ、パーティショニングの変更など)

metaDataカラムのスキーマ

Field Name 内容
id テーブルの一意な識別子
name テーブルのユーザー定義による識別子
description テーブルのユーザー定義による説明
format テーブルに格納されるファイルのエンコーディング仕様
schemaString テーブルのスキーマ
partitionColumns パーティションに用いるカラム (配列)
createdTime メタデータアクションが作成された時間
configuration メタデータアクションのコンフィグ (Map Type)

metaDataカラムに格納されているデータ例

df = ( spark.read.json( './_delta_log/00000000000000000000.json' )
      .where(f.col('metaData').isNotNull())
      .select('metaData')
     )
display(df)
{
   "configuration":{
      "delta.autoOptimize.autoCompact":"true",
      "delta.autoOptimize.optimizeWrite":"true"
   },
   "createdTime":1662199084374,
   "format":{
      "provider":"parquet"
   },
   "id":"6beacf7b-dea1-4ecd-bd6e-6a4dfc697d6b",
   "partitionColumns":[
      
   ],
   "schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"loan_amnt\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"funded_amnt\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"term\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_rate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"addr_state\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
}

add: ファイルの追加

JSONトランザクションログにデータファイルを追加する。

addカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
modificationTime ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、ファイルはすでにテーブルに存在するか、追加されたファイルのレコードが同じバージョンの1 つ以上のremoveアクションに含まれる必要がある
stats ファイルのデータに関する統計情報
(e.g. カウント、カラムの最小値/最大値など)
tags ファイルに関するメタデータ (Map Type)

addカラムに格納されているデータ例

{
   "dataChange":true,
   "modificationTime":1662199086000,
   "path":"part-00000-764ea26b-847f-4d8e-ba33-b50410648f11-c000.snappy.parquet",
   "size":892610,
   "stats":"{\"numRecords\":148014,\"minValues\":{\"id\":0,\"loan_amnt\":1000.0,\"funded_amnt\":1000,\"term\":\" 36 months\",\"int_rate\":\" 5.32%\",\"addr_state\":\"AK\"},\"maxValues\":{\"id\":25769817599,\"loan_amnt\":40000.0,\"funded_amnt\":40000,\"term\":\" 60 months\",\"int_rate\":\" 30.99%\",\"addr_state\":\"WY\"},\"nullCount\":{\"id\":0,\"loan_amnt\":1,\"funded_amnt\":1,\"term\":1,\"int_rate\":1,\"addr_state\":1}}",
   "tags":{
      "INSERTION_TIME":"1662199086000000",
      "OPTIMIZE_TARGET_SIZE":"268435456"
   }
}

remove: ファイルの削除

JSONトランザクションログからデータファイルを削除する。

removeカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
deletionTimestamp ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、削除されたファイルのレコードが同じバージョンの1 つ以上のaddアクションに含まれる必要がある
extendedFileMetadata trueの場合は、partitionValues、size、tags フィールドが存在する
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
tags ファイルに関するメタデータ (Map Type)

txn: トランザクションの設定

構造化ストリーミング・ジョブが指定されたIDでマイクロバッチをコミットしたことを記録する。
アプリケーション固有のバージョンを使って進捗を追跡するインクリメンタル処理システム (e.g. ストリーミングシステムなど) では、書き込み時の失敗や再試行に直面してデータの重複を避けるために、進捗状況を記録する必要がある。トランザクション識別子を使うことで、この情報をデルタテーブルのトランザクションログに、テーブルの内容を変更する他のアクションと一緒にアトミックに記録することができる。

txnカラムのスキーマ

Field Name 内容
appId トランザクションを実行するアプリケーションの一意な識別子
version トランザクションのアプリケーション固有の数値識別子
lastUpdated トランザクションアクションが作成された時間 (エポックタイムからのミリ秒単位)

txnカラムに格納されているデータ例

{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

protocol: プロトコルの変更

Delta Logを最新のソフトウェアプロトコルに切り替えることにより、新機能を有効にする。

*プロトコルのバージョニングについてはこちらの記事をご参照ください。
プロトコルのバージョニング

protocolカラムのスキーマ

Field Name 内容
minReaderVersion テーブルを正しく読み取るために、クライアントが実装しなければならないDeltaリードプロトコルの最小バージョン
minWriterVersion テーブルを正しく書き込むために、クライアントが実装しなければならないデルタ書き込みプロトコルの最小バージョン

protocolカラムに格納されているデータ例

{
   "minReaderVersion":1,
   "minWriterVersion":2
}

commitInfo: コミット情報

コミットに関する情報を追加する。(e.g. どの操作が、誰によって、何時に行われたか)

commitInfoカラムのスキーマ

Field Name 内容
operation 実行した操作の種類 (e.g. CREATE TABLE AS SELECT)
operationMetrics ファイル数 (numFiles)、出力された行数 (numOutputRows)、出力バイト数 (numOutputBytes)を記録
operationParameters ファイルの観点から、この操作がテーブル内のデータを追加するか上書きするかを指定
readVersion トランザクションコミットに関連するテーブルのバージョン
clusterID, notebook コミットを実行したDatabricksクラスターとノートブック
userID, userName 操作を実行したユーザーのIDおよび名前

commitInfoカラムに格納されているデータ例

{
   "clusterId":"0730-173143-ores78",
   "engineInfo":"Databricks-Runtime/10.4.x-cpu-ml-scala2.12",
   "isBlindAppend":true,
   "isolationLevel":"WriteSerializable",
   "notebook":{
      "notebookId":"4494197656234952"
   },
   "operation":"CREATE TABLE AS SELECT",
   "operationMetrics":{
      "numFiles":"1",
      "numOutputBytes":"892610",
      "numOutputRows":"148014"
   },
   "operationParameters":{
      "description":null,
      "isManaged":"true",
      "partitionBy":"[]",
      "properties":"{\"delta.autoOptimize.autoCompact\":\"true\",\"delta.autoOptimize.optimizeWrite\":\"true\"}"
   },
   "timestamp":1662199085825,
   "txnId":"5b12b522-1070-4139-b7b2-714dc1e6d701",
   "userId":"2497390366172842",
   "userName":"<username.gmail.com>"
}

Checkpoint ファイル

JSONトランザクションログに合計10回コミットすると、Delta Lakeは_delta_logサブディレクトリにCheckpoint ファイル (チェックポイントファイル) をParquetフォーマットで自動的に保存する。
*10回はデフォルトの回数のため変更可能。

Checkpoint ファイルには、無効なアクションについては削除された上で、ある時点のバージョンまでのすべてのアクションの完全な再生が含まれる。無効なアクションとは、リコンシリエーションルールに従って、 後続のアクションによって取り消されたアクションを指す。 (e.g. 追加したファイルを削除するなど)
Checkpoint ファイルによって、ある時点におけるテーブルの状態全体をSparkが迅速かつ容易に読み込めるようになるため、Sparkは何千もの小さくて非効率なJSONファイルの再処理を回避することができる。つまりSparkはlistFromオペレーションでDelta Log内のすべてのファイルを表示し、最新のCheckpoint ファイルを参照することで、最新のCheckpoint ファイルが保存されてから行われたJSONコミットのみを処理している。

具体例をあげて説明する。
0000013.jsonまでのJSONコミットが存在するDelta テーブルをSparkで読み込む場合を考える。Sparkは中間JSONファイルをすべて処理するのではなく、直近のチェックポイントファイルまでスキップする。そのためSparkは0000011.json、0000012.json、0000013.jsonの増分処理を行うだけで、テーブルの現在の状態を取得することができる。そしてその後Sparkはテーブルのバージョン13をメモリにキャッシュする。このワークフローに従うことで、Delta LakeはSparkを使って効率的にテーブルの状態を常に更新し続けることを可能にしている。

Checkpoint ファイルの役割

Last Checkpoint ファイル

Delta Logには、多くのJSONトランザクションログ (e.g. 10,000+) が含まれるケースも多い。そのような大きなディレクトリをリストアップするのには多くのコストがかかる。Last Checkpoint ファイルは直近のCheckpoint ファイルのポインタを提供することで、テーブルの最新スナップショットを作成するコストを削減することできる。

Last Checkpoint ファイルに格納されているデータ例

%fs
head './_delta_log/_last_checkpoint'

直近のCheckpoint ファイルはバージョン10であることを表している。

{"version":10,"size":12}

CRC (Cyclic Redundancy Check) ファイル

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

CRC (Cyclic Redundancy Check) ファイルのスキーマ

CRC (Cyclic Redundancy Check) ファイルのデータスキーマについて確認する。

root
 |-- histogramOpt: struct (nullable = true)
 |    |-- fileCounts: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- sortedBinBoundaries: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- totalBytes: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- metadata: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- numFiles: long (nullable = true)
 |-- numMetadata: long (nullable = true)
 |-- numProtocol: long (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)
 |-- tableSizeBytes: long (nullable = true)

まとめ

今回はDelta Lakeを理解する上で重要な機能であるDelta Logについてまとめた。

参考