はじめに
PySparkを使ってCSVファイルをシングルファイルで保存する方法についてまとめる。
使用環境
Databricks RunTime: 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)
保存方法
0. 現在のパーテション数を確認する
Dataframeで保存されている場合はRDDに変換して確認する。パーテション数が1の場合はシングルファイルで保存される。
sdf.rdd.getNumPartitions()
1. パーテション数を1に設定して保存する
今回のケースではcoalesceメソッドが推奨される。
- repartitionメソッドを使用した場合
outputpath = '/my-output/repartition-csv' sdf.repartition(1).write.mode('overwrite').parquet(outputpath)
- coalesceメソッドを使用した場合
outputpath = '/my-output/coalesce-csv' sdf.coalesce(1).write.mode('overwrite').parquet(outputpath)
2. Pandas DFに変更して保存する
ファイル名を指定したい場合はPandas DFに変更して保存する方法しかない。パスの先頭に/dbfsを指定する必要がある点に注意する。
outputpath = '/dbfs/my-output/oandas.csv' pdf = sdf.toPandas() pdf.to_csv(outputpath, header=True, index=False)
デフォルトのパーテション数
DataFrameに対してgroupBy()、union()、join()などの関数を呼び出すと、複数のエクゼキュータやマシン間でデータがシャッフルされ、最終的にはデフォルトで200のパーティションに再分割される。Spark のデフォルトでは spark.sql.shuffle.partitions
の設定により、シャッフルするパーティションを 200 に定義している。
repartitionメソッドとcoalesceメソッドの違い
coalesceは既存のパーティションを使用して、シャッフルされるデータの量を最小限にするため、データサイズの異なるパーティションが作成される。(パーテション間でデータサイズにばらつきが起こる)
一方で、repartitionでは、フルシャッフルによってすべてのパーティションからデータが再分配されるため、ほぼ同じサイズのパーティションが作成される。
よって、repartitionはパーティション数の増減、coalesceはパーティション数を効率的に減らすのに使うのがいいとされている。
ただしどちらも多くのパーティションに渡ってデータをシャッフルするため、非常にコストのかかる操作である点には注意する。
repartition
再分割によってすべてのパーティションからデータが再分配される。フルシャッフルを行うので、何十億、何兆ものデータを扱う場合には非常にコストのかかる操作になる。 例えばパーテション数5から6に1増加する場合も全てのパーテションからデータが移動する。 ただフルシャッフルを行うので、パーテション毎のデータ数はほぼ等しくなるため、repartitionしたデータの分散処理速度は上がる。
coalesce
既存のパーティションを使用して、シャッフルされるデータの量を最小限に抑える。 例えばパーテション数5から3に2減少する場合は、2つのパーテションのみデータ移動する。
Pandas DF保存パスに/dbfsを指定する理由
シングルノード向けのライブラリで作業する場合は、明示的に指定しないとローカルドライバーノードに保存される。ローカルドライバーノード上のデータはクラスタの終了とともに消えてしまうので、Pandas DF保存パスに/dbfsを指定している。
- Pandas DF保存パスに/dbfsを指定しない場合
outputpath = 'pandas.csv' pdf = sdf.toPandas() pdf.to_csv(outputpath, header=True, index=False)