Amazon SQSを効率よく処理するための簡単なサンプルコード(perl / python)
Amazon SQSって便利だと思うんですがWeb上の情報って意外と少ないですよね。
実際SQSを使う際に効率よく、かつお金も節約できるような
サンプルが欲しかったので書いてみました。
ワーカーで効率よく処理するため2つのポイント
この2つを気にするだけでだいぶ違うと思います。
- ReceiveMessageで10メッセージずつ処理する
- Long Pollingを使う
ReceiveMessageで10メッセージずつ処理する
1リクエストでメッセージを10個取るようにします*1
Long Pollingを使う
ReceiveMessageWaitTimeSecondsを設定するとロングポーリングでデキュー出来ます。
ロングポーリングにしておくとエンキューされたら即次実行されますし
キューが溜まっていないときには接続しっぱなしで受信処理を行うため
余計なリクエストを行わなくて済みます。
サンプルコード(perl)
Amazon::SQS::Simpleを使います。
ロングポーリングを行う場合はキュークラスに対して
'ReceiveMessageWaitTimeSeconds' を設定します。
SQS送信スクリプト
use strict; use warnings; use Amazon::SQS::Simple; my $access_key = ''; # Your AWS Access Key ID my $secret_key = ''; # Your AWS Secret Key my $queue_name = 'perl_test_queue'; # Create an SQS object my $sqs = Amazon::SQS::Simple->new( $access_key, $secret_key, Endpoint => "https://ap-northeast-1.queue.amazonaws.com", ); # Create or Get a queue my $q = $sqs->CreateQueue($queue_name); # Long Polling Setting $q->SetAttribute('ReceiveMessageWaitTimeSeconds', 20); # Send a message $q->SendMessage('Hello world!');
SQS受信スクリプト
ReceiveMessageで MaxNumberOfMessages を指定します。
ロングポーリングを使用しているため、
キューが存在しない時には「loop....」の表示が
20秒毎に表示されるのが確認できると思います。
use strict; use warnings; use Amazon::SQS::Simple; use Time::Piece; my $access_key = ''; # Your AWS Access Key ID my $secret_key = ''; # Your AWS Secret Key my $queue_name = 'perl_test_queue'; # Create an SQS object my $sqs = Amazon::SQS::Simple->new( $access_key, $secret_key, Endpoint => "https://ap-northeast-1.queue.amazonaws.com", ); # Create a new queue my $q = $sqs->CreateQueue($queue_name); # Long Polling Setting $q->SetAttribute('ReceiveMessageWaitTimeSeconds', 20); my $i=0; while(1) { # fetch 10 messages my @msgs = $q->ReceiveMessage( MaxNumberOfMessages => 10, ); for my $msg (@msgs) { warn sprintf("%s recv%s: %s", Time::Piece->new(), $i, $msg->MessageBody()); # delete message $q->DeleteMessage($msg->ReceiptHandle()); $i++; } warn sprintf("%s loop...", Time::Piece->new()); }
サンプルコード(python)
python版もperl版とほぼ同じ処理を行なってみます。
ライブラリは boto を使います。
ロングポーリングを行う場合はキュークラスに対して
'ReceiveMessageWaitTimeSeconds' を設定します。
SQS送信スクリプト
import sys import boto.sqs from boto.sqs.message import Message AWS_ACCESS_KEY='' AWS_SECRET_KEY='' queue_name = 'python_test_queue' # connection conn = boto.sqs.connect_to_region( "ap-northeast-1", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY) # Create or Get queue queue = conn.create_queue(queue_name) # Long Polling Setting queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20) # send queue.write(Message(body='Hello World!'))
SQS受信スクリプト
get_messagesの引数に取得メッセージ数を指定します。
import sys import boto.sqs from boto.sqs.message import Message from datetime import datetime AWS_ACCESS_KEY='' AWS_SECRET_KEY='' queue_name = 'python_test_queue' # connection conn = boto.sqs.connect_to_region( "ap-northeast-1", aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY) # Create or Get queue queue = conn.create_queue(queue_name) # Long Polling Setting queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20) i=0 while 1: # fetch 10 messages msgs = queue.get_messages(10) for msg in msgs: dt = datetime.today().strftime('%Y/%m/%d %H:%M:%S') sys.stderr.write("%s recv%s: %s\n" % (dt, str(i), msg.get_body())) i = i+1 # delete message queue.delete_message(msg) dt = datetime.today().strftime('%Y/%m/%d %H:%M:%S') sys.stderr.write("%s loop...\n" % (dt))
大量に捌きたいときには
perlであればParallel::ForkManagerで、
pythonであればmultiprocessingなどを用いて
マルチプロセス/マルチスレッド化するのが良いでしょう。
その他注意点
ロングポーリングを使う時、複数のキューを処理しようとすると効率が悪いらしいので
「1キュー1ワーカー」
で使うようにしましょう。
*1:10が最大らしい