cse47242
4 years ago
5 changed files with 312 additions and 0 deletions
@ -0,0 +1,37 @@ |
|||||
|
const influx = require('./influx')() |
||||
|
|
||||
|
module.exports = function () { |
||||
|
/** |
||||
|
* Generates writepoints for influx bluk write operation |
||||
|
*/ |
||||
|
const getWritePoints = (deviceId, data) => { |
||||
|
let writePoints = [] |
||||
|
// itterate all data chunks
|
||||
|
data.forEach(chunk => { |
||||
|
// and for each data chunk key, create measurement and push it into writePoint (bulk-write)
|
||||
|
Object |
||||
|
.keys(chunk) |
||||
|
.filter(measurement => measurement !== 'timestamp') |
||||
|
.forEach(measurement => { |
||||
|
writePoints.push({ |
||||
|
measurement, |
||||
|
tags: { |
||||
|
deviceId |
||||
|
}, |
||||
|
fields: { |
||||
|
value: chunk[measurement] |
||||
|
}, |
||||
|
timestamp: chunk['timestamp'] |
||||
|
}) |
||||
|
}) |
||||
|
}) |
||||
|
return writePoints |
||||
|
} |
||||
|
|
||||
|
const send = function (deviceId, data) { |
||||
|
return influx.writePoints(getWritePoints(deviceId, data)) |
||||
|
} |
||||
|
return { |
||||
|
send |
||||
|
} |
||||
|
} |
@ -0,0 +1,96 @@ |
|||||
|
const sizeof = require('object-sizeof') |
||||
|
const FakeDevice = require('./fake-device') |
||||
|
|
||||
|
module.exports = function FakeDeviceManager ({ |
||||
|
connection, |
||||
|
deviceCount = 10, |
||||
|
spawnInterval = 200, |
||||
|
metricsInterval = 200, |
||||
|
sendInterval = 15000, |
||||
|
measurements, |
||||
|
onUpdateStats |
||||
|
}) { |
||||
|
if (!connection) { |
||||
|
throw new Error('Missing connection param') |
||||
|
} |
||||
|
if (!measurements) { |
||||
|
throw new Error('Missing measurements param') |
||||
|
} |
||||
|
let devices = [] |
||||
|
|
||||
|
let stats = { |
||||
|
devices: 0, |
||||
|
measurements: 0, |
||||
|
writes: 0, |
||||
|
size: 0, |
||||
|
errors: 0 |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Return device list |
||||
|
*/ |
||||
|
const getDevices = () => { |
||||
|
return devices |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Return stats data |
||||
|
*/ |
||||
|
const getStats = () => { |
||||
|
return stats |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Update stats on device data send event |
||||
|
*/ |
||||
|
const onDeviceSendData = (err, data) => { |
||||
|
if (err) { |
||||
|
console.log(err) |
||||
|
stats.errors++ |
||||
|
return |
||||
|
} |
||||
|
stats.measurements += data.length |
||||
|
data.forEach(chunk => { |
||||
|
stats.size += sizeof(chunk) |
||||
|
}) |
||||
|
stats.writes++ |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Start device spawn process |
||||
|
*/ |
||||
|
const start = () => { |
||||
|
let __spawnInterval = setInterval(() => { |
||||
|
stats.devices++ |
||||
|
devices.push(FakeDevice({ |
||||
|
deviceId: stats.devices, |
||||
|
connection, |
||||
|
measurements, |
||||
|
onSendData: onDeviceSendData, |
||||
|
autoStart: true, |
||||
|
metricsInterval, |
||||
|
sendInterval |
||||
|
})) |
||||
|
// when device limit is reached stop spawn inteval
|
||||
|
if (stats.devices === deviceCount) { |
||||
|
clearTimeout(__spawnInterval) |
||||
|
} |
||||
|
}, spawnInterval) |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Stop all spawned devices |
||||
|
*/ |
||||
|
const stop = () => { |
||||
|
for (let i = 0; i < stats.devices; i++) { |
||||
|
devices[i].stop() |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
return { |
||||
|
start, |
||||
|
stop, |
||||
|
getDevices, |
||||
|
getStats |
||||
|
} |
||||
|
} |
@ -0,0 +1,111 @@ |
|||||
|
/** |
||||
|
* Create FakeDevice |
||||
|
* |
||||
|
* @param {any} { |
||||
|
* deviceId, |
||||
|
* metricsInterval = 100, |
||||
|
* sendInterval = 15000 |
||||
|
* } |
||||
|
* @returns FakeDevice |
||||
|
*/ |
||||
|
module.exports = function FakeDevice ({ |
||||
|
deviceId, |
||||
|
connection, |
||||
|
metricsInterval = 100, |
||||
|
sendInterval = 30000, |
||||
|
onSendData, |
||||
|
autoStart = false, |
||||
|
measurements |
||||
|
}) { |
||||
|
if (!deviceId) { |
||||
|
throw new Error('Missing deviceId param') |
||||
|
} |
||||
|
if (!connection) { |
||||
|
throw new Error('Missing connection param') |
||||
|
} |
||||
|
if (!measurements) { |
||||
|
throw new Error('Missing measurements param') |
||||
|
} |
||||
|
|
||||
|
let data = [] |
||||
|
|
||||
|
let __metricsInterval = null |
||||
|
let __sendInterval = null |
||||
|
|
||||
|
/** |
||||
|
* Return data buffer |
||||
|
*/ |
||||
|
const getData = () => { |
||||
|
return data |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Read data from sensors |
||||
|
* Will generate fake random data |
||||
|
*/ |
||||
|
const readData = () => { |
||||
|
function getRandomArbitrary (min, max) { |
||||
|
return Math.random() * (max - min) + min |
||||
|
} |
||||
|
|
||||
|
let chunk = {} |
||||
|
Object.keys(measurements).forEach(measurement => { |
||||
|
switch (measurements[measurement].type) { |
||||
|
case 'integer': |
||||
|
chunk[measurement] = getRandomArbitrary(measurements[measurement].min, measurements[measurement].max) |
||||
|
break |
||||
|
default: |
||||
|
throw new Error('Invalid measurement type') |
||||
|
} |
||||
|
}) |
||||
|
chunk['timestamp'] = new Date() |
||||
|
|
||||
|
data.push(chunk) |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Send data to db and clear data buffer |
||||
|
* Calls onSendData function if set |
||||
|
*/ |
||||
|
const send = () => { |
||||
|
// send data buffer to connection
|
||||
|
connection.send(deviceId, data) |
||||
|
.then(() => { |
||||
|
if (!onSendData) return |
||||
|
onSendData(null, data) |
||||
|
// clear internal data buffer on successfull send
|
||||
|
data = [] |
||||
|
}) |
||||
|
.catch(err => { |
||||
|
if (!onSendData) return |
||||
|
onSendData(err) |
||||
|
}) |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Stop collecting data |
||||
|
*/ |
||||
|
const stop = () => { |
||||
|
clearInterval(__metricsInterval) |
||||
|
clearInterval(__sendInterval) |
||||
|
} |
||||
|
|
||||
|
/** |
||||
|
* Start collecting data |
||||
|
*/ |
||||
|
const start = () => { |
||||
|
__metricsInterval = setInterval(readData, metricsInterval) |
||||
|
__sendInterval = setInterval(send, sendInterval) |
||||
|
} |
||||
|
|
||||
|
if (autoStart) { |
||||
|
start() |
||||
|
} |
||||
|
|
||||
|
return { |
||||
|
stop, |
||||
|
start, |
||||
|
send, |
||||
|
getData |
||||
|
} |
||||
|
} |
@ -0,0 +1,51 @@ |
|||||
|
const Influx = require('influx') |
||||
|
|
||||
|
module.exports = function init () { |
||||
|
const influx = new Influx.InfluxDB({ |
||||
|
host: process.env.INFLUX_HOST || '172.18.0.1', |
||||
|
database: process.env.INFLUX_DATABASE || 'dummy-data', |
||||
|
schema: [ |
||||
|
{ |
||||
|
measurement: 'temperature', |
||||
|
fields: { |
||||
|
value: Influx.FieldType.INTEGER |
||||
|
}, |
||||
|
tags: [ |
||||
|
'deviceId' |
||||
|
] |
||||
|
}, |
||||
|
{ |
||||
|
measurement: 'air_humidity', |
||||
|
fields: { |
||||
|
value: Influx.FieldType.INTEGER |
||||
|
}, |
||||
|
tags: [ |
||||
|
'deviceId' |
||||
|
] |
||||
|
}, |
||||
|
{ |
||||
|
measurement: 'ground_humidity', |
||||
|
fields: { |
||||
|
value: Influx.FieldType.INTEGER |
||||
|
}, |
||||
|
tags: [ |
||||
|
'deviceId' |
||||
|
] |
||||
|
}, |
||||
|
{ |
||||
|
measurement: 'airforce', |
||||
|
fields: { |
||||
|
value: Influx.FieldType.INTEGER |
||||
|
}, |
||||
|
tags: [ |
||||
|
'deviceId' |
||||
|
] |
||||
|
} |
||||
|
] |
||||
|
}) |
||||
|
influx.createDatabase(process.env.INFLUX_DATABASE) |
||||
|
.catch(console.error) |
||||
|
|
||||
|
return influx |
||||
|
} |
||||
|
|
@ -0,0 +1,17 @@ |
|||||
|
const ProgressBar = require('progress') |
||||
|
|
||||
|
module.exports = function stats (deviceManager) { |
||||
|
const bar = new ProgressBar('Time :elapsed: | Devices :devices | Measurements :measurements | Writes :writes | Size :size kb | Errors :errors', {total: 1000000}) |
||||
|
|
||||
|
const update = stats => { |
||||
|
stats = JSON.parse(JSON.stringify(stats)) |
||||
|
stats.size = Math.round((stats.size / 1024).toFixed(2)) |
||||
|
bar.tick(stats) |
||||
|
} |
||||
|
|
||||
|
setInterval(() => { |
||||
|
update(deviceManager.getStats()) |
||||
|
}, 100) |
||||
|
|
||||
|
return update |
||||
|
} |
Loading…
Reference in new issue