cse47122
4 years ago
5 changed files with 312 additions and 0 deletions
@ -0,0 +1,37 @@ |
|||
const influx = require('./influx')() |
|||
|
|||
module.exports = function () { |
|||
/** |
|||
* Generates writepoints for influx bluk write operation |
|||
*/ |
|||
const getWritePoints = (deviceId, data) => { |
|||
let writePoints = [] |
|||
// itterate all data chunks
|
|||
data.forEach(chunk => { |
|||
// and for each data chunk key, create measurement and push it into writePoint (bulk-write)
|
|||
Object |
|||
.keys(chunk) |
|||
.filter(measurement => measurement !== 'timestamp') |
|||
.forEach(measurement => { |
|||
writePoints.push({ |
|||
measurement, |
|||
tags: { |
|||
deviceId |
|||
}, |
|||
fields: { |
|||
value: chunk[measurement] |
|||
}, |
|||
timestamp: chunk['timestamp'] |
|||
}) |
|||
}) |
|||
}) |
|||
return writePoints |
|||
} |
|||
|
|||
const send = function (deviceId, data) { |
|||
return influx.writePoints(getWritePoints(deviceId, data)) |
|||
} |
|||
return { |
|||
send |
|||
} |
|||
} |
@ -0,0 +1,96 @@ |
|||
const sizeof = require('object-sizeof') |
|||
const FakeDevice = require('./fake-device') |
|||
|
|||
module.exports = function FakeDeviceManager ({ |
|||
connection, |
|||
deviceCount = 10, |
|||
spawnInterval = 200, |
|||
metricsInterval = 200, |
|||
sendInterval = 15000, |
|||
measurements, |
|||
onUpdateStats |
|||
}) { |
|||
if (!connection) { |
|||
throw new Error('Missing connection param') |
|||
} |
|||
if (!measurements) { |
|||
throw new Error('Missing measurements param') |
|||
} |
|||
let devices = [] |
|||
|
|||
let stats = { |
|||
devices: 0, |
|||
measurements: 0, |
|||
writes: 0, |
|||
size: 0, |
|||
errors: 0 |
|||
} |
|||
|
|||
/** |
|||
* Return device list |
|||
*/ |
|||
const getDevices = () => { |
|||
return devices |
|||
} |
|||
|
|||
/** |
|||
* Return stats data |
|||
*/ |
|||
const getStats = () => { |
|||
return stats |
|||
} |
|||
|
|||
/** |
|||
* Update stats on device data send event |
|||
*/ |
|||
const onDeviceSendData = (err, data) => { |
|||
if (err) { |
|||
console.log(err) |
|||
stats.errors++ |
|||
return |
|||
} |
|||
stats.measurements += data.length |
|||
data.forEach(chunk => { |
|||
stats.size += sizeof(chunk) |
|||
}) |
|||
stats.writes++ |
|||
} |
|||
|
|||
/** |
|||
* Start device spawn process |
|||
*/ |
|||
const start = () => { |
|||
let __spawnInterval = setInterval(() => { |
|||
stats.devices++ |
|||
devices.push(FakeDevice({ |
|||
deviceId: stats.devices, |
|||
connection, |
|||
measurements, |
|||
onSendData: onDeviceSendData, |
|||
autoStart: true, |
|||
metricsInterval, |
|||
sendInterval |
|||
})) |
|||
// when device limit is reached stop spawn inteval
|
|||
if (stats.devices === deviceCount) { |
|||
clearTimeout(__spawnInterval) |
|||
} |
|||
}, spawnInterval) |
|||
} |
|||
|
|||
/** |
|||
* Stop all spawned devices |
|||
*/ |
|||
const stop = () => { |
|||
for (let i = 0; i < stats.devices; i++) { |
|||
devices[i].stop() |
|||
} |
|||
} |
|||
|
|||
return { |
|||
start, |
|||
stop, |
|||
getDevices, |
|||
getStats |
|||
} |
|||
} |
@ -0,0 +1,111 @@ |
|||
/** |
|||
* Create FakeDevice |
|||
* |
|||
* @param {any} { |
|||
* deviceId, |
|||
* metricsInterval = 100, |
|||
* sendInterval = 15000 |
|||
* } |
|||
* @returns FakeDevice |
|||
*/ |
|||
module.exports = function FakeDevice ({ |
|||
deviceId, |
|||
connection, |
|||
metricsInterval = 100, |
|||
sendInterval = 30000, |
|||
onSendData, |
|||
autoStart = false, |
|||
measurements |
|||
}) { |
|||
if (!deviceId) { |
|||
throw new Error('Missing deviceId param') |
|||
} |
|||
if (!connection) { |
|||
throw new Error('Missing connection param') |
|||
} |
|||
if (!measurements) { |
|||
throw new Error('Missing measurements param') |
|||
} |
|||
|
|||
let data = [] |
|||
|
|||
let __metricsInterval = null |
|||
let __sendInterval = null |
|||
|
|||
/** |
|||
* Return data buffer |
|||
*/ |
|||
const getData = () => { |
|||
return data |
|||
} |
|||
|
|||
/** |
|||
* Read data from sensors |
|||
* Will generate fake random data |
|||
*/ |
|||
const readData = () => { |
|||
function getRandomArbitrary (min, max) { |
|||
return Math.random() * (max - min) + min |
|||
} |
|||
|
|||
let chunk = {} |
|||
Object.keys(measurements).forEach(measurement => { |
|||
switch (measurements[measurement].type) { |
|||
case 'integer': |
|||
chunk[measurement] = getRandomArbitrary(measurements[measurement].min, measurements[measurement].max) |
|||
break |
|||
default: |
|||
throw new Error('Invalid measurement type') |
|||
} |
|||
}) |
|||
chunk['timestamp'] = new Date() |
|||
|
|||
data.push(chunk) |
|||
} |
|||
|
|||
/** |
|||
* Send data to db and clear data buffer |
|||
* Calls onSendData function if set |
|||
*/ |
|||
const send = () => { |
|||
// send data buffer to connection
|
|||
connection.send(deviceId, data) |
|||
.then(() => { |
|||
if (!onSendData) return |
|||
onSendData(null, data) |
|||
// clear internal data buffer on successfull send
|
|||
data = [] |
|||
}) |
|||
.catch(err => { |
|||
if (!onSendData) return |
|||
onSendData(err) |
|||
}) |
|||
} |
|||
|
|||
/** |
|||
* Stop collecting data |
|||
*/ |
|||
const stop = () => { |
|||
clearInterval(__metricsInterval) |
|||
clearInterval(__sendInterval) |
|||
} |
|||
|
|||
/** |
|||
* Start collecting data |
|||
*/ |
|||
const start = () => { |
|||
__metricsInterval = setInterval(readData, metricsInterval) |
|||
__sendInterval = setInterval(send, sendInterval) |
|||
} |
|||
|
|||
if (autoStart) { |
|||
start() |
|||
} |
|||
|
|||
return { |
|||
stop, |
|||
start, |
|||
send, |
|||
getData |
|||
} |
|||
} |
@ -0,0 +1,51 @@ |
|||
const Influx = require('influx') |
|||
|
|||
module.exports = function init () { |
|||
const influx = new Influx.InfluxDB({ |
|||
host: process.env.INFLUX_HOST || '172.18.0.1', |
|||
database: process.env.INFLUX_DATABASE || 'dummy-data', |
|||
schema: [ |
|||
{ |
|||
measurement: 'temperature', |
|||
fields: { |
|||
value: Influx.FieldType.INTEGER |
|||
}, |
|||
tags: [ |
|||
'deviceId' |
|||
] |
|||
}, |
|||
{ |
|||
measurement: 'air_humidity', |
|||
fields: { |
|||
value: Influx.FieldType.INTEGER |
|||
}, |
|||
tags: [ |
|||
'deviceId' |
|||
] |
|||
}, |
|||
{ |
|||
measurement: 'ground_humidity', |
|||
fields: { |
|||
value: Influx.FieldType.INTEGER |
|||
}, |
|||
tags: [ |
|||
'deviceId' |
|||
] |
|||
}, |
|||
{ |
|||
measurement: 'airforce', |
|||
fields: { |
|||
value: Influx.FieldType.INTEGER |
|||
}, |
|||
tags: [ |
|||
'deviceId' |
|||
] |
|||
} |
|||
] |
|||
}) |
|||
influx.createDatabase(process.env.INFLUX_DATABASE) |
|||
.catch(console.error) |
|||
|
|||
return influx |
|||
} |
|||
|
@ -0,0 +1,17 @@ |
|||
const ProgressBar = require('progress') |
|||
|
|||
module.exports = function stats (deviceManager) { |
|||
const bar = new ProgressBar('Time :elapsed: | Devices :devices | Measurements :measurements | Writes :writes | Size :size kb | Errors :errors', {total: 1000000}) |
|||
|
|||
const update = stats => { |
|||
stats = JSON.parse(JSON.stringify(stats)) |
|||
stats.size = Math.round((stats.size / 1024).toFixed(2)) |
|||
bar.tick(stats) |
|||
} |
|||
|
|||
setInterval(() => { |
|||
update(deviceManager.getStats()) |
|||
}, 100) |
|||
|
|||
return update |
|||
} |
Loading…
Reference in new issue