こんにちは、中の人です。
Tableauは非常に強力なBIツールでエリア情報を地図にマッピングしてくれる機能などもあります。
IPアドレスのままではTableauに表示することが出来ないので、Amazon KinesisでIPアドレスを緯度経度に変換して、Redshiftに取り込んでTableauで表示させます。
前回の『Amazon Kinesis/Redshift編~アクセスログをkinesisで加工してTableauで表示してみよう~』に引き続き、今回は第2回としてKinesis Streamにputしたアクセスログを、Kinesis AppでgetしIPアドレスを緯度経度に変換してS3に保存します。
IPアドレスから緯度経度への変換を行うAPIは複数ありますが、検証した中では1番レスポンスが良かった以下を使用しております。
http://ip-json.rhcloud.com/
getrecordsToGEO.pyの作成
1 |
$ vim getrecordsToGEO.py |
※できるだけコメントで解説しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
#!/usr/bin/python # -*- coding: utf-8 -*- import boto.kinesis,datetime,time import re import json import urllib2 import os.path import calendar from urllib2 import Request, urlopen, URLError, HTTPError from boto.s3.key import Key # kinesisで加工したログの保存先S3 # kinesis-demo-***はredshiftと同じリージョンにある任意のバケット名 s3 = boto.connect_s3() bucket = s3.get_bucket("kinesis-demo-***") k = Key(bucket) dt = "" # stream_name(demo-knowcom)は自分のStream名に変更して下さい。 connection = boto.kinesis.connect_to_region('us-east-1') stream_name = 'demo-knowcom' stream = connection.describe_stream(stream_name) shards = stream['StreamDescription']['Shards'][0]['ShardId'] kinesis_iterator = connection.get_shard_iterator(stream_name,shards,'LATEST') next_iterator = None while True: if next_iterator is None: next_iterator = kinesis_iterator['ShardIterator'] else: next_iterator = responce['NextShardIterator'] responce = None responce = connection.get_records(next_iterator,limit=1) if len(responce['Records'])!=0: result = responce['Records'][0]['Data'] # 日付のフォーマット変換 resultArray = result.split(' [') dateTime = resultArray[1].split(']')[0] dateArray = dateTime.split('/') timeArray = dateArray[2].split(':') for i ,v in enumerate(calendar.month_abbr): if dateArray[1]==v: dateMonth = i eDatetime = timeArray[0]+'-'+str('%02d' % dateMonth)+'-'+dateArray[0]+' '+timeArray[1]+':'+timeArray[2]+':'+re.match(r'(.*) ',timeArray[3]).group(1) # その他のパラメータを取得 resultArray = result.split('"') eMethod = re.match(r'^([A-Z]*) ([^ ]*) (.*)', resultArray[1]).group(1) ePath = re.match(r'^([A-Z]*) ([^ ]*) (.*)', resultArray[1]).group(2)[:20] eProtocol = re.match(r'^([A-Z]*) ([^ ]*) (.*)', resultArray[1]).group(3) eStatus = re.match(r'^ ([0-9]*) ', resultArray[2]).group(1) eUserAgent = resultArray[5][:20] # elb配下のため、x-forwarded-forをIPとして取得 ipAddress = re.match(r'^ [0-9]* ([^ ]*) .*', resultArray[6]).group(1) # IPアドレスを緯度経度情報に変換 if re.search("^10\.",ipAddress): ipAddress = "" else: # ip ⇒ GEO変換用のAPI url = 'http://ip-json.rhcloud.com/json/'+ipAddress req = urllib2.Request(url) try: fp = urlopen(req) except HTTPError, e: print 'Error code: ', e.code except URLError, e: print 'Reason: ', e.reason else: data = json.load(fp) od = dt d = datetime.datetime.today() # 分単位でログファイルを生成 dt = '%s%s%s%s%s' % (d.year, str('%02d' % d.month), str('%02d' % d.day), str('%02d' % d.hour), str('%02d' % d.minute)) if od != dt: logfile = 'access_log.' + od if os.path.exists(logfile): print logfile k.key = logfile k.set_contents_from_filename(logfile) print '%s, %s' % (str(data.get("latitude")), str(data.get("longitude"))) if len(data)!=1: fi = open('access_log.' + dt, 'a') fi.write(eDatetime + "\t" + eMethod + "\t" + ePath + "\t" + eProtocol + "\t" + eStatus + "\t" + eUserAgent + "\t" + ipAddress + "\t" + str(data.get("latitude")) + "\t" + str(data.get("longitude")) + "\n") fi.close() # 1件づつ取得をしているため、Streamの帯域にあわせて制御 time.sleep(0.2) |
では、早速アクセスログをkinesisにgetしてみましょう
1 2 3 4 5 6 7 8 9 10 11 |
$ python getrecordsToGEO.py 37.4192008972, -122.057403564 37.4192008972, -122.057403564 37.4192008972, -122.057403564 35.6850013733, 139.751403809 37.4192008972, -122.057403564 52.5, 5.75 22.3500003815, 114.133300781 39.9289016724, 116.388298035 37.4192008972, -122.057403564 |
上記のようなログが出れば確認完了です。
プログラムのprint部分を変更することで、表示を止めることも可能です。
1 |
- print '%s, %s' % (str(data.get("latitude")), str(data.get("longitude"))) |
1 |
$ python getrecordsToGEO.py & |
でバックグラウンド実行させます。
次回はS3に保存したデータを1分おきにRedshiftにimportします。
お楽しみに!!
関連ソリューション