こんにちは! JQです。
前回は『Kinesis編~Kinesisを試してみる①~』と題して、Kinesisを試してみました。
今回は『Kinesis編~Kinesisを試してみる②~』と題して、BotoからKinesisを試してみたいと思います。
環境準備
1.IAM Role付与インスタンスの起動
Botoから操作するためのインスタンスを起動します。
RoleにてKinesisの許可を与えておきます。
※詳細は以前の記事をご確認ください。
■Amazon EC2編~IAM roles for EC2インスタンスを立ちあげてみよう!~
スクリプト
2.スクリプトの作成
簡単な動作確認スクリプトを作成します。
下記では到着した新しいレコードからスタートします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
getdata.py #!/usr/bin/python # -*- coding: utf-8 -*- import boto.kinesis,datetime,time connection = boto.kinesis.connect_to_region('us-east-1') stream_name = 'StreamName’ stream = connection.describe_stream(stream_name) shards = stream['StreamDescription']['Shards'][0]['ShardId'] kinesis_iterator = connection.get_shard_iterator(stream_name,shards,'LATEST') next_iterator = None while True: if next_iterator is None: next_iterator = kinesis_iterator['ShardIterator'] else: next_iterator = responce['NextShardIterator'] responce = None responce = connection.get_records(next_iterator,limit=1) print responce['Records'] time.sleep(1) |
次にPutするスクリプトを作成します。
下記では日付をデータとしてPutします。
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/python # -*- coding: utf-8 -*- import boto.kinesis,datetime,time connection = boto.kinesis.connect_to_region('us-east-1') stream_name = 'StreamName’ partition_key = 'partition_1' sampledata = datetime.datetime.now().strftime('%Y%m%d%H%M%S') put_data = connection.put_record(stream_name,sampledata,partition_key) |
3.確認
それでは実際に確認してみましょう!
Getを動かした状態でPutしてみます。
1 2 3 4 5 6 7 8 |
# python getdata.py [] [] [] [] [] [{u'PartitionKey': u'partition_1', u'Data': '20140402164003', u'SequenceNumber': u'49538079870049146416514521285925208671557181958806044673'}] [] |
Putしたデータを取得出来ました。
今回は動作確認の為に簡単なスクリプトで試していますが、
実際に利用する場合にはShardsやSequenceNumberの管理や、
アプリケーション側のロードバランシング、調整、エラーハンドリングやAuto Scalingが必要になります。
※Kinesisクライアント・ライブラリ(Java製)はその辺を簡素化してくれます。
いかがでしたでしょうか?
次回もお楽しみに!!!
関連ソリューション