diff --git a/src/fake-device-connection.js b/src/fake-device-connection.js new file mode 100644 index 0000000..2bcb2e0 --- /dev/null +++ b/src/fake-device-connection.js @@ -0,0 +1,37 @@ +const influx = require('./influx')() + +module.exports = function () { + /** + * Generates writepoints for influx bluk write operation + */ + const getWritePoints = (deviceId, data) => { + let writePoints = [] + // itterate all data chunks + data.forEach(chunk => { + // and for each data chunk key, create measurement and push it into writePoint (bulk-write) + Object + .keys(chunk) + .filter(measurement => measurement !== 'timestamp') + .forEach(measurement => { + writePoints.push({ + measurement, + tags: { + deviceId + }, + fields: { + value: chunk[measurement] + }, + timestamp: chunk['timestamp'] + }) + }) + }) + return writePoints + } + + const send = function (deviceId, data) { + return influx.writePoints(getWritePoints(deviceId, data)) + } + return { + send + } +} diff --git a/src/fake-device-manager.js b/src/fake-device-manager.js new file mode 100644 index 0000000..9b68e20 --- /dev/null +++ b/src/fake-device-manager.js @@ -0,0 +1,96 @@ +const sizeof = require('object-sizeof') +const FakeDevice = require('./fake-device') + +module.exports = function FakeDeviceManager ({ + connection, + deviceCount = 10, + spawnInterval = 200, + metricsInterval = 200, + sendInterval = 15000, + measurements, + onUpdateStats +}) { + if (!connection) { + throw new Error('Missing connection param') + } + if (!measurements) { + throw new Error('Missing measurements param') + } + let devices = [] + + let stats = { + devices: 0, + measurements: 0, + writes: 0, + size: 0, + errors: 0 + } + + /** + * Return device list + */ + const getDevices = () => { + return devices + } + + /** + * Return stats data + */ + const getStats = () => { + return stats + } + + /** + * Update stats on device data send event + */ + const onDeviceSendData = (err, data) => { + if (err) { + console.log(err) + stats.errors++ + return + } + stats.measurements += data.length + data.forEach(chunk => { + stats.size += sizeof(chunk) + }) + stats.writes++ + } + + /** + * Start device spawn process + */ + const start = () => { + let __spawnInterval = setInterval(() => { + stats.devices++ + devices.push(FakeDevice({ + deviceId: stats.devices, + connection, + measurements, + onSendData: onDeviceSendData, + autoStart: true, + metricsInterval, + sendInterval + })) + // when device limit is reached stop spawn inteval + if (stats.devices === deviceCount) { + clearTimeout(__spawnInterval) + } + }, spawnInterval) + } + + /** + * Stop all spawned devices + */ + const stop = () => { + for (let i = 0; i < stats.devices; i++) { + devices[i].stop() + } + } + + return { + start, + stop, + getDevices, + getStats + } +} diff --git a/src/fake-device.js b/src/fake-device.js new file mode 100644 index 0000000..a2ba70d --- /dev/null +++ b/src/fake-device.js @@ -0,0 +1,111 @@ +/** + * Create FakeDevice + * + * @param {any} { + * deviceId, + * metricsInterval = 100, + * sendInterval = 15000 + * } + * @returns FakeDevice + */ +module.exports = function FakeDevice ({ + deviceId, + connection, + metricsInterval = 100, + sendInterval = 30000, + onSendData, + autoStart = false, + measurements +}) { + if (!deviceId) { + throw new Error('Missing deviceId param') + } + if (!connection) { + throw new Error('Missing connection param') + } + if (!measurements) { + throw new Error('Missing measurements param') + } + + let data = [] + + let __metricsInterval = null + let __sendInterval = null + + /** + * Return data buffer + */ + const getData = () => { + return data + } + + /** + * Read data from sensors + * Will generate fake random data + */ + const readData = () => { + function getRandomArbitrary (min, max) { + return Math.random() * (max - min) + min + } + + let chunk = {} + Object.keys(measurements).forEach(measurement => { + switch (measurements[measurement].type) { + case 'integer': + chunk[measurement] = getRandomArbitrary(measurements[measurement].min, measurements[measurement].max) + break + default: + throw new Error('Invalid measurement type') + } + }) + chunk['timestamp'] = new Date() + + data.push(chunk) + } + + /** + * Send data to db and clear data buffer + * Calls onSendData function if set + */ + const send = () => { + // send data buffer to connection + connection.send(deviceId, data) + .then(() => { + if (!onSendData) return + onSendData(null, data) + // clear internal data buffer on successfull send + data = [] + }) + .catch(err => { + if (!onSendData) return + onSendData(err) + }) + } + + /** + * Stop collecting data + */ + const stop = () => { + clearInterval(__metricsInterval) + clearInterval(__sendInterval) + } + + /** + * Start collecting data + */ + const start = () => { + __metricsInterval = setInterval(readData, metricsInterval) + __sendInterval = setInterval(send, sendInterval) + } + + if (autoStart) { + start() + } + + return { + stop, + start, + send, + getData + } +} diff --git a/src/influx.js b/src/influx.js new file mode 100644 index 0000000..7e1ee1b --- /dev/null +++ b/src/influx.js @@ -0,0 +1,51 @@ +const Influx = require('influx') + +module.exports = function init () { + const influx = new Influx.InfluxDB({ + host: process.env.INFLUX_HOST || '172.18.0.1', + database: process.env.INFLUX_DATABASE || 'dummy-data', + schema: [ + { + measurement: 'temperature', + fields: { + value: Influx.FieldType.INTEGER + }, + tags: [ + 'deviceId' + ] + }, + { + measurement: 'air_humidity', + fields: { + value: Influx.FieldType.INTEGER + }, + tags: [ + 'deviceId' + ] + }, + { + measurement: 'ground_humidity', + fields: { + value: Influx.FieldType.INTEGER + }, + tags: [ + 'deviceId' + ] + }, + { + measurement: 'airforce', + fields: { + value: Influx.FieldType.INTEGER + }, + tags: [ + 'deviceId' + ] + } + ] + }) + influx.createDatabase(process.env.INFLUX_DATABASE) + .catch(console.error) + + return influx +} + diff --git a/src/stats.js b/src/stats.js new file mode 100644 index 0000000..2d22f14 --- /dev/null +++ b/src/stats.js @@ -0,0 +1,17 @@ +const ProgressBar = require('progress') + +module.exports = function stats (deviceManager) { + const bar = new ProgressBar('Time :elapsed: | Devices :devices | Measurements :measurements | Writes :writes | Size :size kb | Errors :errors', {total: 1000000}) + + const update = stats => { + stats = JSON.parse(JSON.stringify(stats)) + stats.size = Math.round((stats.size / 1024).toFixed(2)) + bar.tick(stats) + } + + setInterval(() => { + update(deviceManager.getStats()) + }, 100) + + return update +}