アルパカDiary Pro

はてなブログProではありません

Amazon SQSを効率よく処理するための簡単なサンプルコード(perl / python)

Amazon SQSって便利だと思うんですがWeb上の情報って意外と少ないですよね。
実際SQSを使う際に効率よく、かつお金も節約できるような
サンプルが欲しかったので書いてみました。

ワーカーで効率よく処理するため2つのポイント

この2つを気にするだけでだいぶ違うと思います。

  • ReceiveMessageで10メッセージずつ処理する
  • Long Pollingを使う
ReceiveMessageで10メッセージずつ処理する

1リクエストでメッセージを10個取るようにします*1

Long Pollingを使う

ReceiveMessageWaitTimeSecondsを設定するとロングポーリングでデキュー出来ます。
ロングポーリングにしておくとエンキューされたら即次実行されますし
キューが溜まっていないときには接続しっぱなしで受信処理を行うため
余計なリクエストを行わなくて済みます。

サンプルコード(perl)

Amazon::SQS::Simpleを使います。
ロングポーリングを行う場合はキュークラスに対して
'ReceiveMessageWaitTimeSeconds' を設定します。

SQS送信スクリプト
use strict;
use warnings;
use Amazon::SQS::Simple;

my $access_key = ''; # Your AWS Access Key ID
my $secret_key = ''; # Your AWS Secret Key
my $queue_name = 'perl_test_queue';

# Create an SQS object
my $sqs = Amazon::SQS::Simple->new(
    $access_key,
    $secret_key,
    Endpoint => "https://ap-northeast-1.queue.amazonaws.com",
);

# Create or Get a queue
my $q = $sqs->CreateQueue($queue_name);
# Long Polling Setting
$q->SetAttribute('ReceiveMessageWaitTimeSeconds', 20);

# Send a message
$q->SendMessage('Hello world!');
SQS受信スクリプト

ReceiveMessageで MaxNumberOfMessages を指定します。
ロングポーリングを使用しているため、
キューが存在しない時には「loop....」の表示が
20秒毎に表示されるのが確認できると思います。

use strict;
use warnings;
use Amazon::SQS::Simple;
use Time::Piece;

my $access_key = ''; # Your AWS Access Key ID
my $secret_key = ''; # Your AWS Secret Key
my $queue_name = 'perl_test_queue';

# Create an SQS object
my $sqs = Amazon::SQS::Simple->new(
    $access_key,
    $secret_key,
    Endpoint => "https://ap-northeast-1.queue.amazonaws.com",
);

# Create a new queue
my $q = $sqs->CreateQueue($queue_name);
# Long Polling Setting
$q->SetAttribute('ReceiveMessageWaitTimeSeconds', 20);

my $i=0;
while(1) {
    # fetch 10 messages
    my @msgs = $q->ReceiveMessage(
        MaxNumberOfMessages => 10,
    );
    for my $msg (@msgs) {
        warn sprintf("%s recv%s: %s", Time::Piece->new(), $i, $msg->MessageBody());
        # delete message
        $q->DeleteMessage($msg->ReceiptHandle());
        $i++;
    }
    warn sprintf("%s loop...", Time::Piece->new());
}

サンプルコード(python)

python版もperl版とほぼ同じ処理を行なってみます。
ライブラリは boto を使います。
ロングポーリングを行う場合はキュークラスに対して
'ReceiveMessageWaitTimeSeconds' を設定します。

SQS送信スクリプト
import sys
import boto.sqs
from boto.sqs.message import Message

AWS_ACCESS_KEY=''
AWS_SECRET_KEY=''

queue_name = 'python_test_queue'

# connection
conn = boto.sqs.connect_to_region(
     "ap-northeast-1",
     aws_access_key_id=AWS_ACCESS_KEY,
     aws_secret_access_key=AWS_SECRET_KEY)

# Create or Get queue
queue = conn.create_queue(queue_name)
# Long Polling Setting
queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20)

# send
queue.write(Message(body='Hello World!'))
SQS受信スクリプト

get_messagesの引数に取得メッセージ数を指定します。

import sys
import boto.sqs
from boto.sqs.message import Message
from datetime import datetime

AWS_ACCESS_KEY=''
AWS_SECRET_KEY=''

queue_name = 'python_test_queue'

# connection
conn = boto.sqs.connect_to_region(
     "ap-northeast-1",
     aws_access_key_id=AWS_ACCESS_KEY,
     aws_secret_access_key=AWS_SECRET_KEY)

# Create or Get queue
queue = conn.create_queue(queue_name)
# Long Polling Setting
queue.set_attribute('ReceiveMessageWaitTimeSeconds', 20)

i=0
while 1:
    # fetch 10 messages
    msgs = queue.get_messages(10)
    for msg in msgs:
        dt = datetime.today().strftime('%Y/%m/%d %H:%M:%S')
        sys.stderr.write("%s recv%s: %s\n" % (dt, str(i), msg.get_body()))
        i = i+1
        # delete message
        queue.delete_message(msg)

    dt = datetime.today().strftime('%Y/%m/%d %H:%M:%S')
    sys.stderr.write("%s loop...\n" % (dt))

大量に捌きたいときには

perlであればParallel::ForkManagerで、
pythonであればmultiprocessingなどを用いて
マルチプロセス/マルチスレッド化するのが良いでしょう。

その他注意点

ロングポーリングを使う時、複数のキューを処理しようとすると効率が悪いらしいので
「1キュー1ワーカー」
で使うようにしましょう。

*1:10が最大らしい