かれ4

かれこれ4個目のブログ

SQSってなんじゃ?(SQSに関する考察)

はじめに

AWS初心者ですが、わからないなりに、SQSという物を調べてみました。
特に重複について


先日SQSのアップデートがあったみたいです。
エヴァンジェリスト 堀内さんのブログにはものすごく大事な事が書かれていました。

”重要な副作用として、ロングポーリングはメッセージのための全てのSQSホストをチェックします(定期的なポーリングがサブセットをチェックします)。ロングポーリングが空のメッセージセットを返すことにより、未処理のメッセージがキューに存在しないことが確認できます。”

これは。。。
SQSの重複を緩和できる可能性が出てきました。


ここからは初心者ながら知ったかぶって書くことにします。

SQSの重複発生の理由の想像

そもそもSQSでなぜ重複が起こっていたかというと、
VisivilityTimeoutになる前にreceiveしてしまったからでは無いかと考えられます。

昔マイスターシリーズで公開された資料を見てください。



この図の中で注目すべき所はVisibilityTimeoutの青色の左端の所です。



この赤いまるで囲んだ部分で、ReceiveからVisibilityTimeoutまでの時間が0では無いのではないかと考えられます。

とても短い間隔で2回Receiveしてしまうと、VisibilityTimeoutの情報が伝搬する前に取得する事ができてしまうから重複が発生するはずです。
1つのクライアントからではありえない間隔だとしても、複数のクライアントがあった場合は、どんなタイミングでreceiveするかなんてわかりません。
ここを拡大すると、こうなってます。

receiveしてからVsibilityTimeoutとなるまでの間に重複発生時間が有ると推測できます。


なんで、ここに重複発生時間が存在するのかというところですが、SQSがサービスとして提供していて、堅牢性を確保する為に、
一台のマシンで処理しているのではないからです。


1つのQueueに対しても複数台のホストを使って、メッセージを保存しているはずです。


SQSのホストたちはメッセージをreceiveされると、receiveされたことを他のサブセットに含まれるノードにお知らせしなくてはいけないはずなので、
そのタイムラグが重複発生時間の原因になると考えられます。



この辺りの説明は
Amazon.com CTO Wernerのブログ
を見たらより理解出来ると思います。


重複が発生しないように

重複が発生しないようにするには、1つのQueueからReceiveしているWorkerの数を調整するしかないのでしょうか。
その辺りをもう少し追っていこうと思います。

まず上で出した、重複発生時間この時間を t秒 とします。

一度目のreceiveでMessageAを受信したとします。そこからt秒以内に次のreceiveをしてしまうと重複してしまうかもしれません。
ここで一度目のreceiveをした時間をr1、次のreceiveをした時間をr2とします。


かもしれませんというのは、二回目のreceiveが参照するホストのサブセットがMessageAを含んでいない可能性があるからです。
その場合は重複しないことになります。
ここで、確実に重複が無いことが保証されているのは


r1 + t > r2


が成り立つ時です。
では、逆に重複が発生する条件としては、


r1 + t < r2 が成り立ち、r2でのreceiveの時に参照したサブセットがMessageAを含んでいる場合となります。


サブセットがMessageAを含んでいる確率というのも実測で求めてみたのですが、ほぼ1で面白くなかったので細かく書くのを辞めました。


調べ方としては

  1. Queueを作成
  2. メッセージを送信
  3. ランダムミリ秒待機
  4. メッセージ受信
  5. Queueを削除

を何度も繰り返し、2〜4の間の時間と、
メッセージがあったかどうかの確認です。



調べる用のソースはこちら


IAMRoleな環境で動きます。


データはこちらにあります。


1列目が、send_messageしてからreceive_messageまでの時間
2列目が、送信したメッセージ1つに対してreceive出来た数です。


重複の記録

では重複の発生を記録して、どのような時に発生するのかを見てみます。
重複の条件は上であげた

r1 + t < r2 が成り立ち、r2でのreceiveの時に参照したサブセットがMessageAを含んでいる場合
となるはずで、

サブセットがあるメッセージを含んでいる場合というのはほぼ1だったので、
あとは r1 + t < r2 の条件 すなわち t がわかれば良いと思います。

ということで t が実際どの程度の値なのかというのを計測したいと思います。

以下のようなコードをEC2を30台くらい使って動かし、300並列で処理します。



EC2いっぱいで気持ちいいの図

やってて良かった上限緩和申請

package com.tottokug;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.dynamodb.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodb.model.AttributeAction;
import com.amazonaws.services.dynamodb.model.AttributeValue;
import com.amazonaws.services.dynamodb.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodb.model.Key;
import com.amazonaws.services.dynamodb.model.PutItemRequest;
import com.amazonaws.services.dynamodb.model.ReturnValue;
import com.amazonaws.services.dynamodb.model.UpdateItemRequest;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;

publicclass App {

  publicstaticvoid main(String[] args) {
    final String tablename = "sqsduplicate1";
    finalint baseSleepTime = 20;
    final AmazonDynamoDBClient d = new AmazonDynamoDBClient();
    d.setEndpoint("dynamodb.ap-northeast-1.amazonaws.com");
    final AmazonSQSClient q = new AmazonSQSClient();
    q.setEndpoint("sqs.ap-northeast-1.amazonaws.com");
    final AmazonSNSClient n = new AmazonSNSClient();
    n.setEndpoint("sns.ap-northeast-1.amazonaws.com");
    final String hostname;
    String buf = null;

    try {
      URL url = new URL("http://169.254.169.254/latest/meta-data/public-hostname");
      BufferedReader br = new BufferedReader(new InputStreamReader(url.openStream()));
      while ((buf = br.readLine()) != null) {
        break;
      }
    } catch (Exception e) {
    }

    if (buf == null) {
      buf = "unknown host";
    }
    hostname = buf;
    final String queueName;
    finalint multiplicity;
    if (args.length == 2 && args[1].matches("^\\d+$")) {
      queueName = args[0];
      multiplicity = Integer.parseInt(args[1]);
    } elseif (args.length == 1) {
      queueName = args[0];
      multiplicity = 10;
    } else {
      queueName = "normalQueue";
      multiplicity = 10;
    }
    CreateQueueResult qr = q.createQueue(new CreateQueueRequest(queueName));
    final String queueUrl = qr.getQueueUrl();

    new Thread(new Runnable() {
      @Override
      publicvoid run() {
        while (true) {
          try {
            SendMessageRequest r = new SendMessageRequest();
            long beforeTime = System.currentTimeMillis();
            r.withQueueUrl(queueUrl).withMessageBody(String.valueOf(Calendar.getInstance().getTimeInMillis()));
            System.out.println("[" + queueName + "]Message送信 : " + r.getMessageBody());
            SendMessageResult res = q.sendMessage(r);
            String messageId = res.getMessageId();
            HashMap<String, AttributeValue> item = new HashMap<String, AttributeValue>();
            item.put("queueName", new AttributeValue().withS(queueName));
            item.put("messageId", new AttributeValue().withS(messageId));
            item.put("sendHost", new AttributeValue().withS(hostname));
            item.put("afterSentTime", new AttributeValue().withN(String.valueOf(System.currentTimeMillis())));
            item.put("beforeSentTime", new AttributeValue().withN(String.valueOf(beforeTime)));
            PutItemRequest pir = new PutItemRequest(tablename, item);
            while (true) {
              try {
                d.putItem(pir);
                break;
              } catch (Exception e) {
                try {
                  Thread.sleep(1000);
                  System.out.println("retry");
                } catch (InterruptedException e1) {
                  // TODO Auto-generated catch block
                  e1.printStackTrace();
                }
                continue;
              }
            }

            try {
              Thread.sleep(baseSleepTime * 1);
            } catch (InterruptedException e) {
            }
          } catch (Exception e) {

          }
        }
      }
    }).start();
    for (int i = 0; i < multiplicity; i++) {
      new Thread(new Runnable() {
        @Override
        publicvoid run() {
          while (true) {
            try {
              long beforeRecieveTime = System.currentTimeMillis();
              ReceiveMessageResult rmr = q.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withWaitTimeSeconds(20));
              for (Message m : rmr.getMessages()) {
                System.out.println("[" + queueName + "]Message受信 : " + m.getBody() + "(" + m.getMessageId() + ")");
                if (m.getBody().matches("kill")) {
                  // message body がkillだったら プロセス終了
                  System.exit(0);
                }
                UpdateItemRequest count = new UpdateItemRequest();
                Map<String, AttributeValueUpdate> countItem = new HashMap<String, AttributeValueUpdate>();
                countItem.put("count", new AttributeValueUpdate(new AttributeValue().withN("1"), AttributeAction.ADD));
                count.withKey(new Key(new AttributeValue(queueName), new AttributeValue("all")));
                count.withTableName(tablename);
                count.setAttributeUpdates(countItem);
                d.updateItem(count);

                UpdateItemRequest message = new UpdateItemRequest();
                message.setReturnValues(ReturnValue.UPDATED_OLD);
                Map<String, AttributeValueUpdate> messageItem = new HashMap<String, AttributeValueUpdate>();
                messageItem.put("count", new AttributeValueUpdate(new AttributeValue().withN("1"), AttributeAction.ADD));
                messageItem.put("message", new AttributeValueUpdate(new AttributeValue().withS(m.getBody()), AttributeAction.PUT));
                messageItem.put("recieveHost", new AttributeValueUpdate(new AttributeValue().withSS(hostname), AttributeAction.ADD));
                messageItem.put("beforeRecieveTime", new AttributeValueUpdate(new AttributeValue().withNS(String.valueOf(beforeRecieveTime)), AttributeAction.ADD));
                messageItem.put("afterRecieveTime", new AttributeValueUpdate(new AttributeValue().withNS(String.valueOf(System.currentTimeMillis())), AttributeAction.ADD));
                message.withKey(new Key(new AttributeValue(queueName), new AttributeValue(m.getMessageId())));
                message.withTableName(tablename);
                message.setAttributeUpdates(messageItem);
                q.deleteMessage(new DeleteMessageRequest(queueUrl, m.getReceiptHandle()));

                while (true) {
                  try {
                    d.updateItem(message);
                    break;
                  } catch (Exception e) {
                    try {
                      Thread.sleep(1000);
                      System.out.println("retry");
                    } catch (InterruptedException e1) {
                      // TODO Auto-generated catch block
                      e1.printStackTrace();
                    }
                    continue;
                  }
                }
              }
              try {
                Thread.sleep(baseSleepTime * multiplicity);
              } catch (InterruptedException e) {
              }
            } catch (Exception e) {

            }
          }
        }
      }).start();
    }
  }
}

これで約30万メッセージ分のデータをDynamoDBに入れてみました。

HashKey queueName S
RangeKey messageId S
送信前のタイムスタンプ(ms) afterSendTime N
送信後のタイムスタンプ(ms) beforeSendTime N
受信前のタイムスタンプ(ms) afterReceiveTime NS
受信後のタイムスタンプ(ms) beforeReceiveTime NS
受信回数 count N

受信する度にcountの値を+1します。重複があるとcountが2以上になります。
送受信共に前後の時間をとってるのはSQSへの通信のレイテンシがtの値に比べて非常に大きそうだったからです。
細かいこと考えると大変なので前後の時間の平均がSQSにリクエストが届いた時間とみなすことにします。
さらにReceiveTimeの2つはcountの数だけ数値を持つナンバーセットです。

このデータから t はおおよそ以下の式で推測できるのかなと
abs(avg(afterReceiveTime.2,beforeReceiveTime.2) - avg(afterReceiveTime.1,beforeReceiveTime.1) ) < t


記録されたデータからの考察

実際にとったデータでこれを見てみると
重複があったメッセージは約300000件中、29件

abs(avg(afterReceiveTime.2,beforeReceiveTime.2) - avg(afterReceiveTime.1,beforeReceiveTime.1) )
は 7 〜 796(ms) の範囲に入っていて、平均すると119 ms となっていました。

receiveの間隔が119(ms)以内であれば、重複の発生確率は0.01%になります。

となると、一つのメッセージを処理するのに1秒かかるworkerであれば9台までであれば、ほぼほぼ重複なしで動かせるのでは無いかと考えられます。
この119msはSQSがリクエストを受け取って、レスポンスを返すまで時間の半分も含まれていますので、
台数はもうちょっと増やしても大丈夫なはずです。


longpollingは?

次に同じコードをlongpollingを設定したQueueでも試してみます
そしてメッセージ数 100,000 200,000 300,000でどれくらいの重複があるのか普通のQueueと比較してみます。

こんな結果が出てきました。

表1 メッセージ数と重複発生の回数

/ 100,000 200,000 300,000
longpolling 3 5 6
normalPolling 10 14 29

このデータから
「longpollingを使うことで全く同じソースコードでも、重複の発生件数を半分以下に抑える事が出来るようになった」

という結果を読み取っても良いんじゃないかと思います。

これは全ホストをチェックするという重要な副作用からくるものと考えられますが、
私の予想では、全ホストをチェックしているのではなく、全サブセットをチェックしているのではと思っています。
全サブセットにアクセスする事で、VisibilityTimeoutの伝搬が各サブセット内で並列に行われているため、
t の時間が少なくなり重複の発生が抑制されているのでは無いでしょうか。

SQSの重複に関する調査でした。


P.S.
re:Inventの時にSQSの人に聞いたら t は1〜20ms って言っていたので、計測の誤差がだいぶ大きいなという。。。。
今度microじゃないインスタンスでやってみなくては。



データ取るのに8,000円くらい使ってしまった