こんにちは、中の人です。
今回はKinesisにデータをPutする時の注意点をまとめてみました。
Kinesisは、シャードという単位で帯域を変えることが出来、1シャードで1000req/secもデータをPutする事が出来ます。
ずーっと1000req/sec使い続けた場合、約2億6千万リクエストにもなりますが料金はなんと$100にも満たない程度というコストパフォーマンスにも優れたサービスです。
ただし、このパフォーマンスを活かすには送信プログラムにも工夫が必要です。
以前、『Amazon Kinesis/Redshift編~アクセスログをkinesisで加工してTableauで表示してみよう① [全4回]~』で紹介した様な方法で行った場合、10req/sec程度しかパフォーマンスが出ません。
それ以上のリクエストについては処理待ちの状態となるため、10req/sec以上が続く場合処理が溜まってしまいます。
解決方法としては、Pythonを例に説明すると、threadingを利用することで解決できます。
threadingを使い必要なだけ処理を増やすことでパフォーマンスを上げることが出来ます。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
$ vim putrecords.py #!/usr/bin/python # -*- coding: utf-8 -*- import boto.kinesis,datetime,time import threading, Queue, subprocess, sys import tailer max_thread = 10 class Worker(threading.Thread): queue = Queue.Queue(30) def __init__(self, index): self.index = index self.conn = boto.kinesis.connect_to_region('ap-northeast-1') super(Worker, self).__init__() def run(self): while True: log = unicode(self.queue.get(True, None), "utf-8") self.conn.put_record('demo-knowcom', log, '{n:d}'.format(n=self.index)) def tail(filename): for line in tailer.follow(open(filename)): Worker.queue.put(line.strip()) def main(): for i in range(max_thread): w = Worker(i + 1) w.setDaemon(True) w.start() tail(FILENAME) if __name__ == '__main__': main() |
上記では前回1スレッドだった処理を最大10スレッドに増やしており、約100req/sec程度のパフォーマンスとなります。
後は必要な速度とマシンのパフォーマンスに合わせて、max_threadを調整することでより最適なパフォーマンスにする事が可能です。
いかがでしたでしょうか?
次回はPut編に続きてGetについての注意点を紹介します。
次回もお楽しみに!!