こんにちは、中の人です。
Tableauは非常に強力なBIツールでエリア情報を地図にマッピングしてくれる機能などもあります。
IPアドレスのままではTableauに表示することが出来ないので、Amazon KinesisでIPアドレスを緯度経度に変換して、Redshiftに取り込んでTableauで表示させます。
前回の『Amazon Kinesis/Redshift編~アクセスログをkinesisで加工してTableauで表示してみよう②~』に引き続き、今回は第3回としてS3に保存したデータを1分間隔でRedshiftにimportします。
importS3toRedshift.phpの作成
1 |
$ vim importS3toRedshift.php |
※コメントで解説しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
<?php # パスは必要に応じて変更 include "/home/ubuntu/vendor/autoload.php"; use Aws\Redshift\RedshiftClient; $client = RedshiftClient::factory(array( 'key' => 'ここにアクセスキー', 'secret' => 'ここにアクセスキーシークレット', 'region' => 'us-east-1' )); # redshiftの起動チェック(cron等で呼び出した時にredshiftが起動しているか確認) $result = $client->describeClusters(array()); $clusters = $result->get('Clusters'); # 起動していなければredshftを起動させる if(count($clusters)==0){ # redshiftの起動 $res = $client->createCluster(array( 'ClusterIdentifier' => 'demoCluster', 'ClusterType' => 'single-node', 'MasterUsername' => 'dbmaster', 'MasterUserPassword' => 'パスワード', 'NodeType' => 'dw2.large', 'PubliclyAccessible ' => true, )); }else{ echo "redshiftは起動しています。\n"; } # 起動していなければ10秒間隔でチェックし、redshiftのステイタスがtrueになるまで読み込み $ClusterStatus = ""; while($ClusterStatus!="available"){ # cluster情報の取得 $result = $client->describeClusters(array( 'ClusterIdentifier' => 'demoCluster', )); $clusters = $result->get('Clusters'); # available, creating, deleting, rebooting, and resizing $ClusterStatus = $clusters[0]['ClusterStatus']; $PrivateIPAddress = $clusters[0]['ClusterNodes'][0]['PublicIPAddress']; if($ClusterStatus!="available") sleep(10); } # redshiftに接続 $conn = "host=".$PrivateIPAddress." port=5439 dbname=dev user=dbmaster password=Password1"; $link = pg_connect($conn); if (!$link) { die('ERROR!'.pg_last_error()); } # エラーを取得するためのviewの作成 $sql = "create view loadview as (select distinct tbl, trim(name) as table_name, query, starttime, trim(filename) as input, line_number, colname, err_code, trim(err_reason) as reason from stl_load_errors sl, stv_tbl_perm sp where sl.tbl = sp.id);"; $result = pg_query($link, $sql); ################################################################################### # ここは初回のみ実行(ただし、そのままでも問題なし) $sql = "create table accesslog ( datetime timestamp, method varchar(12), path varchar(20), protocol varchar(12), status int, ua varchar(20), ip varchar(15), latitude real, longitude real );"; $result = pg_query($link, $sql); $error = pg_last_error($link); # 1分前のログを送信 $time = time() - 60; $datetime = date("YmdHi", $time); # データ挿入 $sql = "copy accesslog from 's3://kinesis-demo-knowcom/access_log.".$datetime."' CREDENTIALS 'aws_access_key_id=[ここにアクセスキー];aws_secret_access_key=[ここにアクセスキーシークレット]' delimiter '\t';"; $result = pg_query($link, $sql); $sql = "select datetime from accesslog"; $result = pg_query($link, $sql); $rows = pg_num_rows($result); echo date(DATE_RFC2822)."\t[redshiftのaccesslogデータに(".number_format($rows)."行)挿入完了]\n"; $sql = "select * from loadview order by starttime desc "; $result = pg_query($link, $sql); while($row = pg_fetch_assoc($result)){ var_dump($row); } $close_flag = pg_close($link); |
では、早速実行してみましょう
1 2 3 4 5 6 7 |
$ /usr/bin/php importS3toRedshift.php redshiftは起動しています。 available 1404346396.9093秒 PHP Warning: pg_query(): Query failed: ERROR: relation "loadview" already exists in /home/ubuntu/redshift-kinesis.php on line 66 PHP Warning: pg_query(): Query failed: ERROR: relation "accesslog" already exists in /home/ubuntu/redshift-kinesis.php on line 80 Thu, 03 Jul 2014 00:13:17 +0000 [redshiftのaccesslogデータに(3,474行)挿入完了] |
上記のようなログが出れば確認完了です。
cronで1分おきに実行することで1分間隔でredshiftにimportする事ができます。
importしたデータをtableauに表示させます。
お楽しみに!!
関連ソリューション