You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
111 lines
2.9 KiB
111 lines
2.9 KiB
import json
|
|
import requests
|
|
import pytest
|
|
import time
|
|
import os
|
|
from typing import Generator, Any
|
|
|
|
import paho.mqtt.client as mqtt
|
|
|
|
from utils import Collection
|
|
|
|
|
|
influx_query_url = "http://localhost:8086/query?db=influx&"
|
|
# status for mqtt
|
|
SUCCESS = 0
|
|
|
|
|
|
@pytest.fixture
|
|
def sensors(demo_db) -> Generator[Collection, None, None]:
|
|
collection = Collection(demo_db, "sensors")
|
|
yield collection
|
|
collection.drop()
|
|
|
|
|
|
@pytest.fixture
|
|
def mqtt_client() -> Generator[mqtt.Client, None, None]:
|
|
username = ""
|
|
password = ""
|
|
parent_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
secrets_path = os.path.join(parent_path, "secrets")
|
|
|
|
with open(os.path.join(secrets_path, "mqtt_user.txt"), "r") as file:
|
|
username = file.read()
|
|
with open(os.path.join(secrets_path, "mqtt_pass.txt"), "r") as file:
|
|
password = file.read()
|
|
|
|
mqtt_client = mqtt.Client()
|
|
mqtt_client.username_pw_set(username, password)
|
|
mqtt_client.connect("localhost", 1883)
|
|
yield mqtt_client
|
|
mqtt_client.disconnect()
|
|
|
|
|
|
def test_db_insert(mqtt_client, sensors):
|
|
# publish message
|
|
measurement = "temperature"
|
|
cleanup_influx(measurement)
|
|
mqtt_response = publish_message(
|
|
mqtt_client,
|
|
"sensors",
|
|
json.dumps({"sensor_id": measurement, "sensor_value": 10}),
|
|
)
|
|
assert mqtt_response == SUCCESS
|
|
|
|
# influx
|
|
query = "q=SELECT * FROM {}".format(measurement)
|
|
response = requests.get(influx_query_url + query)
|
|
results = response.json()["results"]
|
|
series = results[0]["series"]
|
|
values = series[0]["values"]
|
|
name = series[0]["name"]
|
|
|
|
assert len(results) == 1
|
|
assert name == measurement
|
|
assert values[0][1] == 10
|
|
mqtt_client.disconnect()
|
|
|
|
# mongo
|
|
response = sensors.get({})
|
|
items = response[0]["items"]
|
|
assert len(items) == 1
|
|
assert items[0]["value"] == 10
|
|
|
|
# delete data
|
|
cleanup_influx(measurement)
|
|
|
|
|
|
def test_mqtt_publish(mqtt_client, sensors):
|
|
measurement = "temperature"
|
|
cleanup_influx(measurement)
|
|
|
|
publish_message(
|
|
mqtt_client,
|
|
"sensors",
|
|
json.dumps({"sensor_id": measurement, "sensor_value": 10}),
|
|
)
|
|
|
|
mqtt_client.subscribe("averages/{}".format(measurement))
|
|
mqtt_client.on_message = check_message
|
|
mqtt_client.loop_start()
|
|
cleanup_influx(measurement)
|
|
sensors.delete(measurement)
|
|
|
|
|
|
def publish_message(mqtt_client, topic: str, data: str) -> int:
|
|
mqtt_response = mqtt_client.publish(topic, data)
|
|
time.sleep(0.5)
|
|
return mqtt_response[0]
|
|
|
|
|
|
def cleanup_influx(measurement: str) -> int:
|
|
resp = requests.post(
|
|
'http://localhost:8086/query?db=influx&q=DELETE FROM "{}"'.format(measurement)
|
|
)
|
|
return resp.status_code
|
|
|
|
|
|
def check_message(client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage):
|
|
message = msg.payload.decode("utf-8")
|
|
decoded_data = json.loads(message)
|
|
assert decoded_data["sensor_value"] == 10
|
|
|