AWS GlueでMySQLのデータを整形して転送するETL

AWS GlueでMySQLからデータを取得した際に、型によって値の変換がされる部分があったりして、それをもとのMySQLの値と同じにしたいなとか、一部の値は別の表現をして転送したいなとか、そんな整形をしてみました。


やりたいこと

  • AWS GlueでMySQLからデータを取得し、特定の型や値に対して変換ルールを設けて整形をする


前提

  • Python(PySpark)で実装します
  • データのLoad先はS3としますが、ここは特に今回の実装ではあまり重要ではなく他のサービスでも可能です


やり方

MySQL内のETL対象スキーマ

  • 以下のシステム表領域・モニタリング用スキーマ以外の全スキーマを対象としています 
    • mysql
    • information_schema
    • performance_schema
    • sys
  • ※特定のスキーマに絞りたい場合も簡単に絞れます

変換要件

こんな変換ルールで実装をしてみます。

  • decimal型
    1. Glueで取得する際に自動で付いた小数点以降の最後の桁の'0'を削る
      • 例)'4.10' -> '4.1'
        • 元データ : '4.1'
        • -> Glue : '4.10'(Glueで取得時に変換される)
        • -> 整形 : '4.1'(それを元の形に戻す)
  • datetime型
    1. Glueで取得する際に自動で付いた秒数の小数点以下'.0'を削る
      • 例)'2021-08-25 11:22:33.0' -> '2021-08-25 11:22:33'
        • 元データ : '2021-08-25 11:22:33'
        • -> Glue : '2021-08-25 11:22:33.0'(Glueで取得時に変換される)
        • -> 整形 : '2021-08-25 11:22:33'(それを元の形に戻す)
    2. '0000-00-00 00:00:00'という値を'null'にする
      • 例)'0000-00-00 00:00:00' -> null
  • date型
    1. '0000-00-00'という値を'null'にする
      • 例)'0000-00-00' -> null

大まかな流れと方法

  • MySQLinformation_schema.COLUMNSから変換したい型に一致するカラムを絞り、DB・テーブル・カラムの組み合わせとして取得する
  • MySQLinformation_schema.TABLESからETL対象のスキーマ・テーブル一覧を取得する
  • スキーマ・テーブルごとにループを回す
    • MySQLinformation_schema.COLUMNSから、該当テーブルのカラム情報(型など)を全て取得する
    • MySQLの型とGlueで書き込む際の型の間のマッピングルールを指定する
    • 実テーブルからデータを取得し、上記の型マッピングルールに従って型の変換を行う
    • 最初に記載した「変換要件」に沿って、型ごとに値の変換を行う
    • 変換後のデータをS3に書き込む

Glueジョブ(Python)

GitHubにもあります。

github.com

コード(長いので折り畳み)

import sys
import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job

arg_keys =  ['JOB_NAME', 'connection_name', 'bucket_root']
args = getResolvedOptions(sys.argv, arg_keys)

(job_name, connection_name, bucket_root) = [args[k] for k in arg_keys]

now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(job_name, args)

jdbc_conf = glueContext.extract_jdbc_conf(connection_name)

## transformするカラム一覧の抽出
transformed_columns_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
        "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
    }
)

## 除外スキーマを除いて、変換対象の型のカラムに絞り込む
transformed_columns = transformed_columns_dyf \
    .toDF() \
    .filter( \
        "(DATA_TYPE = 'datetime' \
            OR DATA_TYPE = 'decimal' \
            OR DATA_TYPE = 'date' \
        ) \
        AND TABLE_SCHEMA != 'mysql' \
        AND TABLE_SCHEMA != 'information_schema' \
        AND TABLE_SCHEMA != 'performance_schema' \
        AND TABLE_SCHEMA != 'sys'" \
    ).collect()

## transformリスト生成(スキーマ・テーブル・カラムの組み合わせ)
transformed_columns_list = {}

for row in transformed_columns:
    if row['TABLE_SCHEMA'] not in transformed_columns_list:
        transformed_columns_list[row['TABLE_SCHEMA']] = {}
    if row['TABLE_NAME'] not in transformed_columns_list[row['TABLE_SCHEMA']]:
        transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']] = []
        
    row_col_type = {}
    row_col_type['COLUMN_NAME'] = row['COLUMN_NAME']
    row_col_type['DATA_TYPE'] = row['DATA_TYPE']

    transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']].append(row_col_type)
    
    
## ETL対象のスキーマ・テーブル一覧を取得するDynamicFrame生成
target_tables_dyf = glueContext.create_dynamic_frame_from_options(
  'mysql', connection_options={
    "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "TABLES"
  }
)

## いらないスキーマを弾いて取得
target_tables = target_tables_dyf.toDF().filter("TABLE_SCHEMA != 'mysql' AND TABLE_SCHEMA != 'information_schema' AND TABLE_SCHEMA != 'performance_schema' AND TABLE_SCHEMA != 'sys'").collect()

## DB-TableごとにS3書き込み
for row in target_tables:
    db_schema = row['TABLE_SCHEMA']
    table_name = row['TABLE_NAME']
    
    ## Tableのカラム一覧抽出
    mapping_type_dyf = glueContext.create_dynamic_frame_from_options(
      'mysql', connection_options={
        "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
      }
    )
    
    mapping_type = mapping_type_dyf.toDF().filter("TABLE_SCHEMA = '" + db_schema + "' AND TABLE_NAME = '" + table_name + "'").collect()

    ## 型のマッピングルール定義
    mappings_list = []
    choice_list = []
    for map_type in mapping_type:
        if map_type['DATA_TYPE'] == "int":
            map_source_type = "int"
            map_target_type = "int"
        elif map_type['DATA_TYPE'] == "bigint":
            map_source_type = "bigint"
            map_target_type = "long"
        elif map_type['DATA_TYPE'] == "tinyint":
            map_source_type = "tinyint"
            map_target_type = "byte"
        elif map_type['DATA_TYPE'] == "varbinary":
            map_source_type = "binary"
            map_target_type = "binary"
        elif map_type['DATA_TYPE'] == "decimal":
            map_source_type = "decimal"
            map_target_type = "decimal"
        elif map_type['DATA_TYPE'] == "date":
            map_source_type = "date"
            map_target_type = "date"
        elif map_type['DATA_TYPE'] == "datetime":
            map_source_type = "timestamp"
            map_target_type = "timestamp"
        else:
            map_source_type = "string"
            map_target_type = "string"
            
        choice_tuple = (map_type['COLUMN_NAME'], "cast:" + map_source_type)
        choice_list.append(choice_tuple)
            
        mapping_tuple = (map_type['COLUMN_NAME'], map_source_type, map_type['COLUMN_NAME'], map_target_type)
        mappings_list.append(mapping_tuple)

    ## 実テーブルからデータ取得
    table_result_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
            "url": "{0}/{1}".format(jdbc_conf['url'], db_schema), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": table_name
        }
    )
    
    ## 型変換
    resolvechoice = ResolveChoice.apply(frame = table_result_dyf, specs = choice_list, transformation_ctx = "resolvechoice")
    applymapping = ApplyMapping.apply(frame = resolvechoice, mappings = mappings_list, transformation_ctx = "applymapping")
    
    ## 実データのDataFrameに変換
    table_result = applymapping.toDF()

    ## 実データ変換
    if db_schema in transformed_columns_list and table_name in transformed_columns_list[db_schema]:
        for col_type in transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']]:
            if col_type['DATA_TYPE'] == "decimal":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_extract(
                        col_type['COLUMN_NAME'], \
                        '([0-9]+\.[0-9])0$', \
                        1 \
                    ) \
                )
            elif col_type['DATA_TYPE'] == "datetime":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_replace( \
                        col_type['COLUMN_NAME'], \
                        '\.0$', \
                        '' \
                    ) \
                )

            if col_type['DATA_TYPE'] == "datetime":
                from_data = '0000-00-00 00:00:00'
                to_data = None
            elif col_type['DATA_TYPE'] == "date":
                from_data = '0000-00-00'
                to_data = None
            else:
                continue

            table_result = table_result.withColumn( \
                col_type['COLUMN_NAME'], \
                when( \
                    col(col_type['COLUMN_NAME']) == from_data, \
                        to_data \
                    ).otherwise( \
                        col(col_type['COLUMN_NAME']) \
                ) 
            )
            
    transformed_data_dyf = applymapping.fromDF(table_result, glueContext, "transformed_data_dyf")
    
    glueContext.write_dynamic_frame.from_options(
        frame=transformed_data_dyf,
        connection_type="s3",
        connection_options={"path": bucket_root + "/" + now + "/" + db_schema + "/" + table_name},
        format="json"
    )
    
job.commit()


解説

MySQLinformation_schema.COLUMNSから変換したい型に一致するカラムを絞って取得します。

## transformするカラム一覧の抽出
transformed_columns_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
        "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
    }
)

## 除外スキーマを除いて、変換対象の型のカラムに絞り込む
transformed_columns = transformed_columns_dyf \
    .toDF() \
    .filter( \
        "(DATA_TYPE = 'datetime' \
            OR DATA_TYPE = 'decimal' \
            OR DATA_TYPE = 'date' \
        ) \
        AND TABLE_SCHEMA != 'mysql' \
        AND TABLE_SCHEMA != 'information_schema' \
        AND TABLE_SCHEMA != 'performance_schema' \
        AND TABLE_SCHEMA != 'sys'" \
    ).collect()


これをDB・テーブル・カラムの組み合わせとしてリストにしておきます。

## transformリスト生成(スキーマ・テーブル・カラムの組み合わせ)
transformed_columns_list = {}

for row in transformed_columns:
    if row['TABLE_SCHEMA'] not in transformed_columns_list:
        transformed_columns_list[row['TABLE_SCHEMA']] = {}
    if row['TABLE_NAME'] not in transformed_columns_list[row['TABLE_SCHEMA']]:
        transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']] = []
        
    row_col_type = {}
    row_col_type['COLUMN_NAME'] = row['COLUMN_NAME']
    row_col_type['DATA_TYPE'] = row['DATA_TYPE']

    transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']].append(row_col_type)


次に、information_schema.TABLESからETL対象のスキーマ・テーブル一覧を取得して、いよいよループ処理で変換と書き込みをしていきます。

## ETL対象のスキーマ・テーブル一覧を取得するDynamicFrame生成
target_tables_dyf = glueContext.create_dynamic_frame_from_options(
  'mysql', connection_options={
    "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "TABLES"
  }
)

## いらないスキーマを弾いて取得
target_tables = target_tables_dyf.toDF().filter("TABLE_SCHEMA != 'mysql' AND TABLE_SCHEMA != 'information_schema' AND TABLE_SCHEMA != 'performance_schema' AND TABLE_SCHEMA != 'sys'").collect()


ループ処理ではテーブルずつで回していきます。ループ内ではまず、型変換のためにinformation_schema.COLUMNSから該当テーブルの全カラム情報を取得します。

## DB-TableごとにS3書き込み
for row in target_tables:
    db_schema = row['TABLE_SCHEMA']
    table_name = row['TABLE_NAME']
    
    ## Tableのカラム一覧抽出
    mapping_type_dyf = glueContext.create_dynamic_frame_from_options(
      'mysql', connection_options={
        "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
      }
    )
    
    mapping_type = mapping_type_dyf.toDF().filter("TABLE_SCHEMA = '" + db_schema + "' AND TABLE_NAME = '" + table_name + "'").collect()


それぞれのカラムの型ごとに、元のMySQL上での型に対して、Glueで扱う際の型を定義しておきます。

    ## 型のマッピングルール定義
    mappings_list = []
    choice_list = []
    for map_type in mapping_type:
        if map_type['DATA_TYPE'] == "int":
            map_source_type = "int"
            map_target_type = "int"
        elif map_type['DATA_TYPE'] == "bigint":
            map_source_type = "bigint"
            map_target_type = "long"
        elif map_type['DATA_TYPE'] == "tinyint":
            map_source_type = "tinyint"
            map_target_type = "byte"
        elif map_type['DATA_TYPE'] == "varbinary":
            map_source_type = "binary"
            map_target_type = "binary"
        elif map_type['DATA_TYPE'] == "decimal":
            map_source_type = "decimal"
            map_target_type = "decimal"
        elif map_type['DATA_TYPE'] == "date":
            map_source_type = "date"
            map_target_type = "date"
        elif map_type['DATA_TYPE'] == "datetime":
            map_source_type = "timestamp"
            map_target_type = "timestamp"
        else:
            map_source_type = "string"
            map_target_type = "string"


これらを、ResolveChoice.applyApplyMapping.applyに渡すためのタプルとして定義しておきます。

choice_listResolveChoice.applyに渡すカラムとキャストの一覧であり、そのカラムの型を統一して揃えておくような使い方をします。

mappings_listは、ApplyMapping.applyに渡すカラムの変換元と変換先の型の一覧であり、この変換元のカラムの型を揃えておかないと変換がうまくいかないため上記のResolveChoice.applyでキャストしておくというような流れになります。

        choice_tuple = (map_type['COLUMN_NAME'], "cast:" + map_source_type)
        choice_list.append(choice_tuple)
            
        mapping_tuple = (map_type['COLUMN_NAME'], map_source_type, map_type['COLUMN_NAME'], map_target_type)
        mappings_list.append(mapping_tuple)


そして、ここでようやく実際に実テーブルからデータを取得し、定義した型のマッピングルールをもとに型変換を実施します。

    ## 実テーブルからデータ取得
    table_result_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
            "url": "{0}/{1}".format(jdbc_conf['url'], db_schema), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": table_name
        }
    )
    
    ## 型変換
    resolvechoice = ResolveChoice.apply(frame = table_result_dyf, specs = choice_list, transformation_ctx = "resolvechoice")
    applymapping = ApplyMapping.apply(frame = resolvechoice, mappings = mappings_list, transformation_ctx = "applymapping")


ここで型の変換をしたら、その次は本来の目的の値の変換へと入ります。

まずDataFrameに変換しておきます。

    ## 実データのDataFrameに変換
    table_result = applymapping.toDF()


そして、変換対象のDB・テーブル・カラムの組み合わせを格納したtransformed_columns_listと照らし合わせ、各ループのテーブルと一致した場合、実際に変換処理をしていきます。

    ## 実データ変換
    if db_schema in transformed_columns_list and table_name in transformed_columns_list[db_schema]:
        for col_type in transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']]:
            if col_type['DATA_TYPE'] == "decimal":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_extract(
                        col_type['COLUMN_NAME'], \
                        '([0-9]+\.[0-9])0$', \
                        1 \
                    ) \
                )
            elif col_type['DATA_TYPE'] == "datetime":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_replace( \
                        col_type['COLUMN_NAME'], \
                        '\.0$', \
                        '' \
                    ) \
                )

            if col_type['DATA_TYPE'] == "datetime":
                from_data = '0000-00-00 00:00:00'
                to_data = None
            elif col_type['DATA_TYPE'] == "date":
                from_data = '0000-00-00'
                to_data = None
            else:
                continue

            table_result = table_result.withColumn( \
                col_type['COLUMN_NAME'], \
                when( \
                    col(col_type['COLUMN_NAME']) == from_data, \
                        to_data \
                    ).otherwise( \
                        col(col_type['COLUMN_NAME']) \
                ) 
            )


ここで改めて変換ルールを確認します。

  • decimal型のとき
    • Glueで取得する際に自動で付いた小数点以降の最後の桁の'0'を削る
            if col_type['DATA_TYPE'] == "decimal":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_extract(
                        col_type['COLUMN_NAME'], \
                        '([0-9]+\.[0-9])0$', \
                        1 \
                    ) \
                )
  • datetime型のとき
    • Glueで取得する際に自動で付いた秒数の小数点以下'.0'を削る
    • '0000-00-00 00:00:00'という値を'null'にする
            elif col_type['DATA_TYPE'] == "datetime":
                table_result = table_result.withColumn( \
                    col_type['COLUMN_NAME'], \
                    regexp_replace( \
                        col_type['COLUMN_NAME'], \
                        '\.0$', \
                        '' \
                    ) \
                )

            if col_type['DATA_TYPE'] == "datetime":
                from_data = '0000-00-00 00:00:00'
                to_data = None
  • date型のとき
    • '0000-00-00'という値を'null'にする
            elif col_type['DATA_TYPE'] == "date":
                from_data = '0000-00-00'
                to_data = None


そして特定の値のときのみ変換をするdatetime,dateのnull変換を行います。

この変換ではwhenotherwiseにより、カラムの値がfrom_data('0000-00-00 00:00:00', '0000-00-00')ならto_data(null)に変換を、そうでなければ同じ値に変換(つまり実質変換はされない)をするようにしています。

また、datetime,date以外のときは変換せずにcontinueで次のループへ行く様になっています。

            else:
                continue

            table_result = table_result.withColumn( \
                col_type['COLUMN_NAME'], \
                when( \
                    col(col_type['COLUMN_NAME']) == from_data, \
                        to_data \
                    ).otherwise( \
                        col(col_type['COLUMN_NAME']) \
                ) 
            )


ちなみに変換で使用するwithColumnメソッドですが、これはDataFrameのメソッドであり、(対象カラム名、変換後のデータ)を与えることで変換がされます。

第1引数は、既存のカラム名にしない場合は新規カラム追加、既存カラム名にした場合は上書きとなります。

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.htmlspark.apache.org



そして最後にDynamicFrameに戻してからS3への書き込みをします。S3への書き込み処理自体は今回の目的とは外れるため詳細は省略します。

ループが全て終わったらjob.commit()をして終了します。

    transformed_data_dyf = applymapping.fromDF(table_result, glueContext, "transformed_data_dyf")
    
    glueContext.write_dynamic_frame.from_options(
        frame=transformed_data_dyf,
        connection_type="s3",
        connection_options={"path": bucket_root + "/" + now + "/" + db_schema + "/" + table_name},
        format="json"
    )
    
job.commit()


最後に

長くなってしまいましたが、AWS GlueからMySQLでデータ取得し、型や値ごとに変換をするGlueジョブを作成してみました。

information_schemaをもとにテーブルやカラム情報を取得するというMySQLならではの処理になりましたが、非常に便利でGlue Data Catalogも使用せずに色々なことができます。