AWS GlueでETLを行う際に、機密情報や個人情報に当たる様な一部のデータをマスキングしてみました。
やりたいこと
補足
前回の「DynamoDBからMySQLにJSON型で転送するETLのGlueジョブ」のコードに、「一部のデータをマスキングする」機能を追加してみます。
マスキング以外のコード解説は以下をご覧下さい。
やり方
※GitHubにもあります。
Glueジョブ(Python)
import sys import boto3 import pymysql import json import re from pyspark.sql.types import * from pyspark.sql.functions import * from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions def get_all_tablenames(dynamodb): try: res = dynamodb.tables.all() return res except Exception as e: return e ## マスキング用のテーブル・カラムリスト masking_table_columns = { "User": [ "name", "email", ], "OrderedData": [ "data", "price", ], } arg_keys = [ 'JOB_NAME', 'connection_name', 'target_table_prefix', 'default_mysql_db' ] args = getResolvedOptions(sys.argv, arg_keys) (job_name, connection_name, target_table_prefix, default_mysql_db) = [args[k] for k in arg_keys] sc = SparkContext() glue_context = GlueContext(sc) spark = glue_context.spark_session job = Job(glue_context) job.init(job_name, args) jdbc_conf = glue_context.extract_jdbc_conf(connection_name) mysql_host = re.findall('jdbc:mysql://(.*):3306', jdbc_conf['url']) try: mysql_conn = pymysql.connect( host=mysql_host[0], user=jdbc_conf['user'], passwd=jdbc_conf['password'], db=default_mysql_db, cursorclass=pymysql.cursors.DictCursor, connect_timeout=10 ) with mysql_conn.cursor() as cursor: sql = "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = %s AND TABLE_NAME LIKE %s" cursor.execute(sql, (default_mysql_db, target_table_prefix.replace('-', '_') + "\\_%")) results = cursor.fetchall() for r in results: table_name = r['TABLE_NAME'] sql = "DROP TABLE IF EXISTS " + table_name cursor.execute(sql) dynamodb = boto3.resource('dynamodb') for table in get_all_tablenames(dynamodb): if target_table_prefix in table.table_name: mysql_table_name = table.table_name.replace('-', '_') sql = "CREATE TABLE " + mysql_table_name + \ " (json_data JSON) ENGINE = Innodb" cursor.execute(sql) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.input.tableName": table.table_name, "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) raw_count = dyf.count() if raw_count > 0: print( table.table_name + ' : raw count(>0) = ' + str(raw_count) ) dyf_df = dyf.toDF() ## DynamoDBテーブル名からプレフィックスを抜いたテーブル名 trimmed_table_name = re.findall( target_table_prefix + '-(.*)', table.table_name ) masking_target = trimmed_table_name[0] ## 対象カラムのデータをマスキング if masking_target in masking_table_columns: for masking_column in masking_table_columns[masking_target]: dyf_df = dyf_df.withColumn( masking_column, lit('**********') ) dyf_df_json = dyf_df.toJSON() json_list = dyf_df_json.collect() data_list = [] for row in json_list: data_list.append({"json_data": row}) df = spark.createDataFrame(data_list) df.write \ .format("jdbc") \ .option("url", "{0}/{1}".format(jdbc_conf['url'], default_mysql_db)) \ .option("dbtable", mysql_table_name) \ .option("user", jdbc_conf['user']) \ .option("password", jdbc_conf['password']) \ .mode("append") \ .save() job.commit() except: print("ERROR: Unexpected error.") sys.exit() finally: mysql_conn.close()
解説
マスキング対象のテーブル名とカラム名をリストとして定義しておきます。本来であれば別ファイルで定義すると管理がより楽になります。
## マスキング用のテーブル・カラムリスト masking_table_columns = { "User": [ "name", "email", ], "OrderedData": [ "data", "price", ], }
今回のコードでは、ETLの対象データ(DynamoDBテーブル)を「特定のプレフィックスが付くテーブル」(例: Target-tbl1, Target-tbl2, etc...)で絞っているので、より汎用的なコードになる様にプレフィックス部分を抜いた部分の名前でリストを作っています。
例)プレフィックスをTarget
と指定した場合、上記コードだとTarget-User
とTarget-OrderedData
というテーブルがマスキング対象となります。
次は、実際のDynamoDBテーブル名からプレフィックスを抜きます。
## DynamoDBテーブル名からプレフィックスを抜いたテーブル名 trimmed_table_name = re.findall( target_table_prefix + '-(.*)', table.table_name ) masking_target = trimmed_table_name[0]
定義したマスキング対象リストにテーブル(masking_target
)が入っていた場合、マスキング対象カラムごとに変換処理をしていきます。
## 対象カラムのデータをマスキング if masking_target in masking_table_columns: for masking_column in masking_table_columns[masking_target]: dyf_df = dyf_df.withColumn( masking_column, lit('**********') )
こちらのように、DataFrameのwithColumn
というメソッドに、(対象カラム名、変換後のデータ)を与えることで変換がされます。
第1引数は、既存のカラム名にしない場合は新規カラム追加、既存カラム名にした場合は上書きとなります。
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.htmlspark.apache.org
lit('**********')
というのは、lit
というリテラル値(固定値)を作成するメソッドに、マスキング後の値**********
をセットすることで、各行の該当カラムの値を全行このリテラル値に変換することができます。
この方法によって、特定のテーブル・カラムのデータをマスキングすることができます。
最後に
今回はGlueジョブにてデータのマスキングをする方法を紹介しました。
DynamoDB -> MySQLを例に実装していますが、他の場合でも簡単に使いまわせると思います。