PySpark: シングルファイルとしてCSVを保存する

はじめに

PySparkを使ってCSVファイルをシングルファイルで保存する方法についてまとめる。

使用環境

Databricks RunTime: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

保存方法

0. 現在のパーテション数を確認する

Dataframeで保存されている場合はRDDに変換して確認する。パーテション数が1の場合はシングルファイルで保存される。

sdf.rdd.getNumPartitions()

1. パーテション数を1に設定して保存する

今回のケースではcoalesceメソッドが推奨される。

  • repartitionメソッドを使用した場合
outputpath = '/my-output/repartition-csv'
sdf.repartition(1).write.mode('overwrite').parquet(outputpath)
  • coalesceメソッドを使用した場合
outputpath = '/my-output/coalesce-csv'
sdf.coalesce(1).write.mode('overwrite').parquet(outputpath)

2. Pandas DFに変更して保存する

ファイル名を指定したい場合はPandas DFに変更して保存する方法しかない。パスの先頭に/dbfsを指定する必要がある点に注意する。

outputpath = '/dbfs/my-output/oandas.csv'
pdf = sdf.toPandas()
pdf.to_csv(outputpath, header=True, index=False)

デフォルトのパーテション数

DataFrameに対してgroupBy()、union()、join()などの関数を呼び出すと、複数のエクゼキュータやマシン間でデータがシャッフルされ、最終的にはデフォルトで200のパーティションに再分割される。Spark のデフォルトでは spark.sql.shuffle.partitions の設定により、シャッフルするパーティションを 200 に定義している。

repartitionメソッドとcoalesceメソッドの違い

coalesceは既存のパーティションを使用して、シャッフルされるデータの量を最小限にするため、データサイズの異なるパーティションが作成される。(パーテション間でデータサイズにばらつきが起こる)
一方で、repartitionでは、フルシャッフルによってすべてのパーティションからデータが再分配されるため、ほぼ同じサイズのパーティションが作成される。
よって、repartitionはパーティション数の増減、coalesceはパーティション数を効率的に減らすのに使うのがいいとされている。
ただしどちらも多くのパーティションに渡ってデータをシャッフルするため、非常にコストのかかる操作である点には注意する。

repartition

再分割によってすべてのパーティションからデータが再分配される。フルシャッフルを行うので、何十億、何兆ものデータを扱う場合には非常にコストのかかる操作になる。 例えばパーテション数5から6に1増加する場合も全てのパーテションからデータが移動する。 ただフルシャッフルを行うので、パーテション毎のデータ数はほぼ等しくなるため、repartitionしたデータの分散処理速度は上がる。

coalesce

既存のパーティションを使用して、シャッフルされるデータの量を最小限に抑える。 例えばパーテション数5から3に2減少する場合は、2つのパーテションのみデータ移動する。

Pandas DF保存パスに/dbfsを指定する理由

シングルノード向けのライブラリで作業する場合は、明示的に指定しないとローカルドライバーノードに保存される。ローカルドライバーノード上のデータはクラスタの終了とともに消えてしまうので、Pandas DF保存パスに/dbfsを指定している。

  • Pandas DF保存パスに/dbfsを指定しない場合
outputpath = 'pandas.csv'
pdf = sdf.toPandas()
pdf.to_csv(outputpath, header=True, index=False)

ローカルドライバーノードに保存される

参考

sparkbyexamples.com

docs.databricks.com