You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

110 lines
3.4 KiB

from logging import RootLogger
import json, time, datetime, statistics, requests
import paho.mqtt.client
from pymongo import MongoClient, errors
from utils import get_logger, read_docker_secret
# Hostnames of the backing services and the InfluxDB write endpoint
# (presumably container/service names on a shared network -- confirm against
# the deployment configuration).
mqtt_host = "mqtt"
mongo_host = "mongodb"
influxdb_url = "http://influxdb:8086/write?db=influx"

# Broker credentials are looked up through the project's docker-secret helper.
mqtt_user = read_docker_secret("MQTT_USER")
mqtt_password = read_docker_secret("MQTT_PASSWORD")

# Shared MongoDB handle; sensor readings live in the "sensors" collection of
# the "demo" database.
mongo_client = MongoClient(host=mongo_host, port=27017)
sensors = mongo_client["demo"]["sensors"]

# Application-wide logger.
logger = get_logger()
class Mqtt:
    """Bridge sensor readings from MQTT into MongoDB, InfluxDB and an averages topic.

    Every message received on the subscribed topic is expected to be a UTF-8
    JSON object with a ``sensor_id`` and a numeric ``sensor_value``.  For each
    valid reading the handler:

    1. pushes it onto the sensor's document in the module-level ``sensors``
       collection, keeping only the ``WINDOW_SIZE`` most recent readings;
    2. forwards it to InfluxDB (best effort -- failures are only logged);
    3. publishes the mean of the stored readings on ``averages/<sensor_id>``.

    Malformed messages and backend failures are logged and dropped so that an
    exception never escapes into the MQTT network thread.
    """

    # Number of most recent readings kept (and averaged) per sensor.
    WINDOW_SIZE = 5
    # Seconds to wait for InfluxDB; without a timeout a stalled HTTP request
    # would block the MQTT network thread indefinitely.
    INFLUX_TIMEOUT = 5
    # Prefix of the topics this class publishes to.
    AVERAGES_PREFIX = "averages/"

    def __init__(self, host: str, user: str, password: str, logger: RootLogger) -> None:
        """Store the connection settings; no network activity happens here.

        Args:
            host: Hostname of the MQTT broker.
            user: Username used to authenticate against the broker.
            password: Password used to authenticate against the broker.
            logger: Logger that receives error reports from the callbacks.
        """
        self.__host = host
        self.__user = user
        self.__password = password
        self.__logger = logger
        self.__topic = None

    def connect(self, topic: str):
        """Connect to the broker and start the background network loop.

        Args:
            topic: Topic to subscribe to once the broker accepts the connection.
        """
        self.__topic = topic
        client = paho.mqtt.client.Client()
        # Authenticate with the credentials handed to the constructor rather
        # than with module-level globals, so each instance is self-contained.
        client.username_pw_set(self.__user, self.__password)
        client.on_connect = self.on_connect
        client.on_message = self.on_message
        client.connect(self.__host, 1883, 60)
        client.loop_start()

    def on_connect(
        self, client: "paho.mqtt.client.Client", userdata, flags: dict, rc: int
    ):
        """Subscribe to the configured topic after a successful connection.

        Subscribing from this callback means the subscription is re-issued
        every time the callback fires, e.g. after a reconnect.
        """
        if rc != 0:
            # A non-zero result code means the broker refused the connection.
            self.__logger.error(
                "MQTT connection refused, result code: {0}".format(rc)
            )
            return
        client.subscribe(self.__topic)

    @staticmethod
    def _parse_reading(payload) -> tuple:
        """Decode *payload* into ``(sensor_id, sensor_value)``.

        Raises:
            ValueError: If the payload is not UTF-8 JSON, is not a JSON
                object, lacks a required key, or carries a non-numeric value.
        """
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses, so they propagate with the documented exception type.
        decoded = json.loads(payload.decode("utf-8"))
        if not isinstance(decoded, dict):
            raise ValueError("payload is not a JSON object")
        for key in ("sensor_id", "sensor_value"):
            if key not in decoded:
                raise ValueError("missing key '{0}'".format(key))
        sensor_id = decoded["sensor_id"]
        sensor_value = decoded["sensor_value"]
        if sensor_id is None:
            raise ValueError("sensor_id must not be null")
        # bool is a subclass of int, but a JSON true/false is not a measurement.
        if isinstance(sensor_value, bool) or not isinstance(sensor_value, (int, float)):
            raise ValueError("sensor_value must be a number")
        return sensor_id, sensor_value

    def on_message(
        self,
        client: "paho.mqtt.client.Client",
        userdata,
        msg: "paho.mqtt.client.MQTTMessage",
    ):
        """Store one sensor reading and publish the sensor's updated average.

        Invalid messages and backend errors are logged and the message is
        dropped; nothing is raised to the caller.
        """
        # Skip our own output: if the subscription ever covers the averages
        # topics, processing them would feed back into an infinite loop.
        if msg.topic.startswith(self.AVERAGES_PREFIX):
            return

        # Validate before touching any backend: a non-numeric value stored in
        # the window would break the mean for the following messages as well.
        try:
            sensor_id, sensor_value = self._parse_reading(msg.payload)
        except (ValueError, RecursionError) as e:
            self.__logger.error(
                "could not decode message {0}, error: {1}".format(msg.payload, str(e))
            )
            return

        try:
            sensors.update_one(
                {"_id": sensor_id},
                {
                    "$push": {
                        "items": {
                            "$each": [
                                {
                                    "value": sensor_value,
                                    # Timezone-aware replacement for the
                                    # deprecated datetime.utcnow().
                                    "date": datetime.datetime.now(
                                        datetime.timezone.utc
                                    ),
                                }
                            ],
                            # Newest first, then keep only the window.
                            "$sort": {"date": -1},
                            "$slice": self.WINDOW_SIZE,
                        }
                    }
                },
                upsert=True,
            )
        except errors.PyMongoError as e:
            self.__logger.error(
                "Error writing to mongodb {0}, error: {1}".format(msg.payload, str(e))
            )
            return

        # Add data to grafana through influxdb (best effort).  Commas and
        # spaces in a measurement name must be backslash-escaped in the line
        # protocol, otherwise the write is rejected or misparsed.
        measurement = str(sensor_id).replace(",", "\\,").replace(" ", "\\ ")
        try:
            response = requests.post(
                url=influxdb_url,
                data="{0} value={1}".format(measurement, sensor_value),
                timeout=self.INFLUX_TIMEOUT,
            )
            # Surface HTTP-level rejections instead of silently ignoring them.
            response.raise_for_status()
        except requests.RequestException as e:
            self.__logger.error(
                "Error writing to influxdb {0}, error: {1}".format(msg.payload, str(e))
            )

        # Read the window back and average it.
        try:
            document = sensors.find_one({"_id": sensor_id})
        except errors.PyMongoError as e:
            self.__logger.error(
                "Error reading from mongodb {0}, error: {1}".format(msg.payload, str(e))
            )
            return
        items = document.get("items", []) if document else []
        try:
            average = statistics.mean(item["value"] for item in items)
        except (statistics.StatisticsError, TypeError, KeyError) as e:
            # Empty window, or previously stored entries that are not numeric.
            self.__logger.error(
                "Error computing average for {0}, error: {1}".format(sensor_id, str(e))
            )
            return

        client.publish(
            "{0}{1}".format(self.AVERAGES_PREFIX, sensor_id),
            average,
            qos=2,
        )
# Error code MongoDB reports when replSetInitiate is run against a replica
# set that has already been initiated (code name "AlreadyInitialized").
_REPLSET_ALREADY_INITIALIZED = 23

# Initiate the replica set before the MQTT client starts delivering messages,
# so the first readings do not race with the replica-set setup.
try:
    mongo_client.admin.command("replSetInitiate")
except errors.OperationFailure as e:
    if e.code == _REPLSET_ALREADY_INITIALIZED:
        # Expected on every start after the first one; not an error.
        logger.debug("mongodb replica set already initiated")
    else:
        logger.error("Error setting mongodb replSetInitiate error: {0}".format(str(e)))

mqtt = Mqtt(mqtt_host, mqtt_user, mqtt_password, logger)
mqtt.connect("sensors")
logger.debug("MQTT App started")

# The MQTT network loop runs on a background thread started by connect();
# the main thread only has to stay alive, so it does not need to wake up
# many times per second.
while True:
    time.sleep(1)