Cheatsheet: Delta Lake での操作

本記事のカバー範囲 (*意気込み)

はじめに

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめる。
*このドキュメントは、随時更新していく予定です。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lake とは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Logについてはこちらをご参照ください。

ktksq.hatenablog.com

既存テーブルをDeltaテーブルに変更する

*既存テーブルがParquetテーブルの場合のみ実行可能。

CONVERT TO DELTA <database_name.table_name>;

テーブルに書き込む

*format=deltaはデフォルト設定

# パスを指定しないで書込む場合: マネージドテーブル
df.write.format("delta").mode("append").saveAsTable("<database_name.table_name>") 

# パスを指定して書込む場合: アンマネージドテーブル
df.write.format("delta").mode("append").save("<path>")

1. パスを指定しないでテーブルに書込む場合:

データファイルとメタデータは/user/hive/warehouse配下に保存される (Hiveメタストアの場合)。
パスを指定しなかった場合マネージド テーブルと呼ばれ、DROP TABLEをした場合には、データファイルを含む全てのファイルが削除される。

例: マネージド テーブルの挙動 (パスを指定しなかった場合)

df_csv.write.format("delta").mode("overwrite").saveAsTable("managed_table")
DESCRIBE EXTENDED managed_table;

マネージドテーブルの格納先

DROP TABLE managed_table;

全てのファイルが削除されていることを確認

2. パスを指定してテーブルに書込む場合:

データファイルとメタデータは指定したパスの配下に保存される。
パスを指定した場合アンマネージド テーブルと呼ばれる。マネージド テーブルとは異なり、アンマネージド テーブルのファイルはテーブルをDROPしても削除されない
アンマネージド テーブルは、メタストア内のテーブルが既存データのスキーマ、パーティショニング、テーブルのプロパティを自動的に継承する機能を利用して、データをメタストアに「インポート」する。

例: アンマネージド テーブルの挙動 (パスを指定した場合)

raw_data_path = "dbfs:/user/ktksq/unmanaged/"
df_parquet.write.format("delta").mode("overwrite").option("path", raw_data_path).saveAsTable("unmanaged_table")
DESCRIBE EXTENDED unmanaged_table;

アンマネージドテーブルの格納先

DROP TABLE unmanaged_table;

全てのファイルが削除されないことを確認

3. マネージドテーブルとアンマネージドテーブルの違い:

マネージド テーブルはメタストアが管理する (パスの指定不可) のに対して、アンマネージド テーブルはメタストアで管理されない (パスの指定必須) 。
そのため両者のテーブルではDROP TABLE時の挙動が異なる。

  • マネージド テーブル: DROP時は全てのファイルが削除される 
  • アンマネージド テーブル: DROP時にファイルは削除されない

テーブルを読み込む*

df = spark.table("<database_name.table_name>")    

# df = spark.read.format("delta").load("<path>")  # パスを指定して読込む場合

値を削除(DELETE)する*

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.delete(f.col("<col_name>") < "<value>")

*DELETEは、最新バージョンのDeltaテーブルからデータを削除するが、古いバージョンが明示的にバキュームされるまで物理ストレージから削除されるわけではないことに注意する。

値を更新(UPDATE)する*

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.update(
  condition = f.col("gender") == "M",
  set = { "gender": lit("Male") }
)

*DELETEとUPDATEは、述語にパーティション・カラムを指定すると大幅に高速化できる。

値をマージ(MERGE)する*

(
  deltaTable.alias("t")
  .merge(
    sourceDF.alias("s"),
    "t.key = s.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

具体例

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")

dfUpdates = deltaTablePeopleUpdates.toDF()

(
    deltaTablePeople.alias("people")
    .merge(dfUpdates.alias("updates"), "people.id = updates.id")
    .whenMatchedUpdate(
        set={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .whenNotMatchedInsert(
        values={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .execute()
)

変更履歴を表示する*

テーブルへの更新ごとに、操作やユーザーなどの実績情報を表示する。テーブルの履歴はデフォルトで30日間保持される。

DESCRIBE HISTORY table_name

テーブルのスキーマを更新する*

デフォルトではテーブルのデータを上書きしてもスキーマは上書きされないため、スキーマを更新したい場合はoverwriteSchemaをtrueにする。

df.write.option("overwriteSchema", "true")

テーブルをコピー (クローン) する*

Cloneコマンドを使用して、既存のDeltaテーブルのコピーを特定のバージョンで作成することができる。
Deep CloneまたはShallow Cloneのいずれに加えられた変更も、クローン自体にのみ影響し、ソーステーブルには影響しない。 そのため、PROD環境からSTAGING環境へデータを取得する場合や、規制上の理由から特定のバージョンをアーカイブする場合によく用いられる。
*クローンされたテーブルは、クローン元テーブルとは独立した更新履歴を持つため、ソーステーブルと同じタイムトラベルはできない。

テーブルのクローンには以下の2種類の方法が存在し、データファイルのコピーを含めるかが挙動の違いになっている:

1. Deep Clone:*

既存のテーブルのメタデータに加えて、データファイルもクローンターゲットにコピーする。CTASコマンド (CREATE TABLE... AS... SELECT...) によるコピーと機能的に近いが、CTASのようにパーティションや制約などの情報を再指定する必要がないため、よりシンプルに指定できる。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone_deep>
  DEEP CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリとデータファイルが存在することを確認する。

Deep Cloneの構造

2. Shallow Clone:*

Shallow Clone (Zero Copy Clone) は、クローンされるテーブルのメタデータのみをコピーし、テーブル自体のデータファイルはコピーしない。データのソースはクローン元のソースに依存する。つまりデータの物理的なコピーが作成されないため、ストレージのコストは最小限に抑えられる。
Shallow Cloneはクローン元テーブルへのポインタとなっているため、VACUUMを実行して元となるファイルが削除されるとShallow Cloneが壊れる可能性がある。そのためShallow Cloneは通常テストや実験などの短時間のユースケースに使用される。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone>
  SHALLOW CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリのみ存在することを確認する。

Shallow Cloneの構造

Shallow Cloneしたテーブルに変更を加えた場合は、_delta_logディレクトリと同じ階層にデータファイルが格納される。

タイムトラベルで過去データを参照する*

1. タイムトラベルの条件

タイムトラベルによって以前のバージョンのデータにアクセスするには、該当のバージョンのデルタログとデータファイルの両方を保持する必要がある。

Deltaテーブルを構成するデータファイルは、自動的に削除されることはない。データファイルは、VACUUMを実行した場合にのみ削除される。
ただしVACUUMを実行しても、デルタログファイルは削除されない。デルタログファイルは、チェックポイントが書き込まれた後に自動的にクリーンアップされる。

デフォルトでは、30日前までのDeltaテーブルにタイムトラベルすることができる。
ただし以下の場合は、30日より以前でもタイムトラベルできない場合があるので注意する。

  • DeltaテーブルでVACUUMを実行する。

  • テーブルプロパティを用いて、データファイルまたはデルタログファイルの保持期間を変更する。

(1)デルタログファイルの保持期間を変更
チェックポイントが書き込まれるたびに、Databricksは保持間隔より古いログエントリを自動的にクリーンアップする。そのため、設定値を十分に大きくした場合には多くのログエントリが保持される。

delta.logRetentionDuration = "interval <interval>": 

*デフォルト設定は30日

(2)データファイルの保持期間を変更(VACUUM対象ファイルの期間指定)
VACUUMコマンドの対象ファイルとなる期間を指定する。
VACUUMを実行しても30日分のタイムトラベルを可能にするには、30日に設定を変更する必要がある。
*ストレージコストがかかるため、コストを考慮しながら設定値を決める。

delta.deletedFileRetentionDuration = "interval <interval>":

*デフォルト設定は7日

2. タイムトラベルの実施

# タイムスタンプで指定
df1 = spark.read.format("delta").option("timestampAsOf", <timestamp_string>).load("<path>")

# バージョンで指定
df2 = spark.read.format("delta").option("versionAsOf", <version>).load("<path>")

@シンタックスで指定する場合:

# タイムスタンプで指定
spark.read.format("delta").load("/tmp/delta/people10m@20190101000000000") 

# バージョンで指定
spark.read.format("delta").load("/tmp/delta/people10m@v123")

ロールバックを行う*

-- タイムスタンプで指定
RESTORE TABLE employee TO TIMESTAMP AS OF "2022-08-02 00:00:00";

-- バージョンで指定
RESTORE TABLE employee TO VERSION AS OF 1;

まとめ

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめた。
本ドキュメントは今後随時拡充していきます。

参考

docs.databricks.com

docs.delta.io