概要 フューチャーアドベントカレンダー の6日目のエントリーです。 昨日はyut0n さんによる「GoogleカレンダーのイベントをHangouts Chatに通知するbotを作った話 」でした。
当記事では、AWS Glue をローカル環境で単体テストするための環境構築方法についてまとめました。
手順 
環境構築 
pytest の環境構築 
conftest.py の設定 
テスト対象の作成 
テスト実行 
 
実行環境 
Amazon Linux 2 AMI 2.0.20190618 x86_64 HVM gp2 
Docker 18.06.1-ce 
docker-compose version 1.24.0 
 
1. 環境構築 docker compose を利用します。 GlueのDockerfileは、 こちらの記事(AWS Glueの開発エンドポイントがそこそこお高いのでローカル開発環境を用意しました | Future Tech Blog - フューチャーアーキテクト ) にて、紹介されているDockerfileをベースに利用します。 少々イメージサイズが大きかったので、小さくする対応をしていますが基本は同じです。
ディレクトリ構成
├── Dockerfile ├── docker-compose.yml ├── src                  └── tests                
 
Dockerfile 
FROM  centos:7 RUN  yum update -y \   && yum install -y gcc gcc-c++ make openssl-devel readline-devel zlib-devel wget curl unzip vim epel-release git \   && yum install -y vim-enhanced bash-completion net-tools bind-utils \   && yum install -y https://centos7.iuscommunity.org/ius-release.rpm \   && yum install -y python36u python36u-libs python36u-devel python36u-pip \   && yum install -y java java-1.8.0-openjdk-devel \   && rm  -rf /var/cache/yum/* \   && yum clean all RUN  localedef -f UTF-8 -i ja_JP ja_JP.UTF-8 ENV  LANG ja_JP.UTF-8 ENV  LC_CTYPE "ja_JP.UTF-8" ENV  LC_NUMERIC "ja_JP.UTF-8" ENV  LC_TIME "ja_JP.UTF-8" ENV  LC_COLLATE "ja_JP.UTF-8" ENV  LC_MONETARY "ja_JP.UTF-8" ENV  LC_MESSAGES "ja_JP.UTF-8" ENV  LC_PAPER "ja_JP.UTF-8" ENV  LC_NAME "ja_JP.UTF-8" ENV  LC_ADDRESS "ja_JP.UTF-8" ENV  LC_TELEPHONE "ja_JP.UTF-8" ENV  LC_MEASUREMENT "ja_JP.UTF-8" ENV  LC_IDENTIFICATION "ja_JP.UTF-8" ENV  LC_ALL ja_JP.UTF-8 RUN  curl -OL https://archive.apache.org/dist/maven/maven-3/3.6.2/binaries/apache-maven-3.6.2-bin.tar.gz \   && tar -xzvf apache-maven-3.6.2-bin.tar.gz \   && mv  apache-maven-3.6.2 /opt/ \   && ln  -s /opt/apache-maven-3.6.2 /opt/apache-maven \   && rm  apache-maven-3.6.2-bin.tar.gz ENV  JAVA_HOME /usr/lib/jvm/java-1.8 .0 -openjdk/jre/ENV  PATH $PATH:/opt/apache-maven/binRUN  mvn -version RUN  curl -OL https://aws-glue-etl-artifacts.s3.amazonaws.com/glue-1.0/spark-2.4.3-bin-hadoop2.8.tgz \   && tar -xzvf spark-2.4.3-bin-hadoop2.8.tgz \   && mv  spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8 /opt/ \   && ln  -s /opt/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8 /opt/spark \   && rm  ./spark-2.4.3-bin-hadoop2.8.tgz ENV  SPARK_HOME /opt/sparkRUN  unlink  /bin/python \   && ln  -s /bin/python3 /bin/python \   && ln  -s /bin/pip3.6 /bin/pip RUN  git config --global http.sslVerify false  \   && git clone  -b glue-1.0 --depth 1  https://github.com/awslabs/aws-glue-libs \   && ln  -s ${SPARK_HOME} /jars /aws-glue-libs/jarsv1 \   && sed -i -e 's/mvn/mvn -T 4/'  /aws-glue-libs/bin/glue-setup.sh \   && ./aws-glue-libs/bin/gluepyspark ENV  PATH $PATH:/aws-glue-libs/bin/WORKDIR  /opt/src ENTRYPOINT  ["/bin/sh" , "-c" , "while :; do sleep 10; done" ] 
 
  
S3の環境が必要だったため、 LocalStack  を利用しています。
docker-compose.yml
version:  "3" services:   glue.local:      build:        context:  ./      container_name:  gluelocal      volumes:        -  ./src:/opt/src/src        -  ./tests:/opt/src/tests      environment:               -  AWS_DEFAULT_REGION=ap-northeast-1        -  AWS_DEFAULT_OUTPUT=json        -  AWS_ACCESS_KEY_ID=xxx        -  AWS_SECRET_ACCESS_KEY=xxx    aws.local:      image:  localstack/localstack      environment:        -  SERVICES=s3        -  DEFAULT_REGION=ap-northeast-1               -  AWS_DEFAULT_REGION=ap-northeast-1        -  AWS_DEFAULT_OUTPUT=json        -  AWS_ACCESS_KEY_ID=xxx        -  AWS_SECRET_ACCESS_KEY=xxx  
 
コンテナ起動
docker-compose up -d --build 
 
2. pytestの環境構築 必要なパッケージのインストールをします。Glueバージョン 1.0  を想定して、pysparkは2.4.3を明示的にインストールします。
docker exec  -it gluelocal pip install pyspark==2.4.3 boto3 pytest 
 
3. conftest.py の設定 pytestではテストの前後処理を tests/conftest.py 内に実装する慣習があるためそれにならいます。 Test実行時に1回だけ実行したい処理をまとめています。
import  pytestimport  osfrom  pyspark.context import  SparkContextfrom  awsglue.context import  GlueContext@pytest.fixture(scope="session" , autouse=True  ) def  scope_session ():         os.environ["TEST_S3_ENDPOINT_URL" ] = "http://aws.local:4572"      sc = SparkContext()          sc._jsc.hadoopConfiguration().set ("fs.s3a.endpoint" , "http://aws.local:4572" )     sc._jsc.hadoopConfiguration().set ("fs.s3a.path.style.access" , "true" )     sc._jsc.hadoopConfiguration().set ("fs.s3a.signing-algorithm" , "S3SignerType" )     pytest.sc = sc     pytest.glueContext = GlueContext(pytest.sc)     pytest.spark = pytest.glueContext.spark_session 
 
4. テスト対象の作成 サンプル程度に、S3上のcsvファイルからDynamicFrameを生成する関数をテストします。
Glueスクリプト: src/etl.py
from  awsglue.dynamicframe import  DynamicFramefrom  awsglue.context import  GlueContextimport  sysdef  load_dynamic_frame_from_csv (glueContext: GlueContext, spark, bucket: str , path: str  ) -> DynamicFrame:    p = "s3://{}/{}" .format (bucket, path)     return  glueContext.create_dynamic_frame_from_options(         connection_type="s3" ,         connection_options={"paths" : [p]},         format ="csv" ,         format_options={"withHeader" : True , "separator" : "," },     ) 
 
テストコード: tests/test_etl.py
import  pytestimport  boto3import  jsonimport  osimport  sysimport  ioimport  csvfrom  src.etl import  load_dynamic_frame_from_csvfrom  botocore.client import  Configdef  test_load_dynamic_frame_from_csv ():         inputs = [       {         "id" : "1" ,         "name" : "xxx" ,         "address" : "xxx@example.co.jp"        },       {         "id" : "2" ,         "name" : "yyy" ,         "address" : "yyy@example.co.jp"        }     ]     input_str = io.StringIO()     w = csv.DictWriter(input_str, fieldnames=inputs[0 ].keys())     w.writeheader()     for  input  in  inputs:       w.writerow(input )     s3 = boto3.resource(         "s3" ,         endpoint_url=os.environ["TEST_S3_ENDPOINT_URL" ],         region_name="ap-northeast-1" ,         use_ssl=False ,         config=Config(s3={"addressing_style" : "path" }),     )     bucket_name = "test-csv-bucket"      bucket = s3.Bucket(bucket_name)     bucket.create(ACL="public-read-write" )     body = input_str.getvalue()     key = "user/2019/12/06/users.csv"      bucket.put_object(Key=key, Body=body, ACL="public-read-write" )          res_df = load_dynamic_frame_from_csv(pytest.glueContext, pytest.spark, bucket_name, key)          assert  res_df.count() == len (inputs)     res_df_json = res_df.toDF().toJSON().take(len (inputs))     for  res in  res_df_json:       r = json.loads(res)       assert  r in  inputs 
 
5. テスト実行 Glue環境を構築して pytest を実行する gluepytest コマンドが用意されているため、そちらを利用します。 PATHを通してあるので、下記で実行できます。
docker exec  -it gluelocal gluepytest 
 
結果
  adding: awsglue/ (stored 0%)   adding: awsglue/README.md (deflated 57%)   adding: awsglue/__init__.py (deflated 37%)   adding: awsglue/context.py (deflated 78%)   adding: awsglue/data_sink.py (deflated 60%) . . . ================================== test  session starts ================================== platform linux -- Python 3.6.8, pytest-5.3.1, py-1.8.0, pluggy-0.13.1 rootdir: /opt/src collected 1 item tests/test_etl.py .                                                               [100%] =================================== 1 passed in  9.79s =================================== 
 
所感 Glueをローカル環境にて、単体テストを実施する環境を整備してみました。 Glueの動作確認は、開発エンドポイントを利用して確認することが多く、 少々面倒かつ再現性がなかったため、テスト環境を構築してテストを実行することで安定して開発を進めていきたいです。 Dockerで作成しているため、CI/CD等にも組み込んでいけたらと考えています。 (Dockerfileが1GB程度とまだまだ大きいため、もう少しスリムにしたいなとは思っています。)
参考