Deep Dive: Delta Log について理解する

はじめに

Deltaは_delta_log ディレクトリとデータファイルであるParquetファイルのみから成り立っている。 _delta_log ディレクトリに格納されているファイルをDelta Logと呼び、この機能がDelta LakeでのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。
そこで今回はDeltaLog (デルタログ)についてまとめる。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lakeとは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

実際のDelta Lakeの操作についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Log とは何か

Delta トランザクションログ (Delta Log) とは、ユーザーがテーブルに加えたすべての変更を順序付きで自動で記録したログであり、Single Source of Truthのソースとして機能する。イメージとしてはGitで管理されたソースコードリポジトリの.gitディレクトリに近い。トランザクションログ (Delta Log) が存在することで、Delta でのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。

Delta Logの実体は、_delta_logディレクトリに格納されているファイルであり、以下で説明するファイルを総称してDelta トランザクションログまたは、Delta Logと呼んでいる。Delta Logには、ログを読み取るために任意のシステムで使用できるオープンプロトコル (Delta Transaction Protocol) が備わっている。

_delta_logディレクトリの構成

_delta_logディレクトリは以下の4種類のファイルから成り立っている。JSONとCRCファイルはテーブルへの変更操作ごとに作成され、1:1に対応している。Checkpoint ファイルは、10回のアトミックコミットごとに自動的に作成される。またLast Checkpoint ファイルには、直近のチェックポイントが格納されている。

_delta_log_ディレクトリの構成

1. JSONトランザクションログ:

Delta テーブルが作成されると、JSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

2. Checkpoint ファイル:

Checkpoint ファイルには、そのバージョンまでのすべてのアクションの完全な再生が含まれ、無効なアクションは削除される。Checkpoint ファイルが存在することで、スナップショットを再構築する際に、ある時点までのログを読む手間を省いたり、過去のJSONトランザクションログの一定期間後の削除を可能にしている。

3. Last Checkpoint ファイル:

Last Checkpoint ファイルには、直近のチェックポイントが格納されており、直近のログへのポインタを提供することで、テーブルの最新スナップショットを作成するコストの削減を可能にしている。

4. CRC (Cyclic Redundancy Check) ファイル:

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

JSONトランザクションログ

Delta テーブルが作成されると、そのテーブルのJSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

*アトミックコミットとは: 一連の個別の変更が単一の処理として実行される処理のこと。アトミックコミットが成功した場合は、すべての変更が適用される。一方でアトミックコミットを完了する前に障害が発生した場合は、すべての変更が取り消される。

*アトミックコミットとして記録される操作についてはこちらをご参照ください。
テーブルへの変更操作一覧

各コミットは00000000000000000000.json (ゼロパディングした20桁の数字) で始まるJSONファイルの中に書き出される。またユーザーがテーブルを変更する操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、アトミックコミット単位で、JSONファイルが生成される。
JSONファイルの数字が大きほど、最新バージョンに近いことを示しており、トランザクションログとテーブルのバージョンは1対1で対応している。

具体例をあげて説明していく。
まず1.parquet と 2.parquet から Delta テーブルを作成する。その後、追加したファイルを削除し、代わりにINSERT操作で新しいファイル (3.parquet) を追加する操作を行なった場合にどのようなJSONトランザクションログが生成されるか確認する。
この場合、テーブルを作成した際に、自動でJSONトランザクションログに000000...00.jsonが追加される。そして次の操作 (DELETE) で、00000...01.jsonが追加され、最後にINSERT操作で00000..02.jsonが追加される。

JSONトランザクションログの記録

JSONトランザクションログのスキーマ

JSONトランザクションログのデータスキーマについて確認する。以下はテーブル作成操作時(000000...00.json)のデータスキーマを表しているが、実施した操作によってJSONのスキーマは変化する。例えばコメント更新のみ行なった場合はcommitInfoとmetaDataのみ格納される。

root
 |-- add: struct (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- size: long (nullable = true)
 |    |-- stats: string (nullable = true)
 |    |-- tags: struct (nullable = true)
 |    |    |-- INSERTION_TIME: string (nullable = true)
 |    |    |-- OPTIMIZE_TARGET_SIZE: string (nullable = true)
 |-- commitInfo: struct (nullable = true)
 |    |-- clusterId: string (nullable = true)
 |    |-- engineInfo: string (nullable = true)
 |    |-- isBlindAppend: boolean (nullable = true)
 |    |-- isolationLevel: string (nullable = true)
 |    |-- notebook: struct (nullable = true)
 |    |    |-- notebookId: string (nullable = true)
 |    |-- operation: string (nullable = true)
 |    |-- operationMetrics: struct (nullable = true)
 |    |    |-- numFiles: string (nullable = true)
 |    |    |-- numOutputBytes: string (nullable = true)
 |    |    |-- numOutputRows: string (nullable = true)
 |    |-- operationParameters: struct (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- isManaged: string (nullable = true)
 |    |    |-- partitionBy: string (nullable = true)
 |    |    |-- properties: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- txnId: string (nullable = true)
 |    |-- userId: string (nullable = true)
 |    |-- userName: string (nullable = true)
 |-- metaData: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)

アトミックコミットを構成するアクション

アトミックコミット (JSONトランザクションログ) を構成するアクションについて理解するため、000000...00.json (テーブル作成操作時のJSONトランザクションログ) に格納されているデータを確認する。

JSONトランザクションログ自体は アトミックコミット単位 = テーブルの変更操作 で記録されている。
一方でユーザーがテーブルの変更操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、Delta Lakeはその操作を以下の1つまたは複数のアクションからなる独立したステップに分割している。JSONトランザクションログの各行はアトミックコミットを構成するアクションである。各アクションは、順序化されながら、アトミックコミットの構成要素としてJSONトランザクションログに記録される。
今回の例ではCREATE TABLEの操作を行い、赤枠で囲われているようなJSONトランザクションログの各行がアクションを示している。

JSONトランザクションログに格納されているデータ

metaData: メタデータの更新

テーブルのメタデータを更新する。(e.g. テーブルの名前、スキーマ、パーティショニングの変更など)

metaDataカラムのスキーマ

Field Name 内容
id テーブルの一意な識別子
name テーブルのユーザー定義による識別子
description テーブルのユーザー定義による説明
format テーブルに格納されるファイルのエンコーディング仕様
schemaString テーブルのスキーマ
partitionColumns パーティションに用いるカラム (配列)
createdTime メタデータアクションが作成された時間
configuration メタデータアクションのコンフィグ (Map Type)

metaDataカラムに格納されているデータ例

df = ( spark.read.json( './_delta_log/00000000000000000000.json' )
      .where(f.col('metaData').isNotNull())
      .select('metaData')
     )
display(df)
{
   "configuration":{
      "delta.autoOptimize.autoCompact":"true",
      "delta.autoOptimize.optimizeWrite":"true"
   },
   "createdTime":1662199084374,
   "format":{
      "provider":"parquet"
   },
   "id":"6beacf7b-dea1-4ecd-bd6e-6a4dfc697d6b",
   "partitionColumns":[
      
   ],
   "schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"loan_amnt\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"funded_amnt\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"term\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_rate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"addr_state\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
}

add: ファイルの追加

JSONトランザクションログにデータファイルを追加する。

addカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
modificationTime ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、ファイルはすでにテーブルに存在するか、追加されたファイルのレコードが同じバージョンの1 つ以上のremoveアクションに含まれる必要がある
stats ファイルのデータに関する統計情報
(e.g. カウント、カラムの最小値/最大値など)
tags ファイルに関するメタデータ (Map Type)

addカラムに格納されているデータ例

{
   "dataChange":true,
   "modificationTime":1662199086000,
   "path":"part-00000-764ea26b-847f-4d8e-ba33-b50410648f11-c000.snappy.parquet",
   "size":892610,
   "stats":"{\"numRecords\":148014,\"minValues\":{\"id\":0,\"loan_amnt\":1000.0,\"funded_amnt\":1000,\"term\":\" 36 months\",\"int_rate\":\" 5.32%\",\"addr_state\":\"AK\"},\"maxValues\":{\"id\":25769817599,\"loan_amnt\":40000.0,\"funded_amnt\":40000,\"term\":\" 60 months\",\"int_rate\":\" 30.99%\",\"addr_state\":\"WY\"},\"nullCount\":{\"id\":0,\"loan_amnt\":1,\"funded_amnt\":1,\"term\":1,\"int_rate\":1,\"addr_state\":1}}",
   "tags":{
      "INSERTION_TIME":"1662199086000000",
      "OPTIMIZE_TARGET_SIZE":"268435456"
   }
}

remove: ファイルの削除

JSONトランザクションログからデータファイルを削除する。

removeカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
deletionTimestamp ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、削除されたファイルのレコードが同じバージョンの1 つ以上のaddアクションに含まれる必要がある
extendedFileMetadata trueの場合は、partitionValues、size、tags フィールドが存在する
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
tags ファイルに関するメタデータ (Map Type)

txn: トランザクションの設定

構造化ストリーミング・ジョブが指定されたIDでマイクロバッチをコミットしたことを記録する。
アプリケーション固有のバージョンを使って進捗を追跡するインクリメンタル処理システム (e.g. ストリーミングシステムなど) では、書き込み時の失敗や再試行に直面してデータの重複を避けるために、進捗状況を記録する必要がある。トランザクション識別子を使うことで、この情報をデルタテーブルのトランザクションログに、テーブルの内容を変更する他のアクションと一緒にアトミックに記録することができる。

txnカラムのスキーマ

Field Name 内容
appId トランザクションを実行するアプリケーションの一意な識別子
version トランザクションのアプリケーション固有の数値識別子
lastUpdated トランザクションアクションが作成された時間 (エポックタイムからのミリ秒単位)

txnカラムに格納されているデータ例

{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

protocol: プロトコルの変更

Delta Logを最新のソフトウェアプロトコルに切り替えることにより、新機能を有効にする。

*プロトコルのバージョニングについてはこちらの記事をご参照ください。
プロトコルのバージョニング

protocolカラムのスキーマ

Field Name 内容
minReaderVersion テーブルを正しく読み取るために、クライアントが実装しなければならないDeltaリードプロトコルの最小バージョン
minWriterVersion テーブルを正しく書き込むために、クライアントが実装しなければならないデルタ書き込みプロトコルの最小バージョン

protocolカラムに格納されているデータ例

{
   "minReaderVersion":1,
   "minWriterVersion":2
}

commitInfo: コミット情報

コミットに関する情報を追加する。(e.g. どの操作が、誰によって、何時に行われたか)

commitInfoカラムのスキーマ

Field Name 内容
operation 実行した操作の種類 (e.g. CREATE TABLE AS SELECT)
operationMetrics ファイル数 (numFiles)、出力された行数 (numOutputRows)、出力バイト数 (numOutputBytes)を記録
operationParameters ファイルの観点から、この操作がテーブル内のデータを追加するか上書きするかを指定
readVersion トランザクションコミットに関連するテーブルのバージョン
clusterID, notebook コミットを実行したDatabricksクラスターとノートブック
userID, userName 操作を実行したユーザーのIDおよび名前

commitInfoカラムに格納されているデータ例

{
   "clusterId":"0730-173143-ores78",
   "engineInfo":"Databricks-Runtime/10.4.x-cpu-ml-scala2.12",
   "isBlindAppend":true,
   "isolationLevel":"WriteSerializable",
   "notebook":{
      "notebookId":"4494197656234952"
   },
   "operation":"CREATE TABLE AS SELECT",
   "operationMetrics":{
      "numFiles":"1",
      "numOutputBytes":"892610",
      "numOutputRows":"148014"
   },
   "operationParameters":{
      "description":null,
      "isManaged":"true",
      "partitionBy":"[]",
      "properties":"{\"delta.autoOptimize.autoCompact\":\"true\",\"delta.autoOptimize.optimizeWrite\":\"true\"}"
   },
   "timestamp":1662199085825,
   "txnId":"5b12b522-1070-4139-b7b2-714dc1e6d701",
   "userId":"2497390366172842",
   "userName":"<username.gmail.com>"
}

Checkpoint ファイル

JSONトランザクションログに合計10回コミットすると、Delta Lakeは_delta_logサブディレクトリにCheckpoint ファイル (チェックポイントファイル) をParquetフォーマットで自動的に保存する。
*10回はデフォルトの回数のため変更可能。

Checkpoint ファイルには、無効なアクションについては削除された上で、ある時点のバージョンまでのすべてのアクションの完全な再生が含まれる。無効なアクションとは、リコンシリエーションルールに従って、 後続のアクションによって取り消されたアクションを指す。 (e.g. 追加したファイルを削除するなど)
Checkpoint ファイルによって、ある時点におけるテーブルの状態全体をSparkが迅速かつ容易に読み込めるようになるため、Sparkは何千もの小さくて非効率なJSONファイルの再処理を回避することができる。つまりSparkはlistFromオペレーションでDelta Log内のすべてのファイルを表示し、最新のCheckpoint ファイルを参照することで、最新のCheckpoint ファイルが保存されてから行われたJSONコミットのみを処理している。

具体例をあげて説明する。
0000013.jsonまでのJSONコミットが存在するDelta テーブルをSparkで読み込む場合を考える。Sparkは中間JSONファイルをすべて処理するのではなく、直近のチェックポイントファイルまでスキップする。そのためSparkは0000011.json、0000012.json、0000013.jsonの増分処理を行うだけで、テーブルの現在の状態を取得することができる。そしてその後Sparkはテーブルのバージョン13をメモリにキャッシュする。このワークフローに従うことで、Delta LakeはSparkを使って効率的にテーブルの状態を常に更新し続けることを可能にしている。

Checkpoint ファイルの役割

Last Checkpoint ファイル

Delta Logには、多くのJSONトランザクションログ (e.g. 10,000+) が含まれるケースも多い。そのような大きなディレクトリをリストアップするのには多くのコストがかかる。Last Checkpoint ファイルは直近のCheckpoint ファイルのポインタを提供することで、テーブルの最新スナップショットを作成するコストを削減することできる。

Last Checkpoint ファイルに格納されているデータ例

%fs
head './_delta_log/_last_checkpoint'

直近のCheckpoint ファイルはバージョン10であることを表している。

{"version":10,"size":12}

CRC (Cyclic Redundancy Check) ファイル

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

CRC (Cyclic Redundancy Check) ファイルのスキーマ

CRC (Cyclic Redundancy Check) ファイルのデータスキーマについて確認する。

root
 |-- histogramOpt: struct (nullable = true)
 |    |-- fileCounts: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- sortedBinBoundaries: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- totalBytes: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- metadata: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- numFiles: long (nullable = true)
 |-- numMetadata: long (nullable = true)
 |-- numProtocol: long (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)
 |-- tableSizeBytes: long (nullable = true)

まとめ

今回はDelta Lakeを理解する上で重要な機能であるDelta Logについてまとめた。

参考