AWS GlueでMySQLからデータを取得した際に、型によって値の変換がされる部分があったりして、それをもとのMySQLの値と同じにしたいなとか、一部の値は別の表現をして転送したいなとか、そんな整形をしてみました。
やりたいこと
前提
- Python(PySpark)で実装します
- データのLoad先はS3としますが、ここは特に今回の実装ではあまり重要ではなく他のサービスでも可能です
やり方
MySQL内のETL対象スキーマ
- 以下のシステム表領域・モニタリング用スキーマ以外の全スキーマを対象としています
- mysql
- information_schema
- performance_schema
- sys
- ※特定のスキーマに絞りたい場合も簡単に絞れます
変換要件
こんな変換ルールで実装をしてみます。
- decimal型
- Glueで取得する際に自動で付いた小数点以降の最後の桁の'0'を削る
- 例)'4.10' -> '4.1'
- 元データ : '4.1'
- -> Glue : '4.10'(Glueで取得時に変換される)
- -> 整形 : '4.1'(それを元の形に戻す)
- 例)'4.10' -> '4.1'
- Glueで取得する際に自動で付いた小数点以降の最後の桁の'0'を削る
- datetime型
- Glueで取得する際に自動で付いた秒数の小数点以下'.0'を削る
- 例)'2021-08-25 11:22:33.0' -> '2021-08-25 11:22:33'
- 元データ : '2021-08-25 11:22:33'
- -> Glue : '2021-08-25 11:22:33.0'(Glueで取得時に変換される)
- -> 整形 : '2021-08-25 11:22:33'(それを元の形に戻す)
- 例)'2021-08-25 11:22:33.0' -> '2021-08-25 11:22:33'
- '0000-00-00 00:00:00'という値を'null'にする
- 例)'0000-00-00 00:00:00' -> null
- Glueで取得する際に自動で付いた秒数の小数点以下'.0'を削る
- date型
- '0000-00-00'という値を'null'にする
- 例)'0000-00-00' -> null
- '0000-00-00'という値を'null'にする
大まかな流れと方法
- MySQLの
information_schema.COLUMNS
から変換したい型に一致するカラムを絞り、DB・テーブル・カラムの組み合わせとして取得する - MySQLの
information_schema.TABLES
からETL対象のスキーマ・テーブル一覧を取得する - スキーマ・テーブルごとにループを回す
Glueジョブ(Python)
※GitHubにもあります。
コード(長いので折り畳み)
import sys
import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job
arg_keys = ['JOB_NAME', 'connection_name', 'bucket_root']
args = getResolvedOptions(sys.argv, arg_keys)
(job_name, connection_name, bucket_root) = [args[k] for k in arg_keys]
now = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(job_name, args)
jdbc_conf = glueContext.extract_jdbc_conf(connection_name)
## transformするカラム一覧の抽出
transformed_columns_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
"url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
}
)
## 除外スキーマを除いて、変換対象の型のカラムに絞り込む
transformed_columns = transformed_columns_dyf \
.toDF() \
.filter( \
"(DATA_TYPE = 'datetime' \
OR DATA_TYPE = 'decimal' \
OR DATA_TYPE = 'date' \
) \
AND TABLE_SCHEMA != 'mysql' \
AND TABLE_SCHEMA != 'information_schema' \
AND TABLE_SCHEMA != 'performance_schema' \
AND TABLE_SCHEMA != 'sys'" \
).collect()
## transformリスト生成(スキーマ・テーブル・カラムの組み合わせ)
transformed_columns_list = {}
for row in transformed_columns:
if row['TABLE_SCHEMA'] not in transformed_columns_list:
transformed_columns_list[row['TABLE_SCHEMA']] = {}
if row['TABLE_NAME'] not in transformed_columns_list[row['TABLE_SCHEMA']]:
transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']] = []
row_col_type = {}
row_col_type['COLUMN_NAME'] = row['COLUMN_NAME']
row_col_type['DATA_TYPE'] = row['DATA_TYPE']
transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']].append(row_col_type)
## ETL対象のスキーマ・テーブル一覧を取得するDynamicFrame生成
target_tables_dyf = glueContext.create_dynamic_frame_from_options(
'mysql', connection_options={
"url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "TABLES"
}
)
## いらないスキーマを弾いて取得
target_tables = target_tables_dyf.toDF().filter("TABLE_SCHEMA != 'mysql' AND TABLE_SCHEMA != 'information_schema' AND TABLE_SCHEMA != 'performance_schema' AND TABLE_SCHEMA != 'sys'").collect()
## DB-TableごとにS3書き込み
for row in target_tables:
db_schema = row['TABLE_SCHEMA']
table_name = row['TABLE_NAME']
## Tableのカラム一覧抽出
mapping_type_dyf = glueContext.create_dynamic_frame_from_options(
'mysql', connection_options={
"url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS"
}
)
mapping_type = mapping_type_dyf.toDF().filter("TABLE_SCHEMA = '" + db_schema + "' AND TABLE_NAME = '" + table_name + "'").collect()
## 型のマッピングルール定義
mappings_list = []
choice_list = []
for map_type in mapping_type:
if map_type['DATA_TYPE'] == "int":
map_source_type = "int"
map_target_type = "int"
elif map_type['DATA_TYPE'] == "bigint":
map_source_type = "bigint"
map_target_type = "long"
elif map_type['DATA_TYPE'] == "tinyint":
map_source_type = "tinyint"
map_target_type = "byte"
elif map_type['DATA_TYPE'] == "varbinary":
map_source_type = "binary"
map_target_type = "binary"
elif map_type['DATA_TYPE'] == "decimal":
map_source_type = "decimal"
map_target_type = "decimal"
elif map_type['DATA_TYPE'] == "date":
map_source_type = "date"
map_target_type = "date"
elif map_type['DATA_TYPE'] == "datetime":
map_source_type = "timestamp"
map_target_type = "timestamp"
else:
map_source_type = "string"
map_target_type = "string"
choice_tuple = (map_type['COLUMN_NAME'], "cast:" + map_source_type)
choice_list.append(choice_tuple)
mapping_tuple = (map_type['COLUMN_NAME'], map_source_type, map_type['COLUMN_NAME'], map_target_type)
mappings_list.append(mapping_tuple)
## 実テーブルからデータ取得
table_result_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={
"url": "{0}/{1}".format(jdbc_conf['url'], db_schema), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": table_name
}
)
## 型変換
resolvechoice = ResolveChoice.apply(frame = table_result_dyf, specs = choice_list, transformation_ctx = "resolvechoice")
applymapping = ApplyMapping.apply(frame = resolvechoice, mappings = mappings_list, transformation_ctx = "applymapping")
## 実データのDataFrameに変換
table_result = applymapping.toDF()
## 実データ変換
if db_schema in transformed_columns_list and table_name in transformed_columns_list[db_schema]:
for col_type in transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']]:
if col_type['DATA_TYPE'] == "decimal":
table_result = table_result.withColumn( \
col_type['COLUMN_NAME'], \
regexp_extract(
col_type['COLUMN_NAME'], \
'([0-9]+\.[0-9])0$', \
1 \
) \
)
elif col_type['DATA_TYPE'] == "datetime":
table_result = table_result.withColumn( \
col_type['COLUMN_NAME'], \
regexp_replace( \
col_type['COLUMN_NAME'], \
'\.0$', \
'' \
) \
)
if col_type['DATA_TYPE'] == "datetime":
from_data = '0000-00-00 00:00:00'
to_data = None
elif col_type['DATA_TYPE'] == "date":
from_data = '0000-00-00'
to_data = None
else:
continue
table_result = table_result.withColumn( \
col_type['COLUMN_NAME'], \
when( \
col(col_type['COLUMN_NAME']) == from_data, \
to_data \
).otherwise( \
col(col_type['COLUMN_NAME']) \
)
)
transformed_data_dyf = applymapping.fromDF(table_result, glueContext, "transformed_data_dyf")
glueContext.write_dynamic_frame.from_options(
frame=transformed_data_dyf,
connection_type="s3",
connection_options={"path": bucket_root + "/" + now + "/" + db_schema + "/" + table_name},
format="json"
)
job.commit()
解説
MySQLのinformation_schema.COLUMNS
から変換したい型に一致するカラムを絞って取得します。
## transformするカラム一覧の抽出 transformed_columns_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={ "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS" } ) ## 除外スキーマを除いて、変換対象の型のカラムに絞り込む transformed_columns = transformed_columns_dyf \ .toDF() \ .filter( \ "(DATA_TYPE = 'datetime' \ OR DATA_TYPE = 'decimal' \ OR DATA_TYPE = 'date' \ ) \ AND TABLE_SCHEMA != 'mysql' \ AND TABLE_SCHEMA != 'information_schema' \ AND TABLE_SCHEMA != 'performance_schema' \ AND TABLE_SCHEMA != 'sys'" \ ).collect()
これをDB・テーブル・カラムの組み合わせとしてリストにしておきます。
## transformリスト生成(スキーマ・テーブル・カラムの組み合わせ) transformed_columns_list = {} for row in transformed_columns: if row['TABLE_SCHEMA'] not in transformed_columns_list: transformed_columns_list[row['TABLE_SCHEMA']] = {} if row['TABLE_NAME'] not in transformed_columns_list[row['TABLE_SCHEMA']]: transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']] = [] row_col_type = {} row_col_type['COLUMN_NAME'] = row['COLUMN_NAME'] row_col_type['DATA_TYPE'] = row['DATA_TYPE'] transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']].append(row_col_type)
次に、information_schema.TABLES
からETL対象のスキーマ・テーブル一覧を取得して、いよいよループ処理で変換と書き込みをしていきます。
## ETL対象のスキーマ・テーブル一覧を取得するDynamicFrame生成 target_tables_dyf = glueContext.create_dynamic_frame_from_options( 'mysql', connection_options={ "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "TABLES" } ) ## いらないスキーマを弾いて取得 target_tables = target_tables_dyf.toDF().filter("TABLE_SCHEMA != 'mysql' AND TABLE_SCHEMA != 'information_schema' AND TABLE_SCHEMA != 'performance_schema' AND TABLE_SCHEMA != 'sys'").collect()
ループ処理ではテーブルずつで回していきます。ループ内ではまず、型変換のためにinformation_schema.COLUMNS
から該当テーブルの全カラム情報を取得します。
## DB-TableごとにS3書き込み for row in target_tables: db_schema = row['TABLE_SCHEMA'] table_name = row['TABLE_NAME'] ## Tableのカラム一覧抽出 mapping_type_dyf = glueContext.create_dynamic_frame_from_options( 'mysql', connection_options={ "url": "{0}/{1}".format(jdbc_conf['url'], 'information_schema'), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": "COLUMNS" } ) mapping_type = mapping_type_dyf.toDF().filter("TABLE_SCHEMA = '" + db_schema + "' AND TABLE_NAME = '" + table_name + "'").collect()
それぞれのカラムの型ごとに、元のMySQL上での型に対して、Glueで扱う際の型を定義しておきます。
## 型のマッピングルール定義 mappings_list = [] choice_list = [] for map_type in mapping_type: if map_type['DATA_TYPE'] == "int": map_source_type = "int" map_target_type = "int" elif map_type['DATA_TYPE'] == "bigint": map_source_type = "bigint" map_target_type = "long" elif map_type['DATA_TYPE'] == "tinyint": map_source_type = "tinyint" map_target_type = "byte" elif map_type['DATA_TYPE'] == "varbinary": map_source_type = "binary" map_target_type = "binary" elif map_type['DATA_TYPE'] == "decimal": map_source_type = "decimal" map_target_type = "decimal" elif map_type['DATA_TYPE'] == "date": map_source_type = "date" map_target_type = "date" elif map_type['DATA_TYPE'] == "datetime": map_source_type = "timestamp" map_target_type = "timestamp" else: map_source_type = "string" map_target_type = "string"
これらを、ResolveChoice.apply
、ApplyMapping.apply
に渡すためのタプルとして定義しておきます。
choice_list
はResolveChoice.apply
に渡すカラムとキャストの一覧であり、そのカラムの型を統一して揃えておくような使い方をします。
mappings_list
は、ApplyMapping.apply
に渡すカラムの変換元と変換先の型の一覧であり、この変換元のカラムの型を揃えておかないと変換がうまくいかないため上記のResolveChoice.apply
でキャストしておくというような流れになります。
choice_tuple = (map_type['COLUMN_NAME'], "cast:" + map_source_type) choice_list.append(choice_tuple) mapping_tuple = (map_type['COLUMN_NAME'], map_source_type, map_type['COLUMN_NAME'], map_target_type) mappings_list.append(mapping_tuple)
そして、ここでようやく実際に実テーブルからデータを取得し、定義した型のマッピングルールをもとに型変換を実施します。
## 実テーブルからデータ取得 table_result_dyf = glueContext.create_dynamic_frame_from_options('mysql', connection_options={ "url": "{0}/{1}".format(jdbc_conf['url'], db_schema), "user": jdbc_conf['user'], "password": jdbc_conf['password'], "dbtable": table_name } ) ## 型変換 resolvechoice = ResolveChoice.apply(frame = table_result_dyf, specs = choice_list, transformation_ctx = "resolvechoice") applymapping = ApplyMapping.apply(frame = resolvechoice, mappings = mappings_list, transformation_ctx = "applymapping")
ここで型の変換をしたら、その次は本来の目的の値の変換へと入ります。
まずDataFrameに変換しておきます。
## 実データのDataFrameに変換
table_result = applymapping.toDF()
そして、変換対象のDB・テーブル・カラムの組み合わせを格納したtransformed_columns_list
と照らし合わせ、各ループのテーブルと一致した場合、実際に変換処理をしていきます。
## 実データ変換 if db_schema in transformed_columns_list and table_name in transformed_columns_list[db_schema]: for col_type in transformed_columns_list[row['TABLE_SCHEMA']][row['TABLE_NAME']]: if col_type['DATA_TYPE'] == "decimal": table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ regexp_extract( col_type['COLUMN_NAME'], \ '([0-9]+\.[0-9])0$', \ 1 \ ) \ ) elif col_type['DATA_TYPE'] == "datetime": table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ regexp_replace( \ col_type['COLUMN_NAME'], \ '\.0$', \ '' \ ) \ ) if col_type['DATA_TYPE'] == "datetime": from_data = '0000-00-00 00:00:00' to_data = None elif col_type['DATA_TYPE'] == "date": from_data = '0000-00-00' to_data = None else: continue table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ when( \ col(col_type['COLUMN_NAME']) == from_data, \ to_data \ ).otherwise( \ col(col_type['COLUMN_NAME']) \ ) )
ここで改めて変換ルールを確認します。
- decimal型のとき
- Glueで取得する際に自動で付いた小数点以降の最後の桁の'0'を削る
if col_type['DATA_TYPE'] == "decimal": table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ regexp_extract( col_type['COLUMN_NAME'], \ '([0-9]+\.[0-9])0$', \ 1 \ ) \ )
- datetime型のとき
- Glueで取得する際に自動で付いた秒数の小数点以下'.0'を削る
- '0000-00-00 00:00:00'という値を'null'にする
elif col_type['DATA_TYPE'] == "datetime": table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ regexp_replace( \ col_type['COLUMN_NAME'], \ '\.0$', \ '' \ ) \ ) if col_type['DATA_TYPE'] == "datetime": from_data = '0000-00-00 00:00:00' to_data = None
- date型のとき
- '0000-00-00'という値を'null'にする
elif col_type['DATA_TYPE'] == "date": from_data = '0000-00-00' to_data = None
そして特定の値のときのみ変換をするdatetime
,date
のnull変換を行います。
この変換ではwhen
とotherwise
により、カラムの値がfrom_data
('0000-00-00 00:00:00', '0000-00-00')ならto_data
(null)に変換を、そうでなければ同じ値に変換(つまり実質変換はされない)をするようにしています。
また、datetime
,date
以外のときは変換せずにcontinue
で次のループへ行く様になっています。
else: continue table_result = table_result.withColumn( \ col_type['COLUMN_NAME'], \ when( \ col(col_type['COLUMN_NAME']) == from_data, \ to_data \ ).otherwise( \ col(col_type['COLUMN_NAME']) \ ) )
ちなみに変換で使用するwithColumn
メソッドですが、これはDataFrameのメソッドであり、(対象カラム名、変換後のデータ)を与えることで変換がされます。
第1引数は、既存のカラム名にしない場合は新規カラム追加、既存カラム名にした場合は上書きとなります。
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.htmlspark.apache.org
そして最後にDynamicFrameに戻してからS3への書き込みをします。S3への書き込み処理自体は今回の目的とは外れるため詳細は省略します。
ループが全て終わったらjob.commit()
をして終了します。
transformed_data_dyf = applymapping.fromDF(table_result, glueContext, "transformed_data_dyf") glueContext.write_dynamic_frame.from_options( frame=transformed_data_dyf, connection_type="s3", connection_options={"path": bucket_root + "/" + now + "/" + db_schema + "/" + table_name}, format="json" ) job.commit()
最後に
長くなってしまいましたが、AWS GlueからMySQLでデータ取得し、型や値ごとに変換をするGlueジョブを作成してみました。
information_schema
をもとにテーブルやカラム情報を取得するというMySQLならではの処理になりましたが、非常に便利でGlue Data Catalogも使用せずに色々なことができます。