バスキュール技術ブログ

バスキュールが得意とするインタラクティブエンジニアリングを、あますことなくお届け!

Locustを利用したMQTT over WebSocketの性能テスト

f:id:bascule-dev:20170509135655j:plain

はじめまして。 バックエンドエンジニアの会津@ido)です。

2016年に携わった案件「流星放送局」ではサイトを訪問している大量ユーザーにプッシュ通知を行う必要がありました。

Websocketサーバ実装といえばSocket.IOなどがありますが、昨今の流行りに乗って自前で開発はせずサーバレスでフルマネージメントなAWS IoTのMQTT over WebSocketを利用して実装しました。

疎通まではコンソールからAWS IoTの設定するだけの簡単なお仕事なのですが、大量のWebsocket接続へのプッシュ通知がどの程度遅延するかを計測するというなかなか骨の折れそうな性能テストをしなければなりません。 ただ流石にこんなニッチな性能テストができるサービスは存在していなかったので、Locustを利用してテストを行いました。

なぜLocustなのか

  1. 普段Pythonばかり書いているのでテストシナリオはPythonで書きたい、間違ってもXMLなんて書きたくない。
  2. 接続するWebsocketクライアント数を手軽に変更したい、スケーラビリティ大事。

以上の2点からLocustを選びました。

ちなみにLocustは日本語訳でイナゴです。 イナゴを大量発生させて操れるなんて気分はモーセですね。

テストを行う

  1. ANSIBLE + Packerを使ってLocustのマスター・スレーブ2種類のAMI作成
  2. TerraformでLocustクラスターを構築(今回スレーブは最大200インスタンス起動)
  3. マスターのWebGUIから接続数を設定して実行
  4. ローカルPCまたはAWS IoTコンソールからMQTT BrokerにPublish
  5. マスターのWebGUIで性能確認

大まかに書くと以上の手順でテストを行いますが、構築などの詳細を書くと膨大になるため割愛。(リクエストあれば書くかも) テストシナリオとテストPublishを行うコードは以下のとおりです。

locustfile.py

# -*- coding: utf-8 -*-
from __future__ import division, print_function, absolute_import

import json
import time
import uuid

from locust import TaskSet, task, Locust
from locust.events import request_success

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

CA_FILE_PATH = './rootCA.pem'
QoS = 1
AWS_ACCESS_KEY_ID = 'XXXXXXXXX'
AWS_SECRET_ACCESS_KEY = 'XXXXXXXXX'
ENDPOINT = 'XXXXXXXXXX.iot.ap-northeast-1.amazonaws.com'
TOPIC = '/ryusei/notification'


def on_recive(client, userdata, message):
    payload = json.loads(message.payload)
    elapsed = time.time() - payload['started_at']
    request_success.fire(
        request_type='Recive Notification',
        name=message.topic,
        response_time=elapsed,
        response_length=len(message.payload),
    )


def get_client():
    client = AWSIoTMQTTClient(clientID=uuid.uudi4().hex,
                              useWebsocket=True)
    client.configureIAMCredentials(AWSAccessKeyID=AWS_ACCESS_KEY_ID,
                                   AWSSecretAccessKey=AWS_SECRET_ACCESS_KEY)
    client.configureCredentials(CAFilePath=CA_FILE_PATH)
    client.configureEndpoint(hostName=ENDPOINT,
                             portNumber=443)
    client.configureOfflinePublishQueueing(-1)
    client.configureDrainingFrequency(2)
    client.configureConnectDisconnectTimeout(10)
    client.configureMQTTOperationTimeout(60)
    return client


class AWSIoTTaskSet(TaskSet):
    bufsize = 5000

    def on_start(self):
        client = get_client()
        client.connect()
        client.subscribe(TOPIC, QoS, on_recive)

        while True:
            time.sleep(1)


class AWSIoTUser(Locust):
    task_set = AWSIoTTaskSet
    min_wait = 100
    max_wait = 100

publish.py

# -*- coding: utf-8 -*-
from __future__ import division, print_function, absolute_import

import json
import time
import uuid

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient

CA_FILE_PATH = './rootCA.pem'
QoS = 1
AWS_ACCESS_KEY_ID = 'XXXXXXXXX'
AWS_SECRET_ACCESS_KEY = 'XXXXXXXXX'
ENDPOINT = 'XXXXXXXXXX.iot.ap-northeast-1.amazonaws.com'
TOPIC = '/ryusei/notification'



def get_client():
    client = AWSIoTMQTTClient(clientID=uuid.uudi4().hex,
                              useWebsocket=True)
    client.configureIAMCredentials(AWSAccessKeyID=AWS_ACCESS_KEY_ID,
                                   AWSSecretAccessKey=AWS_SECRET_ACCESS_KEY)
    client.configureCredentials(CAFilePath=CA_FILE_PATH)
    client.configureEndpoint(hostName=ENDPOINT,
                             portNumber=443)
    client.configureOfflinePublishQueueing(-1)
    client.configureDrainingFrequency(2)
    client.configureConnectDisconnectTimeout(10)
    client.configureMQTTOperationTimeout(60)
    return client

def main():
    client = get_client()
    client.connect()
    client.publish(TOPIC, json.dumps({'started_at': time.time()}), 1)

if __name__ == "__main__":
    main()

おわりに

Locustでの性能テストを繰り返すことでAWS IoTの特性を知ることができ、TVの放送本番までには満足できる結果を得られるまでチューニングを行いました。 みなさんもLocustを使って強力かつ楽チンな性能テストライフをはじめてはいかがでしょうか。