はじめまして。 バックエンドエンジニアの会津(@ido)です。
2016年に携わった案件「流星放送局」ではサイトを訪問している大量ユーザーにプッシュ通知を行う必要がありました。
Websocketサーバ実装といえばSocket.IOなどがありますが、昨今の流行りに乗って自前で開発はせずサーバレスでフルマネージメントなAWS IoTのMQTT over WebSocketを利用して実装しました。
疎通まではコンソールからAWS IoTの設定するだけの簡単なお仕事なのですが、大量のWebsocket接続へのプッシュ通知がどの程度遅延するかを計測するというなかなか骨の折れそうな性能テストをしなければなりません。 ただ流石にこんなニッチな性能テストができるサービスは存在していなかったので、Locustを利用してテストを行いました。
なぜLocustなのか
以上の2点からLocustを選びました。
ちなみにLocustは日本語訳でイナゴです。 イナゴを大量発生させて操れるなんて気分はモーセですね。
テストを行う
- ANSIBLE + Packerを使ってLocustのマスター・スレーブ2種類のAMI作成
- TerraformでLocustクラスターを構築(今回スレーブは最大200インスタンス起動)
- マスターのWebGUIから接続数を設定して実行
- ローカルPCまたはAWS IoTコンソールからMQTT BrokerにPublish
- マスターのWebGUIで性能確認
大まかに書くと以上の手順でテストを行いますが、構築などの詳細を書くと膨大になるため割愛。(リクエストあれば書くかも) テストシナリオとテストPublishを行うコードは以下のとおりです。
locustfile.py
# -*- coding: utf-8 -*- from __future__ import division, print_function, absolute_import import json import time import uuid from locust import TaskSet, task, Locust from locust.events import request_success from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient CA_FILE_PATH = './rootCA.pem' QoS = 1 AWS_ACCESS_KEY_ID = 'XXXXXXXXX' AWS_SECRET_ACCESS_KEY = 'XXXXXXXXX' ENDPOINT = 'XXXXXXXXXX.iot.ap-northeast-1.amazonaws.com' TOPIC = '/ryusei/notification' def on_recive(client, userdata, message): payload = json.loads(message.payload) elapsed = time.time() - payload['started_at'] request_success.fire( request_type='Recive Notification', name=message.topic, response_time=elapsed, response_length=len(message.payload), ) def get_client(): client = AWSIoTMQTTClient(clientID=uuid.uudi4().hex, useWebsocket=True) client.configureIAMCredentials(AWSAccessKeyID=AWS_ACCESS_KEY_ID, AWSSecretAccessKey=AWS_SECRET_ACCESS_KEY) client.configureCredentials(CAFilePath=CA_FILE_PATH) client.configureEndpoint(hostName=ENDPOINT, portNumber=443) client.configureOfflinePublishQueueing(-1) client.configureDrainingFrequency(2) client.configureConnectDisconnectTimeout(10) client.configureMQTTOperationTimeout(60) return client class AWSIoTTaskSet(TaskSet): bufsize = 5000 def on_start(self): client = get_client() client.connect() client.subscribe(TOPIC, QoS, on_recive) while True: time.sleep(1) class AWSIoTUser(Locust): task_set = AWSIoTTaskSet min_wait = 100 max_wait = 100
publish.py
# -*- coding: utf-8 -*- from __future__ import division, print_function, absolute_import import json import time import uuid from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient CA_FILE_PATH = './rootCA.pem' QoS = 1 AWS_ACCESS_KEY_ID = 'XXXXXXXXX' AWS_SECRET_ACCESS_KEY = 'XXXXXXXXX' ENDPOINT = 'XXXXXXXXXX.iot.ap-northeast-1.amazonaws.com' TOPIC = '/ryusei/notification' def get_client(): client = AWSIoTMQTTClient(clientID=uuid.uudi4().hex, useWebsocket=True) client.configureIAMCredentials(AWSAccessKeyID=AWS_ACCESS_KEY_ID, AWSSecretAccessKey=AWS_SECRET_ACCESS_KEY) client.configureCredentials(CAFilePath=CA_FILE_PATH) client.configureEndpoint(hostName=ENDPOINT, portNumber=443) client.configureOfflinePublishQueueing(-1) client.configureDrainingFrequency(2) client.configureConnectDisconnectTimeout(10) client.configureMQTTOperationTimeout(60) return client def main(): client = get_client() client.connect() client.publish(TOPIC, json.dumps({'started_at': time.time()}), 1) if __name__ == "__main__": main()
おわりに
Locustでの性能テストを繰り返すことでAWS IoTの特性を知ることができ、TVの放送本番までには満足できる結果を得られるまでチューニングを行いました。 みなさんもLocustを使って強力かつ楽チンな性能テストライフをはじめてはいかがでしょうか。