You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
3.4 KiB
110 lines
3.4 KiB
from logging import RootLogger
|
|
import json, time, datetime, statistics, requests
|
|
|
|
import paho.mqtt.client
|
|
from pymongo import MongoClient, errors
|
|
|
|
from utils import get_logger, read_docker_secret
|
|
|
|
|
|
influxdb_url = "http://influxdb:8086/write?db=influx"
|
|
mongo_host = "mongodb"
|
|
mqtt_host = "mqtt"
|
|
mqtt_user = read_docker_secret("MQTT_USER")
|
|
mqtt_password = read_docker_secret("MQTT_PASSWORD")
|
|
|
|
mongo_client = MongoClient(mongo_host, 27017)
|
|
sensors = mongo_client.demo.sensors
|
|
logger = get_logger()
|
|
|
|
|
|
class Mqtt:
|
|
def __init__(self, host: str, user: str, password: str, logger: RootLogger) -> None:
|
|
self.__host = host
|
|
self.__user = user
|
|
self.__password = password
|
|
self.__logger = logger
|
|
self.__topic = None
|
|
|
|
def connect(self, topic: str):
|
|
self.__topic = topic
|
|
client = paho.mqtt.client.Client()
|
|
client.username_pw_set(mqtt_user, mqtt_password)
|
|
client.on_connect = self.on_connect
|
|
client.on_message = self.on_message
|
|
client.connect(self.__host, 1883, 60)
|
|
client.loop_start()
|
|
|
|
def on_connect(
|
|
self, client: paho.mqtt.client.Client, userdata, flags: dict, rc: int
|
|
):
|
|
client.subscribe(self.__topic)
|
|
|
|
def on_message(
|
|
self,
|
|
client: paho.mqtt.client.Client,
|
|
userdata,
|
|
msg: paho.mqtt.client.MQTTMessage,
|
|
):
|
|
try:
|
|
message = msg.payload.decode("utf-8")
|
|
decoded_data = json.loads(message)
|
|
except Exception as e:
|
|
self.__logger.error(
|
|
"could not decode message {0}, error: {1}".format(msg, str(e))
|
|
)
|
|
return
|
|
|
|
# skip processing if it's averages topic to avoid an infinit loop
|
|
sensors.update_one(
|
|
{"_id": decoded_data["sensor_id"]},
|
|
{
|
|
"$push": {
|
|
"items": {
|
|
"$each": [
|
|
{
|
|
"value": decoded_data["sensor_value"],
|
|
"date": datetime.datetime.utcnow(),
|
|
}
|
|
],
|
|
"$sort": {"date": -1},
|
|
"$slice": 5,
|
|
}
|
|
}
|
|
},
|
|
upsert=True,
|
|
)
|
|
# add data to grafana through influxdb
|
|
try:
|
|
requests.post(
|
|
url=influxdb_url,
|
|
data="{0} value={1}".format(
|
|
decoded_data["sensor_id"], decoded_data["sensor_value"]
|
|
),
|
|
)
|
|
except Exception as e:
|
|
self.__logger.error(
|
|
"Error writing to influxdb {0}, error: {1}".format(msg, str(e))
|
|
)
|
|
# obtain the mongo sensor data by id
|
|
sensor_data = list(sensors.find({"_id": decoded_data["sensor_id"]}))
|
|
# we extract the sensor last values from sensor_data
|
|
sensor_values = [d["value"] for d in sensor_data[0]["items"]]
|
|
client.publish(
|
|
"averages/{0}".format(decoded_data["sensor_id"]),
|
|
statistics.mean(sensor_values),
|
|
2,
|
|
)
|
|
|
|
|
|
mqtt = Mqtt(mqtt_host, mqtt_user, mqtt_password, logger)
|
|
mqtt.connect("sensors")
|
|
|
|
logger.debug("MQTT App started")
|
|
try:
|
|
mongo_client.admin.command("replSetInitiate")
|
|
except errors.OperationFailure as e:
|
|
logger.error("Error setting mongodb replSetInitiate error: {0}".format(str(e)))
|
|
|
|
while True:
|
|
time.sleep(0.05)
|
|
|