-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathMessage aggregation with Key Values
91 lines (79 loc) · 5 KB
/
Message aggregation with Key Values
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# _____ _ _ _____ _____ _____ _____
#| ___|| \ | |/ __ \| _ | |_ _|| _ |
#| |__ | \| || / \/| | | | | | | | | |
#| __| | . ` || | | | | | | | | | | |
#| |___ | |\ || \__/\\ \_/ / _ _| |_ \ \_/ /
#\____/ \_| \_/ \____/ \___/ (_) \___/ \___/
#__________________Generated script by enco.io
# This scripts receives separate sensor values from a eWattch Ambiance device : temperature, humidity, luminosity and presence
# See http://www.ewattch.com/ for info on these devices
# Script will wait until it has received the 4 sensor data from the device, and send a reconstructed json with all data at once
# For the presence sensor (PIR sensor), it will keep the formerly received value and send the difference with the newly received value
# Storing data in between messages is done using the "keyvalues" module
# Finally, data is sent over MQTT to IBM Watson IOT
# Basic imports
object json = create("Json");
object array = create("Array");
object debug = create("Debug");
object containerstore = create("KeyValue");
# Entrypoint
function run(object data, object tags, string asset) {
# HTTP input from the sensor, filtering on selected tags
# Declare the json parser structure to extract the sensor values more easily
if( (array.containsAll(tags, ["decoded", "HTTP", "ambiance"])) && asset == "cloudchannels" ) {
object combinedData = [];
object jsonObjectField;
# Fields
jsonObjectField = json.parse(data["QUERY"]);
combinedData = array.merge(combinedData, jsonObjectField.getFlatArrayWithPrefix("QUERY"));
jsonObjectField = json.parse(data["HEADER"]);
combinedData = array.merge(combinedData, jsonObjectField.getFlatArrayWithPrefix("HEADER"));
# Parser
string jsonStructure = "{\"type\":\"object\",\"properties\":{\"companyName\":{\"type\":\"string\",\"isRequired\":false},\"DevEUI\":{\"type\":\"string\"},\"container\":{\"isRequired\":false,\"type\":\"string\"},\"locationFriendlyName1\":{\"isRequired\":false,\"type\":\"string\"},\"value\":{\"isRequired\":false,\"type\":\"string\"},\"timestamp\":{\"isRequired\":false,\"type\":\"string\"}},\"required\":[\"DevEUI\"]}";
object jsonObj = data["BODY"];
object jsonParsed = json.validate(jsonObj.getString(), jsonStructure);
combinedData = array.merge(combinedData, jsonParsed.getFlatArray());
}
# If we haven't received all sensor data, checking with our keyvalue object
if(containerstore.get("luminosity")==null || containerstore.get("presence")==null || containerstore.get("humidity")==null || containerstore.get("temperature")==null) {
if(combinedData["container"]=="presence") {
debug.log("container is presence");
# Again use the keyvalue feature to check on former received sensor value, if already received
if (containerstore.get("previouspresence")==null) {
containerstore.put("previouspresence","0");
}
int previous = containerstore.get("previouspresence");
int presence = combinedData["value"];
containerstore.put("previouspresence",presence);
# The sensor is 16 bit, so we need to check if the sensor has gone past its max value and restarted counting at 0
if(presence>=previous) {
presence = presence - previous;
containerstore.put("presence",presence);
} else {
int overpresence=65535-previous+presence;
containerstore.put("presence",overpresence);
}
# If the received container is something else than "presence", just store it in the keyvalue
} else {
containerstore.put(combinedData["container"],combinedData["value"]);
}
}
# If all 4 expected sensor values have been received and store in our keyvalue, send a new json structure to our outgoing endpoints
if(containerstore.get("luminosity")!=null && containerstore.get("presence")!=null && containerstore.get("humidity")!=null && containerstore.get("temperature")!=null) {
string webpayload = "{\"luminosity\":"+containerstore.get("luminosity")+",\"presence\":"+containerstore.get("presence")+",\"humidity\":"+containerstore.get("humidity")+",\"temperature\":"+containerstore.get("temperature")+"}";
# send to IBM Watson IOT via MQTT
object mqtt = create("MQTT", "ssl://ourorgid.messaging.internetofthings.ibmcloud.com", "iot-2/evt/data/fmt/json");
mqtt.setUsername("use-token-auth");
mqtt.setPassword("ourdevicetoken");
mqtt.setQos(0);
mqtt.setClientId("d:ourorgid:ewattch:ambiance01");
mqtt.setPayload("{\"d\":"+webpayload+"}");
mqtt.send();
debug.log("MQTT sent");
# now let's reset the keyvalues so we are ready to start all over again
containerstore.delete("luminosity");
containerstore.delete("presence");
containerstore.delete("humidity");
containerstore.delete("temperature");
}
}