先日、個人的にAmazon Elasticsearch Service で _ingest と _bulk API を使ってインデクシングして Kibana で可視化する | shinodogg.comというブログを書きましたが、色々シリアルだし、せっかくだったらサーバーレスにしたいな、ということで、これまた個人的にやってみました。
なんというか、こういう風に使ってみると、昔Javaでマルチスレッドなバッチを書いて、レビューで”ココってスレッドセーフじゃないですよねー?”とかって突っ込まれるような危ない橋を渡るより(テストで気づきにくい)、Lambdaファンクション複数立ち上げちゃった方が楽チンでイイわぁ〜と改めて実感したりするものです。
大まかなアーキテクチャ
使うログデータ
NASA-HTTPのWebサイトで公開されている、1995年の7月のデータ(Jul 01 to Jul 31, ASCII format, 20.7 MB gzip compressed)と8月のデータ(Aug 04 to Aug 31, ASCII format, 21.8 MB gzip compressed)を使います。
Apacheのログになりますが、データの詳細は、それなりの時系列ログデータが欲しい時 | shinodogg.comを参照ください。
Lambdaファンクション
Lambdaファンクションは最長で5分という制約があるため、並列化と時間短縮を図るため以下のような流れにします。
1. NASA-HTTPのWebサイトからFTPでgzファイルをダウンロードして、
2MBずつに分割してS3に配置するLambdaファンクション。
7月分と8月分それぞれにLambdaファンクションを起動して並列に処理を行う。
(3MBだと5分を超えてしまいそうなものもあったので念の為2MBにしました)
2. 1つのLambdaファンクションから、上記1.で分割配置されたファイルの数だけ、
Lambdaファンクションを起動してAmazon ESにデータを送信する
今回は気分的にPythonでコードを書くことにしました。いっぱい日本語のコメント付けてるので、それぞれ最初に『# coding:utf-8』を付けてあげます。
Step Functionsで処理を実行
上記のLambdaファンクションをStep Functionsから以下のように起動します。
7月分と8月分のログをそれぞれパラレルでダウンロードして分割します。分割したファイルの並列実行はStep Functionsではなく、Lambdaファンクションの処理ループの中で、それぞれのLambdaファンクションを非同期で呼び出すことにしました(つまり、ワークフローは全体の処理終了と一致するわけではないです)。
//embedr.flickr.com/assets/client-code.js
NASA-HTTPからダウンロード⇒ファイル分割⇒S3に配置するLambdaファンクション
変数諸々。
# S3に配置する際のバケット名とダウンロードしてくるファイル名 # および後続処理で使うAmazonESのエンドポイントを # Step FunctionsのInputPathを使って、eventから取得 bucket_name = event['bucket_name'] file_name = event['file_name'] amazon_es_endpoint = event['amazon_es_endpoint'] print bucket_name print file_name print amazon_es_endpoint # ファイルの一時置き場として/tmpを使う # gzを展開する時の名前は".gz"を取り除く gzip_file_path = '/tmp/' + file_name file_path = gzip_file_path[:-3] print gzip_file_path print file_path
NASA-HTTPのファイルをFTPでダウンロード。Lambdaファンクションのインスタンスは再利用されることがあるので(https://aws.amazon.com/jp/lambda/faqs/ の Q: AWS Lambda は関数インスタンスを再利用しますか?)、お掃除してから配置します。
# 同じコンテナが使い回されることもあるのでお掃除 commands.getoutput("rm -rf /tmp/*") # NASA-HTTPのファイルを/tmpにFTPで配置 ftp = FTP() ftp.connect(host='ita.ee.lbl.gov') ftp.login(user='anonymous', passwd="") ftp.set_pasv(True) ftp.retrbinary("RETR /traces/" + file_name, open(gzip_file_path, 'wb').write)
OSコマンドでダウンロードしたファイルを2MBずつに分割。Pythonで高速なファイル分割のコードが書けないか、数時間Stack Overflowを検索してコピペしながら試してみたのですが、splitコマンドの方が全然早かった…(;・∀・)
# gzipファイルを解凍解凍して2MBごとに分割 commands.getoutput("gzip -d " + gzip_file_path) commands.getoutput("split -b 2m " + file_path + " " + file_path + "-") print commands.getoutput("ls -la /tmp")
分割したファイルをそれぞれS3にアップロード。/tmpにゴミがあるかもしれないので、lsでそれっぽいファイル名のものだけを対象に。
# 分割したファイルをS3バケットにアップロード file_list = commands.getoutput("ls " + file_path + "-*").split("n") s3 = boto3.resource('s3') bucket = s3.Bucket(bucket_name) for split_file in file_list: bucket.upload_file(split_file, os.path.basename(split_file))
Step Functionsで次の処理にデータを渡す用途でreturnはS3のバケット名を返す。今回は同じLambdaファンクションが並列に2回呼ばれるので、[JSON文字列,JSON文字列]みたいな配列が渡されてしまい、ちょっと不格好ですが…
# Step Functionsの後続処理のInputPath用にS3バケット名とAmazonESのエンドポイント return "{"bucket_name":"" + bucket_name + "", "amazon_es_endpoint":"" + amazon_es_endpoint + ""}"
S3に分割配置されたファイルを元にAmazon ESに送信するLambdaファンクションを起動
S3のバケット名をStep FunctionsのInputPathで渡された名前。前述のようにパラレル実行すると配列で(ry
# 前処理から引き継いだ値。パラレル実行の場合 # [JSON文字列,JSON文字列]のような配列になるため1つ目の要素だけ利用 input_path = event[0] input_json = json.loads(input_path) bucket_name = input_json['bucket_name'] amazon_es_endpoint = input_json['amazon_es_endpoint'] print bucket_name print amazon_es_endpoint
S3バケットからファイルのリストを取得
# S3から対象ファイル対象ファイルのリストを取得 s3 = boto3.resource('s3') bucket = s3.Bucket(bucket_name) objects = bucket.objects.all() prefix = "NASA_access_log_"
対象ファイル分ループを回してLambdaファンクションを起動
lambdaClient = boto3.client('lambda') for object in objects: file = object.key # 想定ファイルだけ処理処理対象 if file.startswith(prefix): print file # 処理対象ファイルとS3のバケット名とAmazonESのエンドポイントをLambdaファンクションに渡す payload = "{"file_name": "" + file + "", "bucket_name": "" + bucket_name + "", "amazon_es_endpoint": "" + amazon_es_endpoint + ""}" print payload lambdaClient.invoke( FunctionName="nasa-http_send", # Eventにすることで非同期実行 InvocationType="Event", Payload=payload ) # Lambdaの同時起動制限にひっかからないように… time.sleep(0.5)
S3バケットにあるファイルをAmazon ESに送信するLambdaファンクション
変数諸々。前処理のPayloadで指定された値の受取りとゴミ掃除
# 前処理のPayloadに指定されたS3バケット名と処理対象ファイル名とAmazonESのエンドポイント bucket_name = event['bucket_name'] file_name = event['file_name'] amazon_es_endpoint = event['amazon_es_endpoint'] print bucket_name print file_name print amazon_es_endpoint # /tmpにダウンロードして処理を行う。念の為ゴミ掃除も file_path = "/tmp/" + file_name commands.getoutput("rm -rf /tmp/*")
S3からファイルを/tmpにダウンロードして開く
# S3からファイルをファイルをダウンロードして開く s3 = boto3.resource('s3') s3.Bucket(bucket_name).download_file(file_name, file_path) file = open(file_path, 'rb')
ファイルを読み込んでAmazonES連携用JSONを作る。(bulkでAmazon ESに送信する詳細に関しては Amazon Elasticsearch Service で _ingest と _bulk API を使ってインデクシングして Kibana で可視化する | shinodogg.com を参照ください)
# ループカウンタとAmazonESに連携するJSON文字列用の変数 i = 0 jsonstring = "" # 1行ずつ読んでAmazonES連携用のデータを作成 line = file.readline() while line: meta = "{"index": {"_index":"nasa-logs-", "_type":"nasa-http"}} n" dict = {"message": line} try: jsonstring = jsonstring + meta + json.dumps(dict) + "n" except UnicodeDecodeError: print line line = file.readline() i += 1 # 何件処理したか出力して開いたファイルを閉じる print i file.close()
Signature Version 4でAmazon ESにデータを送る。(Amazon ES側でLambdaファンクションに設定されたIAM Roleの権限を許可する)
# バージョン 4 署名リクエスト credentials = Credentials(os.environ["AWS_ACCESS_KEY_ID"], os.environ["AWS_SECRET_ACCESS_KEY"], os.environ["AWS_SESSION_TOKEN"]) request = AWSRequest(method="POST", url=amazon_es_endpoint + "/_bulk?pipeline=nasa-http", data=jsonstring) SigV4Auth(credentials, "es", os.environ["AWS_REGION"]).add_auth(request) BotocoreHTTPSession().send(request.prepare())
Step Functionの定義と実行時のインプット
上記の図のようにParallelで並列実行後にAmazon ESにデータを送るLambdaファンクションを呼び出す。
{ "Comment": "download Apache log files from nasa-http, and send to AmazonES", "StartAt": "Parallel", "States": { "Parallel": { "Type": "Parallel", "Next": "Send to AmazonES", "Branches": [ { "StartAt": "Download July Data", "States": { "Download July Data": { "Type": "Task", "InputPath": "$.july", "Resource": "arn:aws:lambda:ap-northeast-1:xxx:function:nasa-http_download", "End": true } } }, { "StartAt": "Download Aug Data", "States": { "Download Aug Data": { "Type": "Task", "InputPath": "$.aug", "Resource": "arn:aws:lambda:ap-northeast-1:xxx:function:nasa-http_download", "End": true } } } ] }, "Send to AmazonES": { "Type": "Task", "InputPath": "$", "Resource": "arn:aws:lambda:ap-northeast-1:xxx:function:nasa-http_invoke", "End": true } } }
Parallel 部分の InputPath の $.july とか $.aug に関しては、Step Functions を実行する際に以下のようなJSONを渡す形。今思えば、S3バケット名とAmazon ESのエンドポイントは共通なのだから別定義にして、InputPathは $ でズゴっと渡してLambdaファンクションの中でホゲホゲすればよかったのかな…。
{ "july" : { "bucket_name": "S3バケット名", "file_name": "NASA_access_log_Jul95.gz", "amazon_es_endpoint": "https://search-demo-xxx.ap-northeast-1.es.amazonaws.com" }, "aug" : { "bucket_name": "S3バケット名", "file_name": "NASA_access_log_Aug95.gz", "amazon_es_endpoint": "https://search-demo-xxx.ap-northeast-1.es.amazonaws.com" } }
Amazon ESのingestパイプラインとテンプレートのmapping
ingestパイプラインの詳細にについては Amazon Elasticsearch Service で _ingest と _bulk API を使ってインデクシングして Kibana で可視化する | shinodogg.com に記載しましたが、今回も前回と同様に。
PUT _ingest/pipeline/nasa-http { "description": "nasa-http", "processors": [ { "grok": { "field": "message", "patterns": ["%{COMMONAPACHELOG}"] } }, { "date": { "field": "timestamp", "formats" : ["dd/MMM/yyyy:HH:mm:ss Z"] } }, { "date_index_name": { "field": "@timestamp", "index_name_prefix": "nasa-logs-", "date_rounding": "d" } } ] }
また、今回はデータ量も少ないし、Kibanaで可視化するだけなので、各フィールドのAnalyzeは必要ない、ということで、nasa-logs〜っていうインデックスに関しては以下のような感じで。
・シャード数は1つだけ(Amazon ESでのシャード見積もりに関しては Amazon Elasticsearch Service をはじめよう: シャード数の算出方法 を参照)
・基本的にkeyword(Elasticsearchバージョン5になってからstringは、textとkeywordに分かれました。textはAnalyzeするやつで、keywordはそうじゃないヤツ、といった扱い。ES5.2からはkeywordにnormalizerの定義が出来るようになったりしています)
PUT _template/nasa-http { "template": "nasa-logs*", "settings": { "number_of_shards": 1 }, "mappings": { "type1": { "_source": { "enabled": false }, "properties": { "@timestamp": { "type": "date" }, "auth": { "type": "keyword" }, "bytes": { "type": "keyword" }, "clientip": { "type": "keyword" }, "httpversion": { "type": "keyword" }, "ident": { "type": "keyword" }, "message": { "type": "keyword" }, "rawrequest": { "type": "keyword" }, "request": { "type": "keyword" }, "response": { "type": "keyword" }, "timestamp": { "type": "keyword" }, "verb": { "type": "keyword" } } } } }
Amazon Elasticsearch Serviceのインスタンス
何回か処理を流してみたところ、そこそこCPU使ってるのでm3.2xlargeを1インスタンスにしました。
//embedr.flickr.com/assets/client-code.js
c4でも良かったのですが、その場合はEBSの設定が必要になります。初期投入だけはCPU使うけどそれ以降はそうでもない、、とか、やっていく中でワークロードは変わってくると思うので、その辺を柔軟にチャチャッと構成変更できるという点でAmazonESは楽チンですね。(構成変更や権限変更にかかる時間の短縮化は今後に期待です。というか、IAMとか今回の権限周りはまた別途…)
//embedr.flickr.com/assets/client-code.js
今回の処理結果
ということで、Step Functionsで処理が完了して、
//embedr.flickr.com/assets/client-code.js
投入したデータをKibanaから見れるようになりました。
//embedr.flickr.com/assets/client-code.js
マイナビ出版 (2016-06-10)
売り上げランキング: 20,685
コメント