はじめに こんにちは。TIGの藤田です。
Python連載 の8日目として、PySparkを使用したGlueジョブ開発のお話をします。
ETLツールとして使用されるAWS Glueですが、業務バッチで行うような複雑な処理も実行できます。また、処理はGlueジョブとして、Apache Spark分散・並列処理のジョブフローに簡単に乗せることができます!
特に複雑な処理は、やや割高な開発エンドポイントは使用せず、ローカル端末で、しっかり開発・テストを行いたいですよね。そのためのローカル開発Tipsをご紹介します。
内容
Glueジョブの開発と実行概要
Tip1: ローカル環境構築
Tip2: PySpark, SparkSQL開発
Tip3: 単体テスト(pytest)
Tip4: データカタログどうする問題
Glueジョブの開発と実行概要 ローカル開発の前に、AWS Glueでのジョブ実行方法を簡単にお話します。複雑な処理をSparkジョブで実行するには、以下4ステップでOKです。
1)ジョブズクリプトを作成、S3に配置 2)ジョブ実行定義 3)「ワークフロー」によるジョブフロー定義 4)AWS Athenaを使った実行結果確認
3)のジョブフロー定義については、規模や構成によって他の方法を検討する余地が大きいですが、Glueの「ワークフロー」でも、以下のような機能は用意されています。
・画面GUIでのジョブフロー定義 ・ジョブの並列実行、分岐、待合せ ・オンディマンド、スケジュール、EventBridgeイベントによるトリガ実行 ・画面からの実行状態、結果、エラー確認、リトライ実行
4)について、Athena は、標準的なSQLを使用してS3のデータを直接分析できるサービスです。Athenaのクエリ実行には、AWS Glueデータカタログ(DatabaseやTable)の登録が必要ですが、これはAthenaのクエリエディタにDDLを実行すると簡単に行えます(Glueのデータカタログ定義はTerraform等でも行えるので運用上は他の方法でもよいと思います)。
Tip1: ローカル環境構築 AWS公式にGlueコンテナが配布 されて、docker-composeによる環境構築が容易になりました。ローカル環境構築の詳細は、AWS Glueの開発環境の構築(2021) を参照ください。
Tip2: PySpark, SparkSQL開発 Glueでは、3つのジョブタイプ、Python shell, Spark streaming, Spark script (Python, Scala)が選択できますが、今回はSpark script(PySpark, SparkSQL)を採用しました。PySpark は、Apache Spark をPythonで呼出すライブラリです。SparkSQL は、Apache Sparkのモジュールの1つで、SQLとDataFrameによる構造化データの処理を可能にします。
複雑な業務処理の実装にも以下のメリットがありました。
構造化データ(Table)をメモリ上のDataFrameに取込み効率的に加工できる。
データカタログ(Table定義)があれば、プログラム上データ取込用のモデル定義を別につくる必要がない。
SparkSQLにより、複数ファイル(Table)の結合を含む、標準的なSQLによる操作が可能 。
SQL関数に含まれないPythonの関数やライブラリを使いたい場合にも、ユーザー定義関数 (UDF)を使えば、DataFrameの構造を維持したまま、特定のカラムに対してのみ処理を実行できる。
以下、2ファイル(2 Tables)を結合してユーザー定義関数処理をするスクリプト例です。
import sysfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.utils import getResolvedOptionsfrom pyspark.sql import functions as Ffrom pyspark.sql.types import DecimalTypefrom decimal import Decimal, ROUND_FLOOR, ROUND_HALF_UP, ROUND_CEILINGdef round_fraction (target: Decimal, meth: str , pos: str ): p = { "1" : "1." , "2" : "0.1" , "3" : "0.01" , "4" : "0.001" , } methods = { "1" : ROUND_FLOOR, "2" : ROUND_HALF_UP, "3" : ROUND_CEILING, } if meth is None or pos is None or meth == "" or pos == "" : return target return target.quantize(Decimal(p[pos]), rounding=methods[meth]) udf_round_fraction = F.udf(round_fraction, DecimalType(10 , 3 )) def exec_sample (glueContext, spark, input_dir, output_dir ): data = ["calc_source" , "attributes" ] for d in data: p = f"s3://{input_dir} /{d} /" glueContext.create_dynamic_frame.from_options( connection_type="s3" , connection_options={"paths" : [p]}, format ="parquet" , ).toDF().createOrReplaceTempView(d) wk_main = spark.sql(""" SELECT src.id , src.number1 , src.number2 , src.position , src.method , src.group , att.attribute1 FROM calc_source src INNER JOIN attributes att ON src.group = att.group WHERE src.number1 IS NOT NULL AND src.number2 IS NOT NULL AND src.number2 <> 0 """ ) wk_main = wk_main.withColumn( "calc_result" , udf_round_fraction( F.col("number1" ) / F.col("number2" ), F.col("method" ), F.col("position" )) ) wk_main.write.mode("overwrite" ).format ("parquet" ).save(f"s3://{output_dir} /sample_out/" ) def main (): args = getResolvedOptions(sys.argv, ["input_dir" , "output_dir" ]) glueContext = GlueContext(SparkContext.getOrCreate()) spark = glueContext.spark_session exec_sample(glueContext, spark, args["input_dir" ], args["output_dir" ]) if __name__ == '__main__' : main()
Tip3: 単体テスト(pytest) ローカル環境での、PySparkスクリプトの単体テストはpytest で可能です。方法はAWS Glueの単体テスト環境の構築手順 を参照ください。実行結果のアサーションをファイル単位で行う場合は、DataFrameを比較できるツール(chispa など)を利用すると便利です。
Tip4: データカタログどうする問題 データカタログは、データのファイルシステムをDatabaseとTableのように定義して管理するHive メタストア同様の機能を担っています。
データカタログは、上記Glueコンテナのデフォルト設定では呼出すことができず、CSVファイルを読込む際にデータ型定義ができない課題がありました。
CSVファイルをDataFrameに読込む際に、schema定義をかいてやることはできますが、ローカル環境でしか使わないコードを、対象データのカラムすべてに対して書くのは嬉しくありません。AWS環境のGlueデータカタログの定義と二重管理にもなります。そこで、2パターンの解決策をご紹介します。
Tip4-1. AWS環境に接続してGlueデータカタログを使用する
Tip4-2. CSVではなく、Parquetファイルを使う
Tip4-1. AWS環境に接続してGlueデータカタログを使用する AWSアカウントの使える状態であれば、AWS環境のS3からGlueデータカタログを使用してファイルを読込むのが楽です。ローカル環境のGlueコンテナ内から、以下のようなコードが実行できます。
from pyspark.context import SparkContextfrom awsglue.context import GlueContextglueContext = GlueContext(SparkContext.getOrCreate()) df = glueContext.create_dynamic_frame.from_catalog( database="sampledb" , table_name="departuredelays" , push_down_predicate="(any=='sample')" , ).toDF() df.write.mode("overwrite" ).format ("parquet" ).save( "s3://bucket_name/departuredelays_out/any=sample/" )
このスクリプト実行のためには、DatabaseとTable定義を予めGlueデータカタログに登録しておく必要があります。Athenaから登録するには以下のようなDDLを使用します。読込みファイルがCSVの場合です。
CREATE DATABASE sampledb LOCATION 's3://bucket_name/' ; CREATE EXTERNAL TABLE sampledb.departuredelays ( `date ` string, `delay` int , `distance` int , `origin` string, `destination` string) PARTITIONED BY ( `any ` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ('separatorChar' = ',' , 'escapeChar' = '\\' , 'quoteChar' = '\"' )STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket_name/departuredelays' TBLPROPERTIES ( 'has_encrypted_data' = 'false' , 'skip.header.line.count' = '1' ) ;
おまけですが、出力結果をAthenaから確認するためには、出力ディレクトリのTable定義を登録します。今回出力ファイルはParquetなので、DDLは以下のようになります。
CREATE EXTERNAL TABLE IF NOT EXISTS sampledb.departuredelays_out ( `date ` string, `delay` int , `distance` int , `origin` string, `destination` string) PARTITIONED BY ( `any ` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://bucket_name/departuredelays_out' TBLPROPERTIES ( 'has_encrypted_data' = 'false' ) ;
Tip4-2. CSVではなく、Parquetファイルを使う AWS環境の使えない状態でも、ファイルをParquetフォーマットで作成できれば、型の保存された状態で読込ができます。Parquetは、CSVよりも保存性や読書き性能の面で有利です(Apache Parquetについて )。
Parquetファイルは直接開いて中が確認できないですが、上記のようにAthenaで確認できますし、ローカル環境でも、Jupyter Notebook上でDataFrameに読込んでschema表示・データ表示できます。
まとめ AWS Glueで複雑な処理を開発するときのTipsをご紹介しました。複雑なロジック開発とテストにAWS Glue環境を用いるのは費用面で不利なので、ぜひローカル環境を活用したいところです。特にファイルI/Oについては、ローカル環境とAWS環境で同じコードで処理できるようにするのがポイントだと思います。Glueジョブ開発の一助になれば幸いです。
参考リンク