Delta Lake とは何か

はじめに

2022年6月に開催されたDATA+AI SUMMITで、今までDatabricksでしか使えなかった機能を含む全てのDelta Lakeの機能がオープンソース (OSS) 化されるという発表があったのは記憶に新しい。そこで今回は「Delta Lakeとは何か」についてまとめる。

環境情報

Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

www.databricks.com

ちなみに気づいたらDelta Lakeのマスコットがスポンジボブに出てきそうな可愛くないカニになっていたのですが、何があったのでしょうか…

Delta Lakeのマスコット

Delta Lake (デルタレイク) とは

オープンソースのデータフォーマットであるDeltaは、Parquetファイルとトランザクションログ(Delta Log)から成り立っている。Delta形式のフォーマットを用いて構築されたテーブルは、Deltaテーブルと呼ばれ、Deltaテーブルのデータストアを、Delta Lake (デルタレイク) と呼ぶ。多くの場合、データの実体はAWS S3などのオブジェクトストレージ上 (データレイク) に保管されている。

また2022年05月時点で月間ダウンロード数は700万を超えており、ストレージフォーマットとして広く認知されている。

Delta Lakeの月間ダウンロード数
*参照元: Day 1 Opening Keynote - DATA+AI SUMMIT 2022

*データレイクについてはこちらの記事をご参照ください。
データレイクについて理解する

Delta Lake の実体

Delta Lakeの実体は、Parquetで保存されたデータファイルと全てのトランザクションを記録したトランザクションログ (Delta Log) から成り立っている。

Delta Lake の構造

ParquetファイルからDeltaテーブルを作成して、Deltaの構造を確認する。

(1) ParquetファイルからDeltaテーブルを作成する

source_path = 'dbfs:/databricks-datasets/samples/lending_club/parquet/'
df_parquet = spark.read.parquet(source_path)

# マネージドテーブルとして保存: pathの指定なし
df_parquet.write.format("delta").mode("append").saveAsTable("demo_delta_table")

*マネージドテーブルについてはこちらの記事をご参照ください。
Delta テーブルを作成する

(2) 作成したDeltaテーブルの格納先を表示する

%sql
DESCRIBE DETAIL demo_delta_table

→ dbfs:/user/hive/warehouse/...にDeltaテーブルの実体が格納されていることがわかる

Deltaテーブルの格納先のパス

(3) Deltaテーブルに格納されているファイルを確認する

display(dbutils.fs.ls('dbfs:/user/hive/warehouse/delta_demo_ktksq.db/demo_delta_table'))

→ Deltaの実体がParquetで保存されたデータファイル_delta_logディレクトリであることがわかる

Deltaの構造

*_delta_logディレクトリの内部構造についてはこちらの記事をご参照ください。
Delta Logについて理解する

Parquet と Delta の相違点

ここからは、ParquetとDeltaのファイル構造の違いについて説明する。

Parquetとは何か

Parquetとは主にHadoopのエコシステムで使われるカラムナ(列指向)フォーマットのこと。カラムナフォーマットは圧縮性とパフォーマンスに優れているため、分析的なクエリに適している。
・高いデータの利用効率
CSVのような行指向のフォーマットと比べて、不必要なカラムを読まずに済む。加えて、内部で行方向に一定の単位でメタ情報を保持することによりスキャン範囲を限定できる水平パーティショニングの機能も持っているため、効率的にクエリを実行することができる。
・高い圧縮効率
カラムナフォーマットは、同じ属性のデータが集まる列方向にデータを圧縮することでデータの圧縮効率を高めることができる。

Parquetにおける水平分割の仕組み

Parquetの構造

Parquetの構造を確認すると、_delta_logディレクトリが存在していないことがわかる。その代わりに_started_< id >と_committed_< id >で始まるメタデータファイルがParquetファイルと一緒に保存されている。これらのファイルは、DBIOトランザクションプロトコルによって作成されており、一般的に直接変更することは推奨されていない。
*DBIOトランザクションプロトコル: Databricksが実装した独自のトランザクションコミットのこと。Databricks環境以外でSparkアプリケーションを動作させる場合は、Sparkのデフォルトのコミットプロトコルが用いられる。

Parquetの構造: SparkアプリケーションをDatabricks環境で動作させた場合

Parquet と Delta の違い

Parquet と Delta の大きな違いは、Delta トランザクションログのセントラルレポジトリである_delta_logディレクトリの有無である。トランザクションログ (Delta Log) が存在することで、Delta でのACIDトランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。

Delta Lake が生まれた経緯: データレイクと Delta Lake の違い

データ分析の重要性の高まりとともに、安価なストレージでさまざまな形式の生データを扱うことができるデータレイクが登場した。データベースからデータレイクへの移行はコンピュートとストレージを独立して拡張できるという大きな利点があった。しかしデータベースからデータレイクへ移行することで、データの信頼性や品質が失われ、メタデータ管理の欠如によるデータスワンプ問題が発生した。
*Data Swamp (データスワンプ): メタデータを管理することなく、データをただ貯めることでデータ情報がブラックボックス化してしまっている状態のこと

データレイクのメリット

(1) コンピュートとストレージの分離
(2) 無限のストレージ容量
(3) 安価なストレージコスト
(4) あらゆる種類の生データを保存 (e.g. 非構造データ、構造化データ、ビデオ、オーディオ、テキスト)

データレイクの課題 *Parquetで構築した場合

(1) ACIDトランザクションが担保されていないため、部分的に完了したトランザクションによりデータが破損した状態で残り、複雑なリカバリが必要になる 
(2) データ品質が担保できないため、一貫性がなく使い物にならないデータが作成される
(3) 一貫性/独立性がないため、データの追加とデータの読込み、バッチとストリーミングを同時に実行させることが困難
(4) 多くの小さいサイズのファイルが存在するため、ファイルI/Oに時間がかかる
(5) クラウドストレージのスループットが低い (S3は20~50MB/scoreに対して、ローカルのNVMe SSDは300MB/score)

そのためデータレイクにデータの信頼性と品質、そしてパフォーマンスを付加するために、Delta Lakeが開発された。
Apache Sparkのオリジナルクリエイターによって開発されたDelta Lakeは、オンライン分析処理 (OLAPスタイル) のために、データベースのトランザクションの信頼性データレイクの水平スケーラビリティという両者の長所を組み合わせるように設計されている。

Delta Lake の特徴

Data Lake (データレイク) と比べて、Delta Lake (デルタレイク) には大きく3つのメリットがある。
・信頼性の向上
・データ品質の向上
・パフォーマンスの向上

本セクションではこのメリットを支えるDelta Lakeの機能について説明する。

Delta Lakeの3つのメリット

ACIDトランザクションの担保

Delta Lakeはデータレイク上の全ての操作をトランザクション化することで、データの信頼性整合性を最大限に高めている。トランザクションとは、以下で説明する4つの特性 (ACID特性) を備えた不可分な一連の処理単位のことを言う。またACID特性を備えたデータベース操作のことをACIDトランザクションと呼んでいる。

Delta Lakeはデータレイク上でのACIDトランザクションを可能にすることで、部分的にしか操作を完了できなかった場合でも、データの一貫性を保てるように設計されている。ACIDトランザクションでない場合は、処理の途中で障害が発生すると、データは一部分しか保存されない可能性がある。
*信頼性: 意図する処理と結果が一貫している特性のこと

Atomicity (原子性):
トランザクションは完全に終了 (COMMIT) するか、もしくは実行前の状態に戻す (ROLLBACK) かのどちらかの状態をとる必要がある。
トランザクション内の各ステートメント (データの読取り、書込み、更新、削除) は1つの実行単位として扱われ、ステートメント全体が実行されるか、あるいは何も実行されないかのどちらかの状態にしかならない。この特性により、たとえばストリーミングデータソースが途中で故障した場合でも、データの損失や破損が発生するのを防ぐことができる。

Consistency (一貫性):
トランザクション処理では実行前後でデータの整合性を持ち、一貫したデータを確保する必要がある。
トランザクションの一貫性により、データの破損やエラーがテーブルの整合性に意図しない結果をもたらすことがないようにしなければならない。

Isolation (独立性):
複数のトランザクションがそれぞれ独立して実行できる必要がある。
つまり複数のユーザが同じテーブルから一度に読み書きする場合、各トランザクションを分離することで、同時進行するトランザクションが互いに干渉したり影響を与えないようにしなければならない。

Durability (耐久性):
正常に実行されたトランザクションによって行われたデータの変更は、システム障害が発生した場合でも確実に保存される必要がある。

スケーラブルなメタデータ管理

Delta Lakeは、Delta Logと呼ばれるメタデータとデータファイルを一緒にデータレイク上に格納することでスケーラブルなメタデータ管理を可能にしている。
Delta Lakeは、他の多くのストレージフォーマットやクエリエンジンとは異なり、すべてのメタデータ処理Sparkを利用してスケールアウトするため、メタデータを効率的に処理することができる。
メタストアにすべてのメタデータが格納されていると、データが頻繁に変化する場合にはメタストアに保存する情報量そのものがビッグデータとなり、スケーラブルなメタデータ処理に課題があった。そのためDelta Lakeでは、メタストアに基本的なメタデータの詳細を残しつつ、重要な詳細のほとんどをDelta LogにParquet形式で保存している。

*Delta Logについてはこちらの記事をご参照ください。
Delta Logについて理解する

バッチとストリーミングワークロードの統合

Delta Lakeのテーブルは、バッチ処理ストリーミングのソースおよびシンクの両方で動作する機能を備えており、readStreamwriteStreamを通して、Spark Structured Streamingと密接に統合されている。
Delta Lakeは、以下のようなストリーミング時の課題を解決する:

低レイテンシーのインジェストで生成された小さなファイルの統合
複数ストリーム (または同時実行のバッチジョブ) による "Exactly Once"処理の実現
ファイルをストリームのソースとして使用する際に、新規作成ファイルを効率的に検出

タイムトラベル (バージョン管理)

タイムトラベルとは、Delta Lakeがデータレイクに保存されているデータのバージョン管理を自動的に行うことで、簡単に以前のバージョンのデータにアクセスし、元に戻すことができる機能である。
Deltaトランザクションログ (Delta Log) が、データに加えられたすべての変更に関する詳細を記録し、変更の完全な監査証跡を保持していることで、タイムトラベルを可能にしている。タイムトラベルは、ETLパイプラインやデータ品質の問題のトラブルシュート、または偶発的に発生したデータパイプラインの修正や機械学習時のモデルや実験の再現性を担保するのに用いられている。

*タイムトラベルで過去データを参照する方法についてはこちらの記事をご参照ください。
タイムトラベルで過去データを参照する

CONSTRAINT句のサポート

データの品質と整合性を自動的に検証するために、Delta Lakeは標準SQLのCONSTRAINT句をサポートしている。もし制約に違反した場合は、InvariantViolationExceptionをスローし、新しいデータを追加することはできない。
サポートしているCONSTRAINT句は以下の2種類である。

・NOT NULL制約: 特定の列の値にNULL値が入力されるのを禁止する
・CHECK制約: 条件を指定し、条件を満たさないデータの追加を禁止する

DML (データ操作言語) のフルサポート

多くの分散処理フレームワークでは、データレイクのアトミックなデータ変更操作をサポートしていない。
一方で、Delta LakeはMERGEUPDATEDELETE操作をサポートし、チェンジデータキャプチャ (CDC)、スローチェンジディメンション (SCD)、ストリーミングアップサートを含む複雑なユースケースを可能にしている。

*DML操作についてはこちらの記事をご参照ください。
Delta LakeでDML操作を行う

UPDATE

UPDATEオペレーションを使用することで、フィルタリング条件 (述語) にマッチする行を選択的に更新することができる。具体的には、述語に一致する各ファイルをメモリに読込み、関連する行を更新し、その結果を新しいデータファイルに書き出す。またDelta Lakeはデータスキッピングを用いてこのプロセスを高速化している。

DELETE

DELETEオペレーションを使用することで、フィルタリング条件 (述語) にマッチする行を選択的に削除することができる。 DELETEはUPDATEと同じように動作し、以下の2ステップでデータを削除している。
(1) 最初のスキャンで、述語の条件に一致する行を含むデータファイルを識別して抽出する
(2) 2回目のスキャンで、一致するデータファイルをメモリに読込み、関連する行を削除し、新しいデータファイルに書込む

MERGE

MERGEオペレーションを使用することで、UPDATEとINSERTを組合せた「UPSERT (アップサート) 」を実行することができる。 UPSERTの動作を理解するために、既存テーブル (ターゲットテーブル) と 新しいレコードと既存のレコードの更新が混在しているソーステーブルの例で考える。
(1) ソーステーブルのレコードが、ターゲットテーブルの既存レコードと一致する場合、該当のレコードを更新する
(2) 一致するレコードがない場合、新しいレコードを挿入する

Delta Lakeは2ステップでデータをMERGEしている。
(1) ターゲットテーブルとソーステーブルの間で内部結合 (Inner join) を行い、マッチするファイルをすべて選択する
(2) ターゲットテーブルとソーステーブルの選択されたファイル間で外部結合 (Outer join) を実行し、更新/削除/挿入されたデータを新しいデータファイルに書込む

柔軟なスキーマ管理

Delta Lakeでは、状況に応じで柔軟にスキーマ管理を行うことができる。
スキーマ エンフォースメントを用いることで、ユーザがミスやゴミデータで意図せずテーブルを汚してしまうことを防ぐことができる。一方で意図したスキーマの変更を行いたい場合は、スキーマ エボリューションを用いることで、自動的に追加することができる。

1. スキーマ エンフォースメント

スキーマエンフォースメント (スキーマ強制) とは、スキーマバリデーションとも呼ばれ、既存テーブルのスキーマと一致しないデータの挿入を自動的に防止することで、データの品質を確保することができる。
Delta Lakeは、書込み時にスキーマ検証を行う。つまり、あるテーブルへの新しい書込みはすべて、書込み時にターゲットテーブルのスキーマと互換性があるかどうかチェックされる。スキーマに互換性がない場合は、Delta Lakeはトランザクションを完全にキャンセルし、ユーザに不一致を知らせるために例外を発生させる (データは書き込まれない) 。
スキーマエンフォースメントは、メダリオンアーキテクチャのゴールドテーブルのように強い型付けが必要な場合に使用され、データ品質を担保している。

Delta Lakeは以下のルールに従い、テーブルへの書込みが互換性があるかどうかを判断する。
ターゲットテーブルのスキーマに存在しない追加のカラムを含むことはできない。
ターゲットテーブルのカラムのデータ型と異なるデータ型を含むことはできない。
大文字と小文字が異なるだけの列名を含むことはできない。(e.g. 同じテーブルに 'Foo' と 'foo' のようなカラムを定義することはできない)

2. スキーマ エボリューション

スキーマエボリューション (スキーマ拡張) とは、時間と共に変化するデータに対応するために、ユーザが簡単にテーブルの現在のスキーマを変更できるようにする機能である。一般的に、追加や上書きを行う際に使用され、1つまたは複数の新しいカラムを含めるためにスキーマを自動的に適応させる。

上述したように、デフォルトではDelta Lakeは不正なスキーマを自動的に防止する。
一方で必要に応じて、テーブルスキーマを拡張させる場合には以下の2通りの方法がある。
(1) .write または .writeStreamコマンドに .option('mergeSchema', 'true') を追加する
(2) Sparkの設定に spark.databricks.delta.schema.autoMerge = True を追加する (Sparkセッション全体に対して有効化)

*スキーマ エボリューションの方法についてはこちらの記事をご参照ください。
スキーマを更新する

ストレージレイヤーの最適化

Delta Lakeは、ストレージ内のデータのレイアウトを最適化することで、クエリパフォーマンスを向上させている。例えば、下図のようにデータの大きさに偏りがあったり、サイズの小さいファイルが多く存在する場合は、クエリパフォーマンスが低下する。そのためDelta Lakeには、ストレージレイヤーを最適化するためのさまざまな機能が存在している。

ストレージレイヤー最適化によるクエリパフォーマンスの向上

1. Data Compaction (Bin-packing)

Bin-packing (ビンパッキング) と呼ばれるデータのコンパクト化をすることで、多くの小さなファイルをより少ない大きなファイルにまとめ、ファイル数を削減している。 通常この処理にはコストがかかるため、ピーク時以外に実行するか、メインパイプラインとは別のクラスタで実行して、メインジョブの不要な遅延を回避するのを推奨している。また本処理はべき等であるため、新しいデータが到着していなければ、より頻繁に実行しても追加計算が発生することはない。
OPTIMIZEコマンドを実行することで、データをコンパクト化することができる。

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "events")
# deltaTable = DeltaTable.forPath(spark, "/data/events") # pathを指定

deltaTable.optimize().executeCompaction()
# deltaTable.optimize().where("date='2021-11-18'").executeCompaction() # where句で条件を指定

OPTIMIZE処理は、テーブル、ファイルパス、またはwhere節を指定することでテーブルのより小さなサブセットに対して実行することができる。 デフォルトのターゲット・ファイルサイズは1GBとなっている。optimize.minFileSizeoptimize.maxFileSizeの2つのパラメータにバイト単位で値を指定することにより、デフォルトのファイルサイズを変更することができる。 またOPTIMIZEコマンドを実行するために並列実行されるスレッドの最大数のデフォルト値は、optimize.maxThreadsを調整することによって増やすことができる。

2. Auto-Optimize

Auto-Optimizeは、Deltaテーブルへの書込み時自動的にファイルサイズの最適化を行うオプションである。
Auto-Optimizeは、以下のシナリオで特に推奨されている。
数分オーダーのレイテンシーを許容するストリーミングの場合
MERGE INTOをDelta Lake への書込み方法として使用する場合
CREATE TABLE AS SELECTまたはINSERT INTOを用いる場合

Auto-Optimizeは、最適化された書込みと自動コンパクションという2つの機能から構成されている。

(1) 最適化された書込み
実際のデータに基づいてApache Sparkのパーティションサイズを動的に最適化し、各テーブルパーティションに対して128MBのファイルを書き出す。
*データセットの特性によって多少のサイズ変動が存在する。

(2) 自動コンパクション
データの書込みの後、ファイルをさらにコンパクト化できるかどうかをチェックし、小さなファイルが最も多いパーティションに対して、ファイルをさらにコンパクト化するためにOPTIMIZEジョブを実行する。
*標準のOPTIMIZEで使用する1GBファイルサイズではなく128MBファイルサイズで実行されることに注意する。

・新規テーブルに適用する場合

set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;

・既存テーブルに適用する場合

ALTER TABLE [table_name | delta.`<table-path>`] SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true)

・Spark Configに設定する場合

spark.databricks.delta.optimizeWrite.enabled
spark.databricks.delta.autoCompact.enabled

3. Data skipping (データスキッピング)

各カラムの最小値と最大値のようなファイルレベルの統計値をデータを書き込む際に自動的に取集することで、不要なファイルI/Oを回避し高速なクエリを提供することができる。Delta Lakeは、ファイルレベルの統計情報をDelta Logに一元化し、クエリ時にフィルタ値が各ファイルの最小値以下または最大値以上の物理ファイルを完全にスキップできるようになった。そのため実際に開いて読込む必要のあるファイルの総数を大幅に削減することができる。
データスキッピング自体はユーザ側で設定する必要はなく、デフォルトで有効になっている。ただし効果はデータのレイアウトに依存するため、Z-Orderを一緒に適用することを推奨している。

文字列やバイナリのような長い値を含む列の統計情報を収集することは、高価な処理となるため避ける必要がある。テーブル プロパティとしてdelta.dataSkippingNumIndexedColsを設定すると、不必要な列の統計情報が収集されるのを防ぐことができる。このプロパティは、テーブルのスキーマにおける列の位置インデックスを示しているため、delta.dataSkippingNumIndexedColsより小さい位置インデックスを持つ列は統計が収集される。つまり長い値を含む列の統計情報の収集を避けるには2種類の方法が使用できる。
*ネストされた列内の各フィールドは、個別の列として見なされる。

  1. テーブルのスキーマで長い値の列がこのインデックスの後になるようにdelta.dataSkippingNumIndexedColsプロパティを設定する
  2. ALTER TABLE ALTER COLUMN を使用して長い値を含む列をdelta.dataSkippingNumIndexedColsプロパティを超えるインデックス位置に移動させる

データスキッピングの仕組み (イメージ)
*参照元: Processing Petabytes of Data in Seconds with Databricks Delta - The Databricks Blog

*データスキッピングに使用される統計情報についてはこちらの記事をご参照ください。
メタデータについて理解する

4. Z-Order

Z-Order (多次元クラスタリング) は、同じファイルセットの中に関連する情報をまとめて配置するファイルレイアウト技術で、データスキッピングと共に使用することで読込みデータ量が大幅に削減される。 Z-Orderに使用するカラムには、WHERE句に頻繁に登場する、またはクエリのJOIN条件で使用される高カーディナリティ (多数の異なる値を持つ) のフィールドを指定する。一般的には、ユーザID、IPアドレス、プロダクトID、電子メールアドレス、電話番号などの識別子フィールドを指定する場合が多い。ファイルレイアウトでは、データがクラスタ化され、最小値と最大値の範囲が狭く、(理想的には)重複していない場合、データスキッピングの効果を最大化することができる。

ZORDER BYにカラムを指定すると、Z-Orderによるファイルレイアウトの最適化が実施される。Z-Orderにはカンマ区切りのリストとして複数のカラムを指定することができるが、カラムが増えるごとに局所性の効果は低下する (カラム数は3~4個が適当)。データスキッピングには列の統計情報が必要なため、統計が収集されていない列に対するZ-Orderは効果がない。

OPTIMIZE events
WHERE date >= current_timestamp() - INTERVAL 1 day
ZORDER BY (eventType)

Z-Orderにかかる時間は、複数回実行しても減少することは保証されない。一方で前回の実行以降に新規データが追加されていなければ、ノーオペレーションとなる。 Z-Orderは、タプル数に関して均等にバランスの取れたデータファイルを作成することを目的としているが、ディスク上のデータサイズに関しては必ずしも均等にはならない。

Z-Orderを適用したファイルレイアウト (イメージ)

Z-Orderによるクエリの高速化を理解するために、各パーツファイルを読込んで、X = 2 またはY = 3を検索する場合を考える。その結果、Z-Orderによるファイルレイアウトの方が効率的にデータの読込みを行なっていることがわかる。

Z-Orderによるクエリの高速化の仕組み (イメージ)
*参照元: Lakehouse with Delta Lake Deep Dive

5. Delta Cache (デルタキャッシュ)

クラウドストレージのデータコピーをワーカーノードのSSDディスク上のローカルに作成することで、データの読込みを高速化する。 データはリモートロケーションからファイルを取得する必要があるたびに、自動的にキャッシュされる。Parquet / Delta形式がサポートされている。同じデータを連続して読込む場合はローカルで実行されるため、読込み速度が大幅に向上する。

ディスクキャッシュには、リモートデータのローカルコピーが含まれ、さまざまなクエリのパフォーマンスを向上させることができる。一方で任意のサブクエリの結果を保存することはできない。そのため任意のサブクエリデータの結果や、Parquet以外の形式 (CSV、JSON、ORCなど) で保存されたデータを保存する場合は、Sparkキャッシュを用いる。またSparkキャッシュとは異なり、ディスクキャッシュはシステムメモリを使用しない。

ディスクキャッシュを使用する場合は、クラスタを構成するときにSSDボリュームを持つワーカータイプを選択する必要がある。
ディスクキャッシュ (Delta Cache) を有効化/無効化するには、以下を実行する。

spark.conf.set("spark.databricks.io.cache.enabled", "[true | false]")

ディスク使用量は、以下のプロパティを設定することで変更できる。
spark.databricks.io.cache.maxDiskUsage: キャッシュデータ用に確保されたノードごとのディスク容量 (bytes)
spark.databricks.io.cache.maxMetaDataCache: キャッシュされたメタデータのために確保されたノードごとのディスク容量 (bytes)
spark.databricks.io.cache.compression.enabled: キャッシュされたデータを圧縮形式で保存するかどうか

設定例

spark.databricks.io.cache.maxDiskUsage 50g
spark.databricks.io.cache.maxMetaDataCache 1g
spark.databricks.io.cache.compression.enabled false

Delta Lake を実現する仕組み

アトミック操作を実現する仕組み

Delta Lakeがアトミック操作を実現するための仕組みをこちらにまとめました。
記事が3万字を超えたので、別記事に切り分けましたが、
Delta Lakeを理解する上で重要なパートなので、ぜひ読んでみてください。

ktksq.hatenablog.com

ACIDトランザクションを実現する仕組み

Delta Lakeは楽観的同時実行制御 (Optimistic Concurrency Control) と多版同時実行制御 (MVCC: MultiVersion Concurrency Control) を組合わせてACIDトランザクションを実現している。

楽観的同時実行制御

楽観的同時実行制御とは、データに対してのロックは行わずに,更新対象のデータが他のトランザクションとコンフリクトがなかったことを確認してからコミットを行う方法である。具体的には、トランザクションがコミットされる時点で、対象のコミット操作の結果が変更されるようなデータへの変更が行われていないことを確認することで、データの整合性を保証している。

多版同時実行制御

多版同時実行制御(MVCC: MultiVersion Concurrency Control)によって、同時実行される2つのトランザクションのうち,先発のトランザクションがデータを更新し,コミットする前に,後発のトランザクションが同じデータを参照すると,更新前の値を返すことを可能にしている。そのため、複数のユーザから同時にデータベースの更新要求を受け取った場合でも、同時並行性と一貫性の両方を保証できる。多版同時実行制御では、更新中のデータに対する参照要求に対してトランザクション開始時点のデータのコピー (スナップショット) を提供する。また参照中のデータに対する更新要求に対しても、トランザクション開始前のデータのコピーを提供する。これにより、読込みのロックと書込みのロックがコンフリクトすることがなくなり、トランザクションの同時並行性が向上する。

コンフリクトが発生した場合の対応

2つ以上のコミットが同時に行われた場合は、以下の相互排他のルールに従って解決する。このプロトコルにより、Delta LakeはACIDの原則である分離を実現し、複数の同時書込み後のテーブルの状態が、各書込みが互いに分離して連続的に行われた場合と同じ結果となることを保証する。つまりトランザクション分離レベルがSERIALIZABLE (直列化可能) であることを保証している。
*デフォルトの分離レベルはWriteSerializable

(1) テーブルの読込み:
テーブルの最新バージョンを参照し、変更すべきファイルを特定
(2) 書込み:
新しいデータファイルを書込むことによって、すべての変更を段階的に実施
(3) 検証→コミット:
変更に対して同時にコミットされた他の変更とのコンフリクトの有無を確認
コンフリクトがない場合→すべての段階的な変更がコミットされ、書込み操作は成功
コンフリクトがある場合→同時変更例外が発生して書込み操作が失敗

コンフリクト時の挙動を理解するために、2人のユーザによって同時にコミットが行われた場合を考える。

Delta Lakeは、変更を加える前に読み込んだテーブルの開始バージョン (バージョン0) を記録する。 ユーザ1と2が同時にテーブルにデータを追加する場合、どちらか1人のユーザだけが000001.jsonのコミットを成功させることができる。ここではユーザ1のコミットが通り、ユーザ2のコミットは拒否されている。
ユーザー2のコミットが拒否された後 (エラーは発生しない) 、コミットしようとしている操作の結果を変更するようなデータへの変更が行われたかどうかを確認する。コンフリクトしない場合は、新しく更新されたテーブルに対してユーザ2のコミットを再試行し000002.jsonとして正常にコミットする。

楽観的にコンフリクトを解決する

まとめ

今回はDelta Lakeについてまとめた。

また今回記事を作成するにあたり、参考にしたテキストは無料でダウンロードできますので、ぜひダウンロードしてみてください。

www.databricks.com

参考

Cheatsheet: Delta Lake での操作

本記事のカバー範囲 (*意気込み)

はじめに

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめる。
*このドキュメントは、随時更新していく予定です。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lake とは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Logについてはこちらをご参照ください。

ktksq.hatenablog.com

既存テーブルをDeltaテーブルに変更する

*既存テーブルがParquetテーブルの場合のみ実行可能。

CONVERT TO DELTA <database_name.table_name>;

テーブルに書き込む

*format=deltaはデフォルト設定

# パスを指定しないで書込む場合: マネージドテーブル
df.write.format("delta").mode("append").saveAsTable("<database_name.table_name>") 

# パスを指定して書込む場合: アンマネージドテーブル
df.write.format("delta").mode("append").save("<path>")

1. パスを指定しないでテーブルに書込む場合:

データファイルとメタデータは/user/hive/warehouse配下に保存される (Hiveメタストアの場合)。
パスを指定しなかった場合マネージド テーブルと呼ばれ、DROP TABLEをした場合には、データファイルを含む全てのファイルが削除される。

例: マネージド テーブルの挙動 (パスを指定しなかった場合)

df_csv.write.format("delta").mode("overwrite").saveAsTable("managed_table")
DESCRIBE EXTENDED managed_table;

マネージドテーブルの格納先

DROP TABLE managed_table;

全てのファイルが削除されていることを確認

2. パスを指定してテーブルに書込む場合:

データファイルとメタデータは指定したパスの配下に保存される。
パスを指定した場合アンマネージド テーブルと呼ばれる。マネージド テーブルとは異なり、アンマネージド テーブルのファイルはテーブルをDROPしても削除されない
アンマネージド テーブルは、メタストア内のテーブルが既存データのスキーマ、パーティショニング、テーブルのプロパティを自動的に継承する機能を利用して、データをメタストアに「インポート」する。

例: アンマネージド テーブルの挙動 (パスを指定した場合)

raw_data_path = "dbfs:/user/ktksq/unmanaged/"
df_parquet.write.format("delta").mode("overwrite").option("path", raw_data_path).saveAsTable("unmanaged_table")
DESCRIBE EXTENDED unmanaged_table;

アンマネージドテーブルの格納先

DROP TABLE unmanaged_table;

全てのファイルが削除されないことを確認

3. マネージドテーブルとアンマネージドテーブルの違い:

マネージド テーブルはメタストアが管理する (パスの指定不可) のに対して、アンマネージド テーブルはメタストアで管理されない (パスの指定必須) 。
そのため両者のテーブルではDROP TABLE時の挙動が異なる。

  • マネージド テーブル: DROP時は全てのファイルが削除される 
  • アンマネージド テーブル: DROP時にファイルは削除されない

テーブルを読み込む*

df = spark.table("<database_name.table_name>")    

# df = spark.read.format("delta").load("<path>")  # パスを指定して読込む場合

値を削除(DELETE)する*

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.delete(f.col("<col_name>") < "<value>")

*DELETEは、最新バージョンのDeltaテーブルからデータを削除するが、古いバージョンが明示的にバキュームされるまで物理ストレージから削除されるわけではないことに注意する。

値を更新(UPDATE)する*

deltaTable = DeltaTable.forPath(spark, "<path>")
deltaTable.update(
  condition = f.col("gender") == "M",
  set = { "gender": lit("Male") }
)

*DELETEとUPDATEは、述語にパーティション・カラムを指定すると大幅に高速化できる。

値をマージ(MERGE)する*

(
  deltaTable.alias("t")
  .merge(
    sourceDF.alias("s"),
    "t.key = s.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

具体例

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
deltaTablePeopleUpdates = DeltaTable.forPath(spark, "/tmp/delta/people-10m-updates")

dfUpdates = deltaTablePeopleUpdates.toDF()

(
    deltaTablePeople.alias("people")
    .merge(dfUpdates.alias("updates"), "people.id = updates.id")
    .whenMatchedUpdate(
        set={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .whenNotMatchedInsert(
        values={
            "id": "updates.id",
            "firstName": "updates.firstName",
            "middleName": "updates.middleName",
            "lastName": "updates.lastName",
            "gender": "updates.gender",
            "birthDate": "updates.birthDate",
            "ssn": "updates.ssn",
            "salary": "updates.salary",
        }
    )
    .execute()
)

変更履歴を表示する*

テーブルへの更新ごとに、操作やユーザーなどの実績情報を表示する。テーブルの履歴はデフォルトで30日間保持される。

DESCRIBE HISTORY table_name

テーブルのスキーマを更新する*

デフォルトではテーブルのデータを上書きしてもスキーマは上書きされないため、スキーマを更新したい場合はoverwriteSchemaをtrueにする。

df.write.option("overwriteSchema", "true")

テーブルをコピー (クローン) する*

Cloneコマンドを使用して、既存のDeltaテーブルのコピーを特定のバージョンで作成することができる。
Deep CloneまたはShallow Cloneのいずれに加えられた変更も、クローン自体にのみ影響し、ソーステーブルには影響しない。 そのため、PROD環境からSTAGING環境へデータを取得する場合や、規制上の理由から特定のバージョンをアーカイブする場合によく用いられる。
*クローンされたテーブルは、クローン元テーブルとは独立した更新履歴を持つため、ソーステーブルと同じタイムトラベルはできない。

テーブルのクローンには以下の2種類の方法が存在し、データファイルのコピーを含めるかが挙動の違いになっている:

1. Deep Clone:*

既存のテーブルのメタデータに加えて、データファイルもクローンターゲットにコピーする。CTASコマンド (CREATE TABLE... AS... SELECT...) によるコピーと機能的に近いが、CTASのようにパーティションや制約などの情報を再指定する必要がないため、よりシンプルに指定できる。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone_deep>
  DEEP CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリとデータファイルが存在することを確認する。

Deep Cloneの構造

2. Shallow Clone:*

Shallow Clone (Zero Copy Clone) は、クローンされるテーブルのメタデータのみをコピーし、テーブル自体のデータファイルはコピーしない。データのソースはクローン元のソースに依存する。つまりデータの物理的なコピーが作成されないため、ストレージのコストは最小限に抑えられる。
Shallow Cloneはクローン元テーブルへのポインタとなっているため、VACUUMを実行して元となるファイルが削除されるとShallow Cloneが壊れる可能性がある。そのためShallow Cloneは通常テストや実験などの短時間のユースケースに使用される。
*VERSION AS OFを指定しない場合は最新断面がクローンされる。

CREATE TABLE IF NOT EXISTS <user_data_bronze_clone>
  SHALLOW CLONE <user_data_bronze>
  VERSION AS OF <version_number>;

クローン先のディレクトリには_delta_logディレクトリのみ存在することを確認する。

Shallow Cloneの構造

Shallow Cloneしたテーブルに変更を加えた場合は、_delta_logディレクトリと同じ階層にデータファイルが格納される。

タイムトラベルで過去データを参照する*

1. タイムトラベルの条件

タイムトラベルによって以前のバージョンのデータにアクセスするには、該当のバージョンのデルタログとデータファイルの両方を保持する必要がある。

Deltaテーブルを構成するデータファイルは、自動的に削除されることはない。データファイルは、VACUUMを実行した場合にのみ削除される。
ただしVACUUMを実行しても、デルタログファイルは削除されない。デルタログファイルは、チェックポイントが書き込まれた後に自動的にクリーンアップされる。

デフォルトでは、30日前までのDeltaテーブルにタイムトラベルすることができる。
ただし以下の場合は、30日より以前でもタイムトラベルできない場合があるので注意する。

  • DeltaテーブルでVACUUMを実行する。

  • テーブルプロパティを用いて、データファイルまたはデルタログファイルの保持期間を変更する。

(1)デルタログファイルの保持期間を変更
チェックポイントが書き込まれるたびに、Databricksは保持間隔より古いログエントリを自動的にクリーンアップする。そのため、設定値を十分に大きくした場合には多くのログエントリが保持される。

delta.logRetentionDuration = "interval <interval>": 

*デフォルト設定は30日

(2)データファイルの保持期間を変更(VACUUM対象ファイルの期間指定)
VACUUMコマンドの対象ファイルとなる期間を指定する。
VACUUMを実行しても30日分のタイムトラベルを可能にするには、30日に設定を変更する必要がある。
*ストレージコストがかかるため、コストを考慮しながら設定値を決める。

delta.deletedFileRetentionDuration = "interval <interval>":

*デフォルト設定は7日

2. タイムトラベルの実施

# タイムスタンプで指定
df1 = spark.read.format("delta").option("timestampAsOf", <timestamp_string>).load("<path>")

# バージョンで指定
df2 = spark.read.format("delta").option("versionAsOf", <version>).load("<path>")

@シンタックスで指定する場合:

# タイムスタンプで指定
spark.read.format("delta").load("/tmp/delta/people10m@20190101000000000") 

# バージョンで指定
spark.read.format("delta").load("/tmp/delta/people10m@v123")

ロールバックを行う*

-- タイムスタンプで指定
RESTORE TABLE employee TO TIMESTAMP AS OF "2022-08-02 00:00:00";

-- バージョンで指定
RESTORE TABLE employee TO VERSION AS OF 1;

まとめ

今回はDelta Lakeでの基本的な操作についてチートシートとしてまとめた。
本ドキュメントは今後随時拡充していきます。

参考

docs.databricks.com

docs.delta.io

Deep Dive: Delta Log について理解する

はじめに

Deltaは_delta_log ディレクトリとデータファイルであるParquetファイルのみから成り立っている。 _delta_log ディレクトリに格納されているファイルをDelta Logと呼び、この機能がDelta LakeでのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。
そこで今回はDeltaLog (デルタログ)についてまとめる。

環境情報

・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)

前提: Delta Lakeとは何か

そもそも「Delta Lakeとは何か」についてはこちらをご参照ください。

ktksq.hatenablog.com

実際のDelta Lakeの操作についてはこちらをご参照ください。

ktksq.hatenablog.com

Delta Log とは何か

Delta トランザクションログ (Delta Log) とは、ユーザーがテーブルに加えたすべての変更を順序付きで自動で記録したログであり、Single Source of Truthのソースとして機能する。イメージとしてはGitで管理されたソースコードリポジトリの.gitディレクトリに近い。トランザクションログ (Delta Log) が存在することで、Delta でのACID トランザクション、スケーラブルなメタデータ処理、タイムトラベルなどを可能にしている。

Delta Logの実体は、_delta_logディレクトリに格納されているファイルであり、以下で説明するファイルを総称してDelta トランザクションログまたは、Delta Logと呼んでいる。Delta Logには、ログを読み取るために任意のシステムで使用できるオープンプロトコル (Delta Transaction Protocol) が備わっている。

_delta_logディレクトリの構成

_delta_logディレクトリは以下の4種類のファイルから成り立っている。JSONとCRCファイルはテーブルへの変更操作ごとに作成され、1:1に対応している。Checkpoint ファイルは、10回のアトミックコミットごとに自動的に作成される。またLast Checkpoint ファイルには、直近のチェックポイントが格納されている。

_delta_log_ディレクトリの構成

1. JSONトランザクションログ:

Delta テーブルが作成されると、JSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

2. Checkpoint ファイル:

Checkpoint ファイルには、そのバージョンまでのすべてのアクションの完全な再生が含まれ、無効なアクションは削除される。Checkpoint ファイルが存在することで、スナップショットを再構築する際に、ある時点までのログを読む手間を省いたり、過去のJSONトランザクションログの一定期間後の削除を可能にしている。

3. Last Checkpoint ファイル:

Last Checkpoint ファイルには、直近のチェックポイントが格納されており、直近のログへのポインタを提供することで、テーブルの最新スナップショットを作成するコストの削減を可能にしている。

4. CRC (Cyclic Redundancy Check) ファイル:

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

JSONトランザクションログ

Delta テーブルが作成されると、そのテーブルのJSONトランザクションログは_delta_log サブディレクトリに自動作成される。そのテーブルに変更を加えると、それらの変更はJSONトランザクションログにアトミックコミットとして記録される。

*アトミックコミットとは: 一連の個別の変更が単一の処理として実行される処理のこと。アトミックコミットが成功した場合は、すべての変更が適用される。一方でアトミックコミットを完了する前に障害が発生した場合は、すべての変更が取り消される。

*アトミックコミットとして記録される操作についてはこちらをご参照ください。
テーブルへの変更操作一覧

各コミットは00000000000000000000.json (ゼロパディングした20桁の数字) で始まるJSONファイルの中に書き出される。またユーザーがテーブルを変更する操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、アトミックコミット単位で、JSONファイルが生成される。
JSONファイルの数字が大きほど、最新バージョンに近いことを示しており、トランザクションログとテーブルのバージョンは1対1で対応している。

具体例をあげて説明していく。
まず1.parquet と 2.parquet から Delta テーブルを作成する。その後、追加したファイルを削除し、代わりにINSERT操作で新しいファイル (3.parquet) を追加する操作を行なった場合にどのようなJSONトランザクションログが生成されるか確認する。
この場合、テーブルを作成した際に、自動でJSONトランザクションログに000000...00.jsonが追加される。そして次の操作 (DELETE) で、00000...01.jsonが追加され、最後にINSERT操作で00000..02.jsonが追加される。

JSONトランザクションログの記録

JSONトランザクションログのスキーマ

JSONトランザクションログのデータスキーマについて確認する。以下はテーブル作成操作時(000000...00.json)のデータスキーマを表しているが、実施した操作によってJSONのスキーマは変化する。例えばコメント更新のみ行なった場合はcommitInfoとmetaDataのみ格納される。

root
 |-- add: struct (nullable = true)
 |    |-- dataChange: boolean (nullable = true)
 |    |-- modificationTime: long (nullable = true)
 |    |-- path: string (nullable = true)
 |    |-- size: long (nullable = true)
 |    |-- stats: string (nullable = true)
 |    |-- tags: struct (nullable = true)
 |    |    |-- INSERTION_TIME: string (nullable = true)
 |    |    |-- OPTIMIZE_TARGET_SIZE: string (nullable = true)
 |-- commitInfo: struct (nullable = true)
 |    |-- clusterId: string (nullable = true)
 |    |-- engineInfo: string (nullable = true)
 |    |-- isBlindAppend: boolean (nullable = true)
 |    |-- isolationLevel: string (nullable = true)
 |    |-- notebook: struct (nullable = true)
 |    |    |-- notebookId: string (nullable = true)
 |    |-- operation: string (nullable = true)
 |    |-- operationMetrics: struct (nullable = true)
 |    |    |-- numFiles: string (nullable = true)
 |    |    |-- numOutputBytes: string (nullable = true)
 |    |    |-- numOutputRows: string (nullable = true)
 |    |-- operationParameters: struct (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- isManaged: string (nullable = true)
 |    |    |-- partitionBy: string (nullable = true)
 |    |    |-- properties: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- txnId: string (nullable = true)
 |    |-- userId: string (nullable = true)
 |    |-- userName: string (nullable = true)
 |-- metaData: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)

アトミックコミットを構成するアクション

アトミックコミット (JSONトランザクションログ) を構成するアクションについて理解するため、000000...00.json (テーブル作成操作時のJSONトランザクションログ) に格納されているデータを確認する。

JSONトランザクションログ自体は アトミックコミット単位 = テーブルの変更操作 で記録されている。
一方でユーザーがテーブルの変更操作 (e.g. INSERT、UPDATE、DELETEなど) を行うたびに、Delta Lakeはその操作を以下の1つまたは複数のアクションからなる独立したステップに分割している。JSONトランザクションログの各行はアトミックコミットを構成するアクションである。各アクションは、順序化されながら、アトミックコミットの構成要素としてJSONトランザクションログに記録される。
今回の例ではCREATE TABLEの操作を行い、赤枠で囲われているようなJSONトランザクションログの各行がアクションを示している。

JSONトランザクションログに格納されているデータ

metaData: メタデータの更新

テーブルのメタデータを更新する。(e.g. テーブルの名前、スキーマ、パーティショニングの変更など)

metaDataカラムのスキーマ

Field Name 内容
id テーブルの一意な識別子
name テーブルのユーザー定義による識別子
description テーブルのユーザー定義による説明
format テーブルに格納されるファイルのエンコーディング仕様
schemaString テーブルのスキーマ
partitionColumns パーティションに用いるカラム (配列)
createdTime メタデータアクションが作成された時間
configuration メタデータアクションのコンフィグ (Map Type)

metaDataカラムに格納されているデータ例

df = ( spark.read.json( './_delta_log/00000000000000000000.json' )
      .where(f.col('metaData').isNotNull())
      .select('metaData')
     )
display(df)
{
   "configuration":{
      "delta.autoOptimize.autoCompact":"true",
      "delta.autoOptimize.optimizeWrite":"true"
   },
   "createdTime":1662199084374,
   "format":{
      "provider":"parquet"
   },
   "id":"6beacf7b-dea1-4ecd-bd6e-6a4dfc697d6b",
   "partitionColumns":[
      
   ],
   "schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"loan_amnt\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"funded_amnt\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"term\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_rate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"addr_state\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}"
}

add: ファイルの追加

JSONトランザクションログにデータファイルを追加する。

addカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
modificationTime ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、ファイルはすでにテーブルに存在するか、追加されたファイルのレコードが同じバージョンの1 つ以上のremoveアクションに含まれる必要がある
stats ファイルのデータに関する統計情報
(e.g. カウント、カラムの最小値/最大値など)
tags ファイルに関するメタデータ (Map Type)

addカラムに格納されているデータ例

{
   "dataChange":true,
   "modificationTime":1662199086000,
   "path":"part-00000-764ea26b-847f-4d8e-ba33-b50410648f11-c000.snappy.parquet",
   "size":892610,
   "stats":"{\"numRecords\":148014,\"minValues\":{\"id\":0,\"loan_amnt\":1000.0,\"funded_amnt\":1000,\"term\":\" 36 months\",\"int_rate\":\" 5.32%\",\"addr_state\":\"AK\"},\"maxValues\":{\"id\":25769817599,\"loan_amnt\":40000.0,\"funded_amnt\":40000,\"term\":\" 60 months\",\"int_rate\":\" 30.99%\",\"addr_state\":\"WY\"},\"nullCount\":{\"id\":0,\"loan_amnt\":1,\"funded_amnt\":1,\"term\":1,\"int_rate\":1,\"addr_state\":1}}",
   "tags":{
      "INSERTION_TIME":"1662199086000000",
      "OPTIMIZE_TARGET_SIZE":"268435456"
   }
}

remove: ファイルの削除

JSONトランザクションログからデータファイルを削除する。

removeカラムのスキーマ

Field Name 内容
path テーブルのルートからのファイルへの相対パス、またはテーブルに追加されるべきファイルへの絶対パス
deletionTimestamp ファイルが作成された時間 (エポックタイムからのミリ秒単位)
dataChange falseの場合は、削除されたファイルのレコードが同じバージョンの1 つ以上のaddアクションに含まれる必要がある
extendedFileMetadata trueの場合は、partitionValues、size、tags フィールドが存在する
partitionValues パーテションカラムと値のマップ (e.g. {"date":"2017-12-10"})
size ファイルのサイズ (Byte)
tags ファイルに関するメタデータ (Map Type)

txn: トランザクションの設定

構造化ストリーミング・ジョブが指定されたIDでマイクロバッチをコミットしたことを記録する。
アプリケーション固有のバージョンを使って進捗を追跡するインクリメンタル処理システム (e.g. ストリーミングシステムなど) では、書き込み時の失敗や再試行に直面してデータの重複を避けるために、進捗状況を記録する必要がある。トランザクション識別子を使うことで、この情報をデルタテーブルのトランザクションログに、テーブルの内容を変更する他のアクションと一緒にアトミックに記録することができる。

txnカラムのスキーマ

Field Name 内容
appId トランザクションを実行するアプリケーションの一意な識別子
version トランザクションのアプリケーション固有の数値識別子
lastUpdated トランザクションアクションが作成された時間 (エポックタイムからのミリ秒単位)

txnカラムに格納されているデータ例

{
  "txn": {
    "appId":"3ba13872-2d47-4e17-86a0-21afd2a22395",
    "version":364475
  }
}

protocol: プロトコルの変更

Delta Logを最新のソフトウェアプロトコルに切り替えることにより、新機能を有効にする。

*プロトコルのバージョニングについてはこちらの記事をご参照ください。
プロトコルのバージョニング

protocolカラムのスキーマ

Field Name 内容
minReaderVersion テーブルを正しく読み取るために、クライアントが実装しなければならないDeltaリードプロトコルの最小バージョン
minWriterVersion テーブルを正しく書き込むために、クライアントが実装しなければならないデルタ書き込みプロトコルの最小バージョン

protocolカラムに格納されているデータ例

{
   "minReaderVersion":1,
   "minWriterVersion":2
}

commitInfo: コミット情報

コミットに関する情報を追加する。(e.g. どの操作が、誰によって、何時に行われたか)

commitInfoカラムのスキーマ

Field Name 内容
operation 実行した操作の種類 (e.g. CREATE TABLE AS SELECT)
operationMetrics ファイル数 (numFiles)、出力された行数 (numOutputRows)、出力バイト数 (numOutputBytes)を記録
operationParameters ファイルの観点から、この操作がテーブル内のデータを追加するか上書きするかを指定
readVersion トランザクションコミットに関連するテーブルのバージョン
clusterID, notebook コミットを実行したDatabricksクラスターとノートブック
userID, userName 操作を実行したユーザーのIDおよび名前

commitInfoカラムに格納されているデータ例

{
   "clusterId":"0730-173143-ores78",
   "engineInfo":"Databricks-Runtime/10.4.x-cpu-ml-scala2.12",
   "isBlindAppend":true,
   "isolationLevel":"WriteSerializable",
   "notebook":{
      "notebookId":"4494197656234952"
   },
   "operation":"CREATE TABLE AS SELECT",
   "operationMetrics":{
      "numFiles":"1",
      "numOutputBytes":"892610",
      "numOutputRows":"148014"
   },
   "operationParameters":{
      "description":null,
      "isManaged":"true",
      "partitionBy":"[]",
      "properties":"{\"delta.autoOptimize.autoCompact\":\"true\",\"delta.autoOptimize.optimizeWrite\":\"true\"}"
   },
   "timestamp":1662199085825,
   "txnId":"5b12b522-1070-4139-b7b2-714dc1e6d701",
   "userId":"2497390366172842",
   "userName":"<username.gmail.com>"
}

Checkpoint ファイル

JSONトランザクションログに合計10回コミットすると、Delta Lakeは_delta_logサブディレクトリにCheckpoint ファイル (チェックポイントファイル) をParquetフォーマットで自動的に保存する。
*10回はデフォルトの回数のため変更可能。

Checkpoint ファイルには、無効なアクションについては削除された上で、ある時点のバージョンまでのすべてのアクションの完全な再生が含まれる。無効なアクションとは、リコンシリエーションルールに従って、 後続のアクションによって取り消されたアクションを指す。 (e.g. 追加したファイルを削除するなど)
Checkpoint ファイルによって、ある時点におけるテーブルの状態全体をSparkが迅速かつ容易に読み込めるようになるため、Sparkは何千もの小さくて非効率なJSONファイルの再処理を回避することができる。つまりSparkはlistFromオペレーションでDelta Log内のすべてのファイルを表示し、最新のCheckpoint ファイルを参照することで、最新のCheckpoint ファイルが保存されてから行われたJSONコミットのみを処理している。

具体例をあげて説明する。
0000013.jsonまでのJSONコミットが存在するDelta テーブルをSparkで読み込む場合を考える。Sparkは中間JSONファイルをすべて処理するのではなく、直近のチェックポイントファイルまでスキップする。そのためSparkは0000011.json、0000012.json、0000013.jsonの増分処理を行うだけで、テーブルの現在の状態を取得することができる。そしてその後Sparkはテーブルのバージョン13をメモリにキャッシュする。このワークフローに従うことで、Delta LakeはSparkを使って効率的にテーブルの状態を常に更新し続けることを可能にしている。

Checkpoint ファイルの役割

Last Checkpoint ファイル

Delta Logには、多くのJSONトランザクションログ (e.g. 10,000+) が含まれるケースも多い。そのような大きなディレクトリをリストアップするのには多くのコストがかかる。Last Checkpoint ファイルは直近のCheckpoint ファイルのポインタを提供することで、テーブルの最新スナップショットを作成するコストを削減することできる。

Last Checkpoint ファイルに格納されているデータ例

%fs
head './_delta_log/_last_checkpoint'

直近のCheckpoint ファイルはバージョン10であることを表している。

{"version":10,"size":12}

CRC (Cyclic Redundancy Check) ファイル

テーブルのバージョンの主要な統計情報が含まれており、データの整合性を検証するために使用される。

CRC (Cyclic Redundancy Check) ファイルのスキーマ

CRC (Cyclic Redundancy Check) ファイルのデータスキーマについて確認する。

root
 |-- histogramOpt: struct (nullable = true)
 |    |-- fileCounts: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- sortedBinBoundaries: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- totalBytes: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- metadata: struct (nullable = true)
 |    |-- configuration: struct (nullable = true)
 |    |    |-- delta.autoOptimize.autoCompact: string (nullable = true)
 |    |    |-- delta.autoOptimize.optimizeWrite: string (nullable = true)
 |    |-- createdTime: long (nullable = true)
 |    |-- format: struct (nullable = true)
 |    |    |-- provider: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- partitionColumns: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- schemaString: string (nullable = true)
 |-- numFiles: long (nullable = true)
 |-- numMetadata: long (nullable = true)
 |-- numProtocol: long (nullable = true)
 |-- protocol: struct (nullable = true)
 |    |-- minReaderVersion: long (nullable = true)
 |    |-- minWriterVersion: long (nullable = true)
 |-- tableSizeBytes: long (nullable = true)

まとめ

今回はDelta Lakeを理解する上で重要な機能であるDelta Logについてまとめた。

参考

データをマスキングする - Databricks

はじめに

「データは新しい石油である」という言葉が示すように、今日、データはますます多くのビジネスの原動力となっている。パーソナライズされた顧客体験、自動化されたマーケティングメッセージ、分析に基づくインサイトなど、すべてはデータの質と量が大きく影響している。そのため、多くの企業でデータの収集や活用を進めている一方で、立法機関は、GDPRやCCPAなど個人のプライバシーとセキュリティを守るための整備を進めている。

セキュリティやコンプライアンス、データ・プライバシーに関する規制が強化される中、データの利活用を進めるためにはデータマスキングは必要不可欠な技術である。 そこで今回はデータマスキングについてまとめ、Databricksを使ってどう実現するかについて説明する。

環境情報
・Databricks Runtime Version: 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12)
・Access mode: Shared

*動的ビューによるアクセス制御はSingle User access modeでは使用できない。

データマスキングとは

データマスキングとは、機密情報を保護するためにデータを匿名化する技術である。より具体的には、機密情報を体系的に元の構造に似た架空の値に変換するプロセスである。 ほとんどの企業では、個人情報などの機密情報は、厳格なセキュリティ管理によって保護されている。一方でデータ活用を進めるためには、機密情報を保護しつつ、データへのアクセスを可能にする必要がある。 そこでデータマスキングを行うことで、匿名化したデータを利用し、実際の機密情報をアクセスすることなくデータ活用を可能にしている。

データマスキングが必要なデータ

データマスキングが必要な代表的なデータには以下の4種類がある。

PII: Personal Identifiable Information

ある特定の個人を識別することができるデータ。具体的には、氏名や性別、住所、電話番号、電子メールアドレス、勤務先、生年月日、顔写真、日本のマイナンバー(個人番号)など行政が個人に割り当てた識別番号などが含まれる。

PHI: Protected Health Information

HIPAA(Health Insurance Portability and Accountability Act)法で、プライバシー保護・セキュリティ確保について定められた個人を特定できる保健データ。具体的には、個人の健康状況や保険情報、検査結果、病歴などが含まれる。

PCI-DSS: Payment Card Industry Data Security Standard

クレジットカード会員の情報を保護することを目的に定められた、クレジットカード業界の情報セキュリティ基準のこと。具体的にはプライマリアカウント番号(PAN)やカード会員名、有効期限、サービスコードなどが含まれる。

IP: Intellectual Property

人間の知的活動によって創作された表現や、商業上有用になりうる情報や標識など、財産性のある無体物のこと。

データマスキングの種類

代表的なデータマスキング方法には以下の3種類がある。

静的データマスキング (Static Data Masking)

静的データマスキングとは、静止状態の機密情報をマスキングし、データセットの複製を作成する方法である。マスキングされたデータは、マスキング前のデータとは別の環境に保存される。具体的には以下のプロセスでマスキングが行われる。

  1. 運用中のデータのバックアップを作成
  2. 別環境にデータをコピーする
  3. 重複データを削除する
  4. データにマスキングを施し、データを完全に架空の値へ変換する

静的データマスキング
*参照元: Static and Dynamic Data Masking

動的データマスキング (Dynamic Data Masking)

動的データマスキングとは、データへのアクセス時に動的にマスキングし、ロールベースのセキュリティ処理を行うことで、権限のないユーザーに対してマスキングされたデータを提供する方法である。ユーザからのアクセスに応じてマスキングを行うため、マスキングされたデータを別環境に保存する必要はない。(オリジナルデータは変更されない。)

具体的には、ユーザーがデータを取得するためにクエリを発行すると、設定とユーザーのロールに基づいてデータをマスクし、結果を返却する。

動的データマスキング
*参照元: Static and Dynamic Data Masking

オンザフライ データマスキング (On-the-Fly Data Masking)

オンザフライ データマスキングとは、データのマスキングをしてから本番環境から別環境にデータ転送を行う方法である。オンザフライ データマスキングは、特定のステージング環境なしでデータをマスキングする必要がある場合に利用される。バックアップ環境がない場合や、データをリアルタイムに移動させる必要があるプロセスで多く使用されている。

Databricksを使ったデータアクセス制御でできること

動的ビューを用いることで、列/行レベルでのアクセス制御と動的データマスキングが可能になる。 アクセス制御は、ビューの定義に含まれる以下の関数によって実現される。
*is_account_group_member()関数を使用する場合はUnity Catalogの利用が必須

・current_user(): 現在のユーザーの電子メールアドレスを返却する。

・is_account_group_member(): 現在のユーザーが指定されたグループのメンバーである場合、TRUEを返却する。

・is_member(): 現在のユーザーが指定されたワークスペースレベルのグループのメンバーである場合、TRUE を返却する。レガシーのため、Unity Catalogで動的ビューを利用する場合は利用しない。

列レベルでのアクセス制御

特定のユーザーまたはグループに対して、アクセスできる列を制限できる。

行レベル(フィルタリング)でのアクセス制御

特定のユーザーまたはグループに対して、行またはフィールドレベル(フィルタリング)の権限を指定できる。

動的データマスキング

特定のユーザまたはグループに対して、行または列レベルでデータマスキングを行うことができる。 ビューは標準の Spark SQL で実装されるため、より複雑なSQLや正規表現を使用することで、高度なマスキングを行うことができる。

基本的なデータアクセス制御方法

動的ビューを用いたデータアクセス制御方法について説明する。
*今回はUnity Catalogを使った場合について記載する。

データの準備

(1) 利用するカタログを指定する

USE CATALOG main;

(2) 利用するデータベースを指定する

DROP database IF EXISTS demo CASCADE;
CREATE database demo;
USE database demo;

(3) オリジナルデータを格納したテーブルを作成する

CREATE OR REPLACE TABLE main.demo.original_data
(
  device_id  INT,
  user_id    STRING,
  name       STRING,
  email      STRING,
  heartrate  DOUBLE
);

INSERT INTO main.demo.original_data VALUES
  (23,'40580129','Nicholas Spears','nicholas.spears@gmail.com',54.0122153343),
  (17,'52804177','Lynn Russell','lynn.russell@gmail.com',92.5136468131),
  (37,'65300842','Samuel Hughes','samuel.hughes@gmail.com',52.1354807863),
  (23,'40580129','Nicholas Spears','nicholas.spears@gmail.com',54.6477014191),
  (17,'52804177','Lynn Russell','lynn.russell@gmail.com',95.033344842),
  (37,'65300842','Samuel Hughes','samuel.hughes@gmail.com',57.3391541312),
  (23,'40580129','Nicholas Spears','nicholas.spears@gmail.com',56.6165053697),
  (17,'52804177','Lynn Russell','lynn.russell@gmail.com',94.8134313932),
  (37,'65300842','Samuel Hughes','samuel.hughes@gmail.com',56.2469995332),
  (23,'40580129','Nicholas Spears','nicholas.spears@gmail.com',54.8372685558);

(4) 正しくデータが格納されていることを確認する

SELECT * FROM main.demo.original_data

クエリの実行結果

original_dataテーブルに格納されているデータ

列レベルでのアクセス制御

(1) uc-userグループに所属するユーザにはnameカラムを表示しない
uc-userグループに所属するユーザがアクセスした場合は、nameカラムをREDACTEDに置き換える。その他の列については、通常どおりに返却される。

CREATE OR REPLACE VIEW main.demo.gold_dailyavg AS
SELECT
  user_id,
  CASE WHEN
    is_account_group_member('uc-user') THEN 'REDACTED'
    ELSE name
  END AS name,
  device_id,
  email,
  heartrate
FROM original_data

(2) uc-userグループに所属するユーザからアクセスし、列レベルでアクセス制御されていることを確認する

SELECT * FROM main.demo.gold_dailyavg;

クエリの実行結果

列レベルでアクセス制御されてることを確認する

行レベル(フィルタリング)でのアクセス制御

(1) uc-userグループに所属するユーザにはdevice_idが30以上の行は表示しない

CREATE OR REPLACE VIEW main.demo.gold_allhr AS
SELECT
  user_id,
  email,
  device_id,
  heartrate
FROM original_data
WHERE
  CASE WHEN
    is_account_group_member('uc-user') THEN device_id < 30
    ELSE TRUE
  END

(2) uc-userグループ以外に所属するユーザからアクセスし、device_idが30以上の行が表示されていることを確認する

SELECT * FROM main.demo.gold_allhr order by device_id DESC;

フィルタリングされていないことを確認

(3) uc-userグループに所属するユーザからアクセスし、device_idが30以上の行が表示されないことを確認する

SELECT * FROM main.demo.gold_allhr order by device_id DESC;

フィルタリングされたデータが表示されないことを確認

動的データマスキング

(1) uc-userグループに所属するユーザには以下の条件でマスキングされたデータを表示する

  • user_idカラムの下二桁のみ表示する
  • emailカラムで@以降のデータのみ表示する
CREATE OR REPLACE VIEW main.demo.gold_allhr AS
SELECT
  CASE WHEN
    is_account_group_member('uc-user') THEN CONCAT("******", RIGHT(user_id, 2))
    ELSE user_id
  END AS user_id,
  CASE WHEN
    is_account_group_member('uc-user') THEN regexp_extract(email, '^.*@(.*)$', 1)
    ELSE email
  END AS email,
  name,
  device_id,
  heartrate
FROM original_data

(2) uc-userグループ以外に所属するユーザからアクセスし、データがマスキングされていないことを確認する

SELECT * FROM main.demo.gold_allhr;

データがマスキングされていないことを確認

(3) uc-userグループに所属するユーザからアクセスし、データがマスキングされていることを確認する

SELECT * FROM main.demo.gold_allhr;

データがマスキングされていることを確認

まとめ

今回はDatabricksを使ったデータマスキング方法についてまとめた。

参考

データメッシュの概念について理解する

はじめに

Forrester社の調査によると、企業内の全データの60パーセントから73パーセントが分析に使用されないままになっている。一方、アクセンチュアが最近行った調査では、データから具体的で測定可能な価値を実現できていると回答した企業はわずか32パーセント、データおよびアナリティクスプロジェクトから実用性の高い洞察や提言が得られたと回答した企業はわずか27パーセントにとどまっている。

データの利活用が進まない原因として、中央集権的なデータ管理による組織的スケーリング課題が挙げられる。データ利活用の推進とともに、データ生産者とデータ利用者の数は増加しているのに対し、中央での一元的なデータ管理がボトルネックになるためである。

そこで、データ基盤の組織的なスケーリング課題を解決するためにデータメッシュという考え方が生まれた。データメッシュは技術的なスケーリング課題ではなく、組織的なスケーリング課題に重点をおくことで、データ基盤のパラダイムシフトを促している。そこで今回はデータメッシュのコンセプトについて説明する。

データの利活用が進まない原因

データの利活用が進まない2つの代表的なアプローチがあるが、そのどちらも根本的な原因は組織的なスケールに失敗しているからである。

データの利活用が進まない原因

データメッシュとは何か

Thoughtworks社のZhamak Dehghani氏が2019年に「How to Move Beyond Monolithic Data Lake to Distributed Data Mesh」で紹介したアーキテクチャパターンのこと。従来の中央集権的なデータ管理の課題を解決し、データの利活用を推進するために、各ドメインによる企業のデータ管理に対する分散化されたアプローチを採用している。

データを 1 つの大きなリポジトリとして捉えるのではなく、独立したデータプロダクトの集合体として捉えることで、データの所有権と責任レベルを向上させることを目指している。

martinfowler.com

データメッシュアーキテクチャ

データメッシュアーキテクト on Databricks

点線はデータの共有を表し、青色のコンポーネントは各ドメインを表している。また灰色のコンポーネントは全ドメインの共通部分になっており、標準化やディスカバリ、データガバナンスを担保することで、データのサイロ化を防ぐ役割を担っている。

データメッシュアーキテクチャの特徴

  • 中央のデータレイクが必要なく、各ドメイン(Data Nodes)が、保有するデータのオーナーシップを担っている。

  • 各ドメインは必要なデータを中央のデータレイクを通すことなく、直接対象のドメインから提供されている。

  • 各ドメイン間の共通部分としてData Hub(Data Infrastructure)が存在し、データ活用のための標準化やディスカバリ、データガバナンスの担保を担っている。

データメッシュプラットフォームは、意図的に設計された分散データアーキテクチャであり、相互運用性のための集中的なガバナンスと標準化のもと、共有され調和したセルフサービス型のデータインフラによって実現される。

サイロ化されたデータ基盤との大きな違い

データメッシュのコンセプトがデータ基盤のサイロ化を推奨しているわけではないことに注意する必要がある。データのサイロ化を防ぐためにデータレイクが生まれたという歴史的な背景があるが、データメッシュはデータの物理的な一元化をすることなく、データのサイロ化を防ぐ仕組みを導入している。代表的な仕組みとしては、クロスドメインのリネージとカタログ機能などが存在する。

データメッシュの思想

データメッシュの本質は、技術的なスケールングに対する解決策ではなく、組織的なスケーリングに対するパラダイムシフトである。

中央集権的なデータ管理から、「ドメイン主導によるデータオーナーシップ」に移行することで、「データそのものをプロダクト」として捉え、ドメインデータの質の向上とデータプロダクトのエコシステムを実現する。そのために必要なのが、「プラットフォームとしてのセルフサービス型データ基盤」「ドメインと中央による分担管理型データガバナンス」である。

データメッシュを支える4つのコンセプト

データメッシュを支える4つのコンセプトについて以下で詳しく説明する。

1. ドメイン主導によるデータオーナーシップ

データメッシュが、既存のデータアーキテクチャと大きく異なる点は、データのドメイン主導によるオーナーシップの分散化を推進していることである。 ドメインデータから最大の価値を生み出すには、ドメインの専門知識を蓄積し、ドメイン専門家自身が重要な決定を下す権限と、その決定を実行する能力(スキルやリソース)の両方が必要である。前述に加えて、実行に伴う責任もドメインが担うことで、実行と責任を一元化し、ドメインにオーナーシップを与える。

ドメイン主導のオーナーシップにより、部分的なアウトソーシングなしでドメイン自身で問題を解決することができるため、効率が大幅に改善される。つまりデータのオーナーシップの分散化によって、多様化するデータソースと付随するアプリケーションにおいてより良いスケーラビリティを実現することを目指している。

ドメインとは何か

ドメインとは、一般的に共通のビジネス目的のために組織された人々の集まりのことを指す。eコマースサイトを例にすると、ユーザー、マーチャント、製品、マーケティングなどがある。

データ所有権の一元化の課題

これまで数多くのデータチームが経験してきたように、データ作成者とデータ消費者の間に断絶があると、最終的にデータからビジネス価値を引き出す際に問題となる。 中央集権的なデータ環境では、あるドメインで作成されたデータの最終的な所有者と責任が誰にあるのかが不明確になりがちだからである。

中央集権型のデータ管理では、データは最終的に業務部門の外部のデータチームに渡され、データエンジニアやアナリストが、データを理解し価値を引き出そうとしている。しかし、アナリストとデータの作成者が断絶していることで、分析時にボトルネックが発生する。例えば分析に必要な変更や追加情報の作成に時間がかかりすぎ、アナリストの仕様に合わせてデータが更新される頃には、データが不要になっていたり、追加変更が判明していたりするという「非効率性」が発生する。またデータ作成者とデータ利用者の間につながりがなければ、フィードバックが失われ、データの価値が損なわれることも多い。

ドメイン主導のデータオーナーシップ

データプロダクトのドメイン主導のオーナーシップとは、最も専門的な知識を持つ人(ドメイン)がデータプロダクトのオーナーとなることで、品質、メタデータ、パフォーマンスなどに責任を持つことを意味する。ドメインは、自分たちが作成したデータ、すなわちデータの取り込み、変換、エンドユーザーへの提供について責任を負う。データの所有権と責任をドメインに戻すことで、データの所有権を移転することなく、データ価値を失うことなく、データについて最も詳しい人々が分析用にデータを準備し提供することができる。

ドメイン主導のデータオーナーシップとは、プロダクトオーナーと開発者が以下のような責任を持つことを意味する。

(1) データプロダクトの作成と、他のドメインやエンドユーザーへの提供。

(2) データが確実にアクセス可能で、使用可能で、利用可能で、定義された品質基準を満たしていること。

(3) ユーザーからのフィードバックに基づきデータプロダクトを進化させ、使用されなくなった、または関連性がなくなったデータプロダクトは取り除くこと。

(4) データプロダクトを組織内に普及させ、「マーケティング」を推進すること。

ドメイン主導のテクノロジースタック選定

ドメインは、ドメイン環境内で自分たちのデータプロダクト開発を可能にするテクノロジースタックを決定する必要がある。例えば、あるドメインはPIIや財務データに対してより信頼性の高いアップストリーム環境を必要とするかもしれないし、サードパーティのパートナーからデータを取り込むかもしれない。そのためドメインは、各ドメインデータに最適なデータ取り込み、変換、提供ツールを選定する必要がある。一方でデータプロダクトの形式は標準化され、組織全体で標準化された方法で提供される必要があり、これによりデータプロダクトユーザのシームレスな作業を可能にしている。

2. プロダクトとしてのデータ

データメッシュを実現するにはデータをプロダクトとして扱うことで、組織におけるデータの価値を高め、全社的にデータが価値ある投資であると理解することが重要である。

データプロダクトとは何か

データプロダクトとは、あるドメインから提供され、下流のユーザーが消費してビジネス価値を生み出すデータのことを指す。データプロダクトを作成、分析し、ビジネス知識と組み合わせることで、データドリブンな意思決定を目指している。

データメッシュは、データプロダクトとアーキテクチャはドメイン主導のオーナーシップで進める一方で、データ自体は組織全体でプロダクトとして共有されることを目的としている。データメッシュを深く理解するにはデータを副産物ではなく、それ自体をビジネスプロダクトとして扱うというドラスティックな発想の転換が必要である。

3. プラットフォームとしてのセルフサービス型データ基盤

既存のプラットフォームや技術の多くは、データチームによって一元化され、全ドメインのデータを取得・共有することを前提に構築されている。これはレガシーインフラと旧来のテクノロジーの限界によって必要とされた部分が大きいが、クラウドコンピューティングや簡単に拡張できるハードウェアやアプリケーションの登場によって、これらの限界はもはや存在しなくなった。

中央集権的インフラストラクチャ・モデルでは、データプロダクトを開発するドメインは、提供されたソフトウェアで作業することを余儀なくされることが最大の課題となる。例えば、ストレージとコンピューティングリソースが中央によって独占的に扱われている場合、ドメインは中央と協力してドメイン固有の変更を行う必要がある。その場合、「最も声の大きい」ドメインが、他のドメインには通用しない方法でインフラストラクチャを変更する場合も少なくない。一方,ドメイン固有の目標やデータを考慮せずに,中央のデータアーキテクチャ設計をドメインに押し付けると,他のドメイン用に最適化されたツールやインフラのために,非効率でフラストレーションのたまるドメインも発生しがちである。

そこでデータメッシュでは、データエンジニアリングに特化した能力を持たないドメインチームが、データプロダクトを自律的に作成、開発、維持できるようなセルフサービスツールを提供することで、ドメインによるデータオーナーシップを可能にした。

セルフサービス型データ基盤とは

セルフサービス型データ基盤の背景にある考え方は、ビジネスは論理的に自律したドメインで構成されており、各ドメインはビジネス機能、製品、またはプロセスをサポートするだけでなく、データプロダクトを生成するため、そのためのデータインフラが必要であるという考え方である。各ドメインは、その基盤となるデータインフラを自分で管理するのではなく、中央IT組織が提供するセルフサービスデータプラットフォームを利用できるようにすることが必要である。このプラットフォームにより、ドメインは高品質のデータの構築に集中することができ、最終的にはデータ分析という形でビジネス価値を獲得することができる。

プラットフォーム自体は、中央IT組織が構築・保守し、ドメインに依存しないことが重要である。これにより、各ドメインの必要に応じてカスタマイズや拡張が可能になる。さらに中央のIT当局による設計やベンダーの制約に阻まれることなく、各ドメインのエンジニアが構築・保守するエンドツーエンドのソリューションを自由に成熟・設計できるようになることで、最終的には、効率的な設計と、一般的に一貫性のあるデータプロダクトの作成が可能になる。

セルフサービス型データ基盤を構成するリソースとツール

  • 監視、ロギング、アラート

  • 監査

  • クエリーのパフォーマンス、並行処理

  • コスト構造および効率性

  • オブザーバビリティ(可観測性)

  • カタログ、リネージ

  • 統合ID管理

  • スケーラビリティ(水平方向および垂直方向)

  • ユーザビリティ(SQLなど)

  • 統一的なアクセス制御

  • 分析エンジンとの相互運用性

  • データストレージ

中央IT組織とドメインのそれぞれに求められる役割と責任

セルフサービス型データ基盤プラットフォームは、データプロダクト開発者がデータ基盤構築の追加タスクで過負荷にならないようにするために、絶対に必要である。データエンジニア、アナリスト、インフラエンジニアは、特に負荷の高い3つの職種であり、各グループは、前例のない規模でドメインや機能を横断する専門知識を発揮することが期待されている。どの役割も過負荷にならないようにするには、職務と責任を明確に分けることが重要である。

その1つの方法として、データインフラをデータとインフラの2つのエンティティに分割することが挙げられる。

データとは、データモデル、スキーマ、変換ルール、ETL/ELTパイプライン、メタデータ、および関連するドメイン固有の知識のアプリケーションを指す。これはドメインに責任がある。

インフラストラクチャーとは、ドメインがデータプロダクトを作成し、維持するためのリソースを指す。これにはドメイン固有のデータプロダクトの作成に使用するツールやリソースのインストール、メンテナンス、アップグレード、スケーリングが含まれる。これは中央IT組織の責任である。

上記のように、セルフサービス型データ基盤プラットフォームの責任には範囲があり、組織がその範囲のどこに位置するかは、ドメインと中央IT組織が一緒になって明確に定義し、合意する必要がある。

4. ドメインと中央による分担管理型データガバナンス

いくつかのガバナンス活動や標準はメッシュレベルで定義され実施されなければならないが、メッシュレベルでは特定のビジネス知識がないため、実施する上で課題が生じることがある。そのため、ドメイン横断的な標準を設定することで、ドメイン間の共通性を持たせることが必要になる。

データメッシュは、ドメインに十分な自律性を持たせながらガバナンスを遵守するために、ドメインと中央IT組織の間で責任を分担することを提案している。そのためにはガバナンスのどの側面を共通で処理し、どの側面をドメインで処理するかについて、ドメインをまたいだ合意を形成することが必要である。

ドメインと中央による分担管理型データガバナンスとは

データメッシュは、中央集権的なデータチームやアーキテクチャから分散型モデルへのパラダイムシフトである。一方でガバナンスについては、メッシュレベルで設定されるものもあれば、ドメインの裁量に委ねられるものも存在する。データメッシュでは、ドメインと中央による分担管理型データガバナンスモデルにより、責任の共有を可能にしている。

一般的には、中央IT組織は、データ系統の報告、ドメイン間の標準、認証、およびグローバルリスクとコンプライアンスの標準とポリシーの明確化を担当することが多い。そしてデータ品質(定義と測定)、データの出所、権限付与、データの分類、コンプライアンスと用語の標準への準拠、ドメイン間のデータ実体標準化の定義などは、多くの場合でドメインが担当する。

ガバナンスの定義

ガバナンスには非常に多くのものが含まれるが、ここでは適切なデータを適切な人に適切なタイミングで届けることに焦点をあてて議論する。よってデータガバナンスの観点から以下を検討する必要がある。

  • セキュリティ:適切な人が認証され、データを使用する権限を与えられているか

  • コンプライアンス:データは、GDPR、RTBFなど、必要なあらゆるポリシーに従っているか

  • 可用性:許可されたユーザーがデータにアクセスできるか

  • 品質:データの品質が何らかの形で定量化され、ユーザーに伝えられているか

  • エンティティの標準化:ドメイン間で用語の統一がなされているか

  • Provenance:誰がそのデータに責任を持ち、どこから来たのかが明確になっているか

データメッシュを実現するには

データメッシュは以下のステップを経て実現される。

(1) 中央がドメインのニーズに基づいてベンダーやツールを選択し、ドメインのニーズに合わせてインフラを長期的に進化させる。

(2) ドメインは提供されたデータプラットフォームを利用して、それぞれのドメインの要件に最も効率的かつ合理的な方法でデータプロダクトを作成・管理する。

(3) ガバナンスの中央集権的な部分(企業レベルで定義されるグローバルガバナンスの懸念事項)は、インフラストラクチャープラットフォームレベルで実施される。

(4) ドメイン固有のガバナンスは、ドメインがセルフサービスインフラプラットフォームを使用して実施する。

(5) データプロダクトはデータ利用者に公開され、中央IT組織が提供・保守するインフラやツールの中で利用される。

まとめ

今回はデータメッシュのコンセプトについてまとめた。次回はデータメッシュを実現するための詳細なステップと実際にどんなデータ基盤を選べば良いのかについてまとめたい。

参考

MLflow: 4. Model Registry を使った実験管理

はじめに

今回はMLflowシリーズの最後の機能であるMLflow Model Registryについてまとめる。

環境情報

Databricks RunTime: 10.2 ML (includes Apache Spark 3.2.0, Scala 2.12)

前回までのMLflowシリーズ

  • MLflow Tracking

ktksq.hatenablog.com

  • MLflow Projects

ktksq.hatenablog.com

  • MLflow Models

ktksq.hatenablog.com

MLflowとは

前回の記事でまとめたので、そちらをご参照ください。 ktksq.hatenablog.com

Model Registryとは

MLflowの構成要素

MLflowモデルの全ライフサイクルをメンバー間で共同管理するための一元的なモデルストア、APIのセット 、および UI のこと。モデルレジストリはチームでMLモデルを共有し、実験からオンラインテスト、本番まで共同で作業し、承認とガバナンスのワークフローと統合し、MLのデプロイメントとそのパフォーマンスを監視するコラボレーションハブとして機能する。

Model Registryでできること

モデルレジストリの機能一覧

モデルレジストリはセントラル・リポジトリーとしてMLflowモデルを登録できる。登録されたモデルは、一意の名前、バージョン、ステージ、およびコメントやタグなどのメタデータを持つ。モデルレジストリでは、以下の情報を管理することができる。

モデルのバージョン管理

同じモデル名のモデルが更新された場合、自動的にモデルのバージョンを更新し管理する。

モデルのステージの遷移管理

各モデルのバージョンに「ステージング」や「プロダクション」のようなステージを割り当てることで、モデルのライフサイクルを管理する。 新規登録イベントや変更をアクティビティとして記録し、ユーザー、変更、コメントなどの追加メタデータを自動的にログに記録できる。 ステージ遷移、変更の要求、レビュー、承認の一連のフローを記録することでコントロールとガバナンスを向上させる。

モデルのコード管理

モデルがどの MLflowエクスペリメントとランから作成されたか記録する。

モデルの概要管理

アルゴリズムの説明や採用したデータセット、方法論など、チームにとって有用な関連情報をコメントやタグで記録する。

モデルのデプロイ管理

特定のモデルバージョンを要求した本番ジョブはどれかなど、どのモデルをデプロイしたか管理する。

モデルサービング

モデルレジストリに登録したMLflowモデルをRESTエンドポイントとして公開する。

Model Registryのワークフロー

モデルレジストリのワークフロー

1.モデルの登録

モデルに対応するモデルフレーバーのmlflow.<model_flavor>.log_model()を使用して、MLflowモデルを作成する。記録されたMLflowモデルはモデルレジストリに登録することができる。一度モデルが記録されると、UIやAPIを通してモデルレジストリ内のモデルを追加、変更、更新、移行、または削除が可能になる。

2.モデルのバージョン登録

登録された各モデルは、1つまたは複数のバージョンを持つことができる。新しいモデルがモデルレジストリに追加される場合は、バージョン1として追加される。同じモデル名で新しいモデルが登録されると、バージョン番号が自動的に増加する。

3.モデルステージ遷移

各バージョンのモデルは、任意の時点で1つのステージを割り当てることができる。ステージの種類にはステージング、プロダクション、アーカイブがあり、各ステージ間の移動の際に変更のリクエスト、レビュー、承認のフローを経ることでモデルの管理とガバナンスを可能にしている。

4.モデルサービング

登録したMLflowモデルをRESTエンドポイントとして公開できる。

UIでの基本的な操作

1.モデルを登録する

ランの詳細ページから、アーティファクションとして記録された MLflowモデルを選択する。その後「モデルを登録」を押下する。

MLflowモデルの登録

2.モデルのバージョンを登録する

モデルを登録すると自動でバージョンが登録されるため、特に操作は必要ない。

MLflow モデルのバージョンを管理する
モデルのバージョンを管理する

3.モデルのステージを登録する

バージョン詳細ページで、モデルのバージョンの詳細と現在のステージを確認することができる。右上のステージのドロップダウンをクリックすると、モデルバージョンを他の有効なステージに移行させることが可能である。

MLflow モデルのステージを登録する
モデルのステージを登録する

4.モデルサービング

モデルサービング

APIでの基本的な操作

モデルを登録する

APIでモデルを登録する方法は3種類ある。

mlflow.<model_flavor>.log_model()を使用する方法

from random import random, randint
from sklearn.ensemble import RandomForestRegressor

import mlflow
import mlflow.sklearn

with mlflow.start_run(run_name="YOUR_RUN_NAME") as run:
    params = {"n_estimators": 5, "random_state": 42}
    sk_learn_rfr = RandomForestRegressor(**params)

    # MLflowのAPIを使用してパラメータとメトリクスをログに記録する
    mlflow.log_params(params)
    mlflow.log_param("param_1", randint(0, 100))
    mlflow.log_metrics({"metric_1": random(), "metric_2": random() + 1})

    # sklearn モデルをログに記録し、バージョン1として登録する
    mlflow.sklearn.log_model(
        sk_model=sk_learn_rfr,
        artifact_path="sklearn-model",
        registered_model_name="sk-learn-random-forest-reg-model"
    )

mlflow.register_model()を使用する方法

runs:URIの引数の一部としてrun_idを使用してモデルを登録する。

result = mlflow.register_model(
    "runs:/d16076a3ec534311817565e6527539c0/sklearn-model",
    "sk-learn-random-forest-reg "
)

create_registered_model() を使って新しいモデルを作成する方法

# バージョンに関連付けされていない空の登録済みモデルを作成する
from mlflow.tracking import MlflowClient

client = MlflowClient()
client.create_registered_model("sk-learn-random-forest-reg-model")
# モデルの新しいバージョンを作成する
client = MlflowClient()
result = client.create_model_version(
    name="sk-learn-random-forest-reg-model",
    source="mlruns/0/d16076a3ec534311817565e6527539c0/artifacts/sklearn-model",
    run_id="d16076a3ec534311817565e6527539c0"
)


モデルレジストリからMLflowモデルを取得する

MLflow のモデルを登録したら、mlflow.<model_flavor>.load_model()を使って、任意のモデルを取得することができる。

・モデル URI の一部としてそのバージョン番号を指定して、特定のモデルのバージョンを取得する

# 特定のモデルのバージョンを取得する
import mlflow.pyfunc

model_name = "sk-learn-random-forest-reg-model"
model_version = 1

model = mlflow.pyfunc.load_model(
    model_uri=f "models:/{model_name}/{model_version}"
)
model.predict(data)

・モデルURIの一部としてモデルステージを指定して、指定ステージの最新バージョンのモデルを取得する

# ステージごとにモデルのバージョンを取得する
import mlflow.pyfunc

model_name = "sk-learn-random-forest-reg-model"
stage = 'staging'

model = mlflow.pyfunc.load_model(
    model_uri=f "models:/{model_name}/{stage}"
)
model.predict(data)


モデルレジストリからMLflowモデルをデプロイする

MLflowモデルを登録したら、モデルをホスト上のサービスとしてデプロイすることができる。

#!/usr/bin/env sh

# Model Registryが存在するトラッキングURLの環境変数を設定する
export MLFLOW_TRACKING_URI=http://localhost:5000

# モデルレジストリから本番モデルを配信する
mlflow models serve -m "models:/sk-learn-random-forest-reg-model/Production"


MLflowモデル情報の追加と更新

・モデルのバージョン情報を更新する

client = MlflowClient()
client.update_model_version(
    name="sk-learn-random-forest-reg-model",
    version=1,
    description="このモデルのバージョンは、100本の決定木を含むscikit-learnランダムフォレストです"
)

・MLflowモデル名を変更する

特定のバージョンのモデルの説明を追加または更新するのと同様に、rename_registered_model()を使用して既存の登録済みモデルの名前を変更することができる。

client = MlflowClient()
client.rename_registered_model(
    name="sk-learn-random-forest-reg-model",
    new_name="sk-learn-random-forest-reg-model-100"
)

・MLflowモデルのステージを移行する

モデルのライフサイクルの中で、モデルはステージング、プロダクション、アーカイブとステージが変化していく。ライフサイクルに合わせて、登録されたモデルは任意のステージに移行させることができる。

client = MlflowClient()
client.transition_model_version_stage(
    name="sk-learn-random-forest-reg-model",
    version=3,
    stage="Production"
)
# <stage>に指定できる値: Staging|Archived|Production|None

まとめ

今回はMLflow Model Registryについてまとめた。

参考

www.mlflow.org

MLflow: 3. Models を使った実験管理

はじめに

MLflow Tracking, Projectsを使うことでモデルの学習を記録し、学習した環境と一緒にバンドルすることができるようになった。 そこで今回は、モデルをパッケージ化し、様々なデプロイメントツールで利用できるようにするための機能であるMLflow Modelsについてまとめる。プラットフォームに依存しない方法でモデルをパッケージングすることで、デプロイメントオプションの柔軟性が高まり、多くのプラットフォームでモデルを再利用することが可能になる。

環境情報

Databricks RunTime: 10.2 ML (includes Apache Spark 3.2.0, Scala 2.12)

前回までのMLflowシリーズ

  • MLflow Tracking

ktksq.hatenablog.com

  • MLflow Projects

ktksq.hatenablog.com

MLflowとは

前回の記事でまとめたので、そちらをご参照ください。 ktksq.hatenablog.com

MLflow Modelsとは

MLflowの構成要素

「フレーバー」という概念を用いてMLモデルをパッケージングするためのフォーマット。MLflow Modelsの実態は、任意のファイルと、そのモデルが使用できるいくつかの「フレーバー」を記載したファイルを含むディレクトリとして保存される。MLモデルのフレーバーが学習環境とデプロイ環境のマッピングを管理してくれるため、推論の複雑さを大幅に軽減することができる。

MLflow Projectsとの違いは、MLflow Projectsが実行の再現性とコードのパッケージングに焦点を当てているのに対し、MLflow Modelsは様々なデプロイメント環境に焦点を当てている点である。

MLflow Modelsの構成要素

MLflow Modelsの構成要素
MLflow Modelsは、任意のファイルを含むディレクトリとML modelファイルから構成されている。ML modelのYAML フォーマットには、以下のフィールドを含めることができる。

ML modelに含まれる情報

・モデルの作成時間: モデルが作成された日付と時刻。
・ランID: モデルがMLflow Trackingを使用して保存された場合は、モデルを作成したランのIDが記録される。
・signature: モデルの入力と出力のスキーマがJSON形式で記録される。モデルの入出力はカラムベースかテンソルベースのいずれかになる。カラムベースの入出力は、MLflowのデータ型の1つとして指定された名前の付いたカラムのシーケンスとして記述することができる。テンソルベースの入出力は、numpyのデータ型の1つとして指定された名前の付いたテンソルのシーケンスとして記述することができる。
・入力データ例: 入力データ例が格納されているアーティファクトへの参照パス。
・databricks runtime: Databricksノートブックまたはジョブでモデルを学習した場合は、Databricksランタイムのバージョンとタイプが記録される。
・フレーバー: デプロイメントツールがモデルを解釈するために使用するフォーマット(規約)のこと。フレーバーを用いることで、それぞれのMLライブラリをデプロイメントツールに組み込むことなく、デプロイメントを行うことができる。MLflowでは、組み込みデプロイツールがサポートしている「標準」フレーバーが定義されており、Python関数としてモデルを実行する方法を記述した「Python関数」フレーバーなどが存在する。特定のモデルがサポートするすべてのフレーバーは、YAML 形式の ML model ファイルで定義されている。例えば、mlflow.xgboostは以下のようなモデルを出力する。

  • MLflow Modelsの構成例: mlflow.xgboost.log_model(model, "my_model")が書き出したディレクトリ
my_model/
├── ML model
├── conda.yaml
├── input_example.json
├── model.xgb
└── requirements.txt
  • ML modelファイル例
artifact_path: model
flavors:
  python_function:
    data: model.xgb
    env: conda.yaml
    loader_module: mlflow.xgboost
    python_version: 3.8.10
  xgboost:
    data: model.xgb
    model_class: xgboost.core.Booster
    xgb_version: 1.5.0
run_id: a9b64d96cb834b31aee30d85d8eff586
signature:
  inputs: '[{"name": "fixed_acidity", "type": "double"}, {"name": "volatile_acidity",
    "type": "double"}, {"name": "citric_acid", "type": "double"}, {"name": "residual_sugar",
    "type": "double"}, {"name": "chlorides", "type": "double"}, {"name": "free_sulfur_dioxide",
    "type": "double"}, {"name": "total_sulfur_dioxide", "type": "double"}, {"name":
    "density", "type": "double"}, {"name": "pH", "type": "double"}, {"name": "sulphates",
    "type": "double"}, {"name": "alcohol", "type": "double"}, {"name": "is_red", "type":
    "long"}]'
  outputs: '[{"type": "tensor", "tensor-spec": {"dtype": "float32", "shape": [-1]}}]'
utc_time_created: '2022-02-28 05:47:32.105856'

フレーバーの種類

MLモデルのフレーバーは学習環境とデプロイ環境のマッピングを管理するため、MLモデルのフレーバーを使用すれば推論時に学習環境とデプロイ環境のマッピングを行う必要がない。MLflowは標準フレーバーとして各MLモデルをサポートしているが、自作することも可能である。

標準フレーバー例

  • Python Function (mlflow.pyfunc)

  • Keras (mlflow.keras)

  • Scikit-learn (mlflow.sklearn)

  • Spark MLlib (mlflow.spark)

  • TensorFlow (mlflow.tensorflow)

  • XGBoost (mlflow.xgboost)

  • LightGBM (mlflow.lightgbm)

*参照元: MLflow Models — MLflow 1.28.0 documentation

例えば、以下のような2つのフレーバーが記述されているML modelファイルの場合は、sklearnまたはpython_functionモデルフレーバーのいずれかをサポートする任意のツールで使用することができる。

flavors:
  sklearn:
    sklearn_version: 0.19.1
    pickled_model: model.pkl
  python_function:
    loader_module: mlflow.sklearn

基本的な操作

MLflow Modelsを作成する

フレーバー関数を使ってMLflow Modelsを作成できる。

フレーバー関数一覧
*参照元: MLflow Models — MLflow 1.28.0 documentation

MLflow のパッケージをインポートする。

import mlflow

モデルをMLflow Modelsとしてパッケージングする。

mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, conda_env=conda_env, signature=signature)

モデルを読み込む。

model = mlflow.pyfunc.load_model(f"models:/{model_name}/production")

読み込んだモデルを使って推論する。

model.predict(X_test)

作成したMLflow Modelsを確認する

MLflow Modelsの内容

まとめ

今回はMLflow Modelsを使用したモデルの管理についてまとめた。次回はMLflow Model Registryについてまとめる。

参考

blog.amedama.jp

mlflow.org

MLflow: 2. Projects を使った実験管理

はじめに

機械学習プロジェクトには様々なライブラリの依存関係があり、実行にはビルドされた環境が必要になる。MLflow Projectsを使うことで、他のデータサイエンティストとの共有や本番環境への移行のために、MLコードを再利用可能で再現性のある形でパッケージ化することができる。今回はMLflow Projectsについてまとめる。

環境情報

Databricks RunTime: 10.2 ML (includes Apache Spark 3.2.0, Scala 2.12)

MLflowとは

前回の記事でまとめたので、そちらをご参照ください。

ktksq.hatenablog.com

MLflow Projectsとは

MLflowの構成要素

他のデータサイエンティスト(または自動化ツール)がコードを実行できるように、コードを整理して記述するためのフォーマット。パッケージングすることで、コードを管理するだけでなく、それが実行された環境も含めて管理できる。MLflow Projectsの実態はコードを含むファイルのディレクトリ、またはGitリポジトリになる。

MLflow Projectsの構成要素

MLflow Projectsの構成要素

  • MLflow Projectsの構成例
MLflow_Projects
├── MLProject
├── conda.yaml
└── main.py

YAML を使ってプロジェクトの構成要素を指定し、環境ファイルには、実行環境に関する詳細を記載する。コード自体は、モデルを作成したり、データを処理するためのステップを含めることができる。任意のコードを異なる言語で実行することができ、リモートVM、Sparkクラスタ、Databricksジョブなど、ローカルでもリモートでも実行可能である。

エントリーポイントとは:

プロジェクト内で実行可能なコマンドと、そのパラメータに関する情報。プロジェクト内の任意の .py や .sh ファイルをエントリーポイントとして呼び出すことができる。

環境ファイルとは:

プロジェクトのエントリポイントを実行するために使用されるべきソフトウェア環境が記載されたファイル。プロジェクト・コードで必要とされるすべてのライブラリの依存関係を含める必要がある。

基本的な操作

MLflow Projectsを作成する

MLflow Projectsの例として、MLproject ファイル、conda環境ファイル、train.pyファイルを作成する。 conda環境はconda.yamlで指定し、もしconda.yamlがない場合はMLflowはPython(特にCondaで利用可能な最新のPython)だけを含むConda環境を使用してプロジェクトを実行する。またプロジェクト内の任意の.pyまたは.shファイルは、明示的にパラメータを宣言することなく、エントリーポイントにすることができる。コマンドをパラメータ付きで実行する場合は、 --key <value> シンタックスを使ってコマンドライン上の各パラメータを渡す。

MLProject ファイル

name: MLProject_example
conda_env: conda.yaml

entry_points:
  main:
    parameters:
      data_path: {type: str, default: "/dbfs/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.csv"}
      n_estimators: {type: int, default: 10}
      max_depth: {type: int, default: 20}
      max_features: {type: str, default: "auto"}
    command: "python train.py --data_path {data_path} --n_estimators {n_estimators} --max_depth {max_depth} --max_features {max_features}"

conda.yaml

name: conda_yaml_example
channels:
  - defaults
dependencies:
  - cloudpickle={cloudpickle.__version__}
  - numpy={numpy.__version__}
  - pandas={pandas.__version__}
  - scikit-learn={sklearn.__version__}
  - pip:
    - mlflow=={mlflow.__version__}

train.py

import click
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

@click.command()
@click.option("--data_path", default="/dbfs/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.csv", type=str)
@click.option("--n_estimators", default=10, type=int)
@click.option("--max_depth", default=20, type=int)
@click.option("--max_features", default="auto", type=str)
def mlflow_rf(data_path, n_estimators, max_depth, max_features):

  with mlflow.start_run() as run:
    # Import the data
    df = pd.read_csv(data_path)
    X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)
    
    # Create model, train it, and create predictions
    rf = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, max_features=max_features)
    rf.fit(X_train, y_train)
    predictions = rf.predict(X_test)

    # Log model
    mlflow.sklearn.log_model(rf, "random-forest-model")
    
    # Log params
    mlflow.log_param("n_estimators", n_estimators)
    mlflow.log_param("max_depth", max_depth)
    mlflow.log_param("max_features", max_features)

    # Log metrics
    mlflow.log_metric("mse", mean_squared_error(y_test, predictions))
    mlflow.log_metric("mae", mean_absolute_error(y_test, predictions))  
    mlflow.log_metric("r2", r2_score(y_test, predictions))  

if __name__ == "__main__":
  mlflow_rf() # Note that this does not need arguments thanks to click

プロジェクトを実行する

mlflow.projects.run()コマンドを使ってMLflow Projectを実行する

import mlflow

mlflow.projects.run(uri=working_path,
  parameters={
    "data_path": "/dbfs/mnt/training/airbnb/sf-listings/airbnb-cleaned-mlflow.csv",
    "n_estimators": 10,
    "max_depth": 20,
    "max_features": "auto"
})

mlflow.projects.run()コマンドを使って実行した場合は、UIに実行したコマンド内容が記載される。

コマンド実行内容

まとめ

今回はMLflow Projectsについてまとめた。次回はMLflow Modelsについてまとめる。

参考

docs.databricks.com

mlflow.org

MLflow: 1. Tracking を使った実験管理

はじめに

多くの機械学習プロジェクトでは、精度の向上のためパラメータや特徴量を変えて試行錯誤しながら、何度も実験を繰り返す必要がある。そのため試行回数が増加するごとにモデルの管理が難しくなり、属人化するという課題がある。そこで今回はMLflow Tracking (トラッキング) を使用したモデルの管理についてまとめる。

環境情報

Databricks RunTime: 10.2 ML (includes Apache Spark 3.2.0, Scala 2.12)

MLflowとは

機械学習のライフサイクルをエンドツーエンドで管理するためのオープンソースのプラットフォーム。データの準備からモデル開発、デプロイ、モデルの監視までの機械学習モデルのライフサイクルを一貫して管理することを目標としている。2022年02月時点でGithub上で11.4Kのスターを獲得しており、機械学習のモデル管理ツールとして広く認知されているツールである。

機械学習モデルのライフサイクル

MLflowでできること

MLflowは4つの主要な機能から構成されており、各機能は独立して使用することが可能である。

MLflowの構成要素

1. MLflow Tracking:

実験をトラッキングし、パラメータや結果を記録・比較する。

2. MLflow Projects:

他のデータサイエンティストとの共有や本番環境への移行のために、MLコードを再利用可能で再現性のある形でパッケージ化する。

3. MLflow Models:

多様なデプロイツールをサポートする汎用的なモデル形式を提供する。

4. MLflow Model Registry:

モデルのバージョン管理、ステージ遷移、アノテーションなど、MLflowモデルの全ライフサイクルをセントラルモデルストアとして一貫して管理する。

今回はMLflowの最も基本的な機能であるMLflow Trackingについて詳しく説明する。

MLflow Trackingとは

機械学習コードの実行時にパラメータ、コードバージョン、メトリクス、出力ファイルをロギングし、結果を可視化するためのAPIとUIのこと。MLflow Trackingでは、Python、REST、R API、Java APIの各APIを使って、実験のログやクエリを行うことができる。 MLflow Trackingは、機械学習コードの1度の実行であるランという概念を中心に構成されており、各ランには以下の情報が記録される。

MLflow Trackingの構成要素

トラッキングできる情報

・実行時間: 一度のランを実行するのにかかった時間。
・実行者: ランを実行したユーザ名。
・ソース: トラッキングしたランが含まれるノートブックの名前。MLflow Projectから実行する場合は、プロジェクト名と実行のエントリーポイント。
・バージョン: ノートブックから実行した場合は、ランの実行時のノートブックバージョン(履歴)。MLflow Projectから実行する場合は、実行に使用されたGitコミットのハッシュ値。
・パラメーター: キーと値のペアとして保存されるモデルのパラメーター。(キーと値はどちらも文字列であることに注意する)
・メトリクス: キーと値のペアとして保存されるモデル評価のためのメトリクス。 (値は数値のみ記録可能であることに注意する) 各メトリクスは、例えばモデルの損失関数の収束のトラッキングなど各実行の過程を記録・更新することができる。そのためMLflowはメトリクスの全履歴を記録し、視覚化することが可能である。
・タグ: キーと値のペアとして保存されるメタデータ。(キーと値はどちらも文字列であることに注意する)
・アーティファクト: 学習済みモデルや、特徴量の重要度など実験した結果として得られる成果物。任意のフォーマットでファイルの出力ができる。例えば、画像(PNGなど)、モデル(pickleしたscikit-learnモデルなど)、データファイル(Parquetファイルなど)を成果物として記録することが可能である。

概念の説明

エクスペリメントとは:

各ランをノートブックまたはワークスペース単位でまとめたもの。ノートブック単位でまとめた場合は、同じノートブック内の全てのランが1つのエクスペリメントに紐付く。一方で、ワークスペース単位でまとめた場合はノートブックに関連付けられるのではなく、どのノートブックのランでも実験IDまたは実験名を使用することで、同じ1つのエクスペリメントに紐づけることが可能。

ランとは:

各モデルの 1 回の実行に対応した試行のこと。例えばxgboostで任意のデータセットを学習させる試行を1度行った場合は、それが1つのランに対応する。

基本的な操作

各ランをトラッキングする方法は以下の2種類がある。自分でロギング内容を定義するのは面倒なので、基本はautolog()関数を使った自動ロギングがおすすめ。

  • ロギング関数を使って必要な情報をロギングする

  • 自動ロギングを使って必要な情報を自動でロギングする

1. ロギング関数を使って必要な情報をロギングする

以下の関数を使って、各ランで必要な情報をロギングする。
※関数の詳細はMLflow Tracking — MLflow 2.9.2 documentationを参照。

ロギング関数一覧

MLflow のパッケージをインポートする。

import mlflow

ロギングを開始する。

with mlflow.start_run(run_name='hogehoge_model'):

パラメータを記録する。

mlflow.log_param('n_estimators', n_estimators)

メトリックとしてaucを登録する。

 mlflow.log_metric('auc', auc_score)

タグを登録する。

mlflow.set_tag(key='hoge', value='fuga')

アーティファクトを登録する。

import mlflow

# Create a features.txt artifact file
features = "rooms, zipcode, median_price, school_rating, transport"
with open("features.txt", 'w') as f:
    f.write(features)

# With artifact_path=None write features.txt under
# root artifact_uri/artifacts directory
with mlflow.start_run():
    mlflow.log_artifact("features.txt")

登録されたアーティファクト

ロギングを終了する。

mlflow.end_run()

2. 自動ロギングを使って必要な情報を自動でロギングする

自動ロギング関数により、明示的なロギングがなくても、メトリクス、パラメータ、モデルのログを収集することが可能である。

mlflow.xgboost.autolog()

各ライブラリによってロギングできる情報は異なり、例えばxgboostモデルの場合は以下の情報が自動で取得可能である。

フレームワーク メトリクス パラメータ タグ アーティファクト
XGBoost ユーザー指定のメトリクス xgboost.trainパラメータ - MLflow Model、特徴量の重要度

2022年2月時点で、自動ロギング関数をサポートしているライブラリは以下の通り。
※参照元: MLflow Tracking — MLflow 2.9.2 documentation

  • Scikit-learn

  • TensorFlow and Keras

  • Gluon

  • XGBoost

  • LightGBM

  • Statsmodels

  • Spark

  • Fastai

  • Pytorch

3. トラッキングした情報をUI上から確認する

UI上でロギングした情報を確認する。

ランに紐づくロギング情報の確認

まとめ

今回はMLflow Trackingを使用したモデルの管理についてまとめた。次回はMLflow Projectsについてまとめる。

参考

mlflow.org

blog.amedama.jp

docs.databricks.com

アノテーション: Labelbox と Databricks を使ってラベリングする

はじめに

アノテーションとは、トレーニングデータとして利用するために、様々な形式のデータにメタデータを付与して解釈可能な意味づけをすることである。 機械学習、特にディープラーニングの普及とともに、アノテーションはますます重要な工程になっている。 そこで今回はパートナーコネクト機能で接続したDatabricksLabelboxを使ったアノテーションワークフローについてまとめる。

使用環境

・RunTime: Databricks RunTime: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)
・クラウドベンダー: AWS

DatabricksとLabelboxを接続するメリット

Labelboxはデータレイク上の様々な非構造化データにアノテーションすることが可能であり、Labelbox Connector for Databricksを使うことでアノテーション結果をデルタレイクに連携することができる。 そのためLabelboxとDatabricksを繋ぐことで、アノテーションからモデル開発、アノテーションフィードバックまでの一連のワークフローをレイクハウス環境下で実現できる。

Labelboxを使ったモデリングワークフロー
※引用元: Partnerships | Labelbox

Labelboxとは

データアノテーションによるトレニングデータの作成、管理をするためのトレーニングデータプラットフォーム。
「アノテーション」「モデルのパフォーマンス診断」「ラベリングの優先順位付け」という3つの機能を中心としてイテレーションを回すことで、アノテーション精度向上のフィードバックループを可能にしている。

Labelboxのライフサイクル

1. Annotate: アノテーション

チーム内で共有できるワークフローとラベリングの自動化により、データに迅速かつ正確にラベルを付けることが可能。

2. Diagnose: モデルのパフォーマンス診断

学習したモデルとそのパフォーマンスを簡単に視覚化できるだけでなく、モデルのパフォーマンスに影響を与える学習データのパターンを認識することができる。トレーニングデータの管理と同じプラットフォーム上にパフォーマンス診断機能が存在することで、プロセスが効率化され、モデルのエラーに対処したり、モデルのパフォーマンスを向上させるための高品質なトレーニングデータセットの構築が可能になった。

3. Prioritize: ラベリングの優先順位付け

モデルのパフォーマンスを向上させたり、クラスの不均衡を修正するために、パフォーマンスの低いクラスを特定する。その後、モデルの精度への影響の大きいデータを見つけ、優先順位をつけてラベリングを行うことが可能。

forbesjapan.com またビジネスニュースとしては、2022年の1月にLabelboxの評価額が10億ドルを突破し、ユニコーン企業の仲間入りを果たしたことも記憶に新しい。

Labelboxでできること

アノテーションはデータタイプ、目的別に様々な種類に分けることができる。Labelboxがサポートしている機能は以下の通り。※2022年2月現在

Labelboxがサポートしている機能一覧

※引用元: https://docs.labelbox.com/docs/editor

Labelboxへの接続手順

パートナーコネクト機能を使えば簡単にDatabricksとLabelboxを接続することができる。

パートナーコネクトを使ったLabelboxとの接続

詳細な接続方法については以下の記事を参照。 qiita.com

Labelboxを使用したアノテーションワークフロー

Labelboxを使ったラベリングワークフロー

1. データの取得

以下の3つの方法のいずれかを使って、データを取得する。

・ストレージからの読込み
データを保管しているクラウドストレージとLabelboxをIAM権限の委譲によって連携することで、Labelboxからデータレイク上のデータにアクセスする。

・署名付きURLからの読込み
データを保管しているクラウドストレージの各ファイルに署名付きURLまたは公開URL経由でLabelboxからデータにアクセスする。

・直接データをアップロード
LabelboxにWeb UI経由で直接データをアップロードしてアクセスする。

2. プロジェクトの作成

以下の2つの方法のいずれかを使ってプロジェクトを作成する。

・LabelboxのWeb UI上から定義
ユースケースに合わせて必要なアノテーションプロジェクトをWeb UIから作成する。

LabelboxのUI上からのプロジェクト定義

・Databricks上で定義
Python SDK(labelbox SDK)を使用して、Databricks上でLabelboxのプロジェクトを定義する。

  • Python SDKによるプロジェクト作成
# Create a new project
project_demo = client.create_project(name="Labelbox and Databricks Example")
project_demo.datasets.connect(demo_dataset)  # add the dataset to the queue

ontology = OntologyBuilder()

tools = [
  
  Tool(tool=Tool.Type.BBOX, name="Plant"),
  Tool(tool=Tool.Type.SEGMENTATION, name="Bird"),
]
for tool in tools: 
  ontology.add_tool(tool)

conditions = ["clear", "overcast", "rain", "other"]

weather_classification = Classification(
    class_type=Classification.Type.RADIO,
    instructions="what is the weather?", 
    options=[Option(value=c) for c in conditions]
)  
ontology.add_classification(weather_classification)


# Setup editor
for editor in client.get_labeling_frontends():
    if editor.name == 'Editor':
        project_demo.setup(editor, ontology.asdict()) 

print("Project Setup is complete.")

3. ラベリング

2.で定義したプロジェクト内容に沿って、LabelboxのWeb UI上からラベリングする。

ラベリング方法

4. ラベリングデータを取得

labelsparkライブラリを使用して、ラベリング結果をDatabricksから読み込む。

  • ラベリング結果の読込み
labels_table = labelspark.get_annotations(client, project_demo.uid, spark, sc)
labels_table.registerTempTable(LABEL_TABLE)
display(labels_table)
  • ラベリング結果
{
  "classifications": [
    {
      "answer": {
        "featureId": "ckzccw9wn00053e67irtyzlqb",
        "schemaId": "ckzccuilw1nuu10eo67580doz",
        "title": "overcast",
        "value": "overcast"
      },
      "featureId": "ckzccw9wn00063e672o0l00es",
      "schemaId": "ckzccuilw1nur10eofb181pjv",
      "title": "what is the weather?",
      "value": "what is the weather?"
    }
  ],
  "objects": [
    {
      "bbox": {
        "height": 2080,
        "left": 2387,
        "top": 478,
        "width": 1792
      },
      "color": "#ff0000",
      "featureId": "ckzccvrjl00013e675gxbg1x6",
      "instanceURI": "<URI>",
      "schemaId": "ckzccuilv1nu910eo9oxm9aky",
      "title": "Frog",
      "value": "frog"
    },
    {
      "bbox": null,
      "color": "#00ffa9",
      "featureId": "ckzccvxe900033e67w0243hij",
      "instanceURI": "<URI>",
      "schemaId": "ckzccuilw1nuh10eohuekaqh0",
      "title": "Bird",
      "value": "bird"
    }
  ],
  "relationships": []
}

まとめ

今回はDatabricksとLabelboxを使ってラベル付けする方法についてまとめた。
Labelboxは10,000件のアノテーションまでは無料なので、ぜひトライアルしてみてください。

labelbox.com

参考

databricks.com

docs.labelbox.com