Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Changelog
incomplete
==========
- Add possibility to acquire bulk readings in JSON format
- Add possibility to acquire bulk readings in compact JSON format,
with timestamps as keys


in progress
Expand Down
7 changes: 7 additions & 0 deletions kotori/daq/decoder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# (c) 2019-2021 Andreas Motl <andreas@getkotori.org>
from kotori.daq.decoder.airrohr import AirrohrDecoder
from kotori.daq.decoder.json import CompactTimestampedJsonDecoder
from kotori.daq.decoder.tasmota import TasmotaSensorDecoder, TasmotaStateDecoder
from kotori.daq.decoder.schema import MessageType

Expand All @@ -23,6 +24,12 @@ def probe(self):
if 'slot' not in self.topology:
return False

# Compact JSON format, with timestamps as keys
if self.topology.slot.endswith('tc.json'):
self.info.message_type = MessageType.DATA_CONTAINER
self.info.decoder = CompactTimestampedJsonDecoder
return True

# Airrohr
if self.topology.slot.endswith('airrohr.json'):
self.info.message_type = MessageType.DATA_CONTAINER
Expand Down
44 changes: 44 additions & 0 deletions kotori/daq/decoder/json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# (c) 2021 Andreas Motl <andreas@getkotori.org>
import json


class CompactTimestampedJsonDecoder:
"""
Decode JSON payloads in compact format, with timestamps as keys.

Documentation
=============
- https://getkotori.org/docs/handbook/decoders/json.html (not yet)
- https://github.com/daq-tools/kotori/issues/39

Example
=======
::

{
"1611082554": {
"temperature": 21.42,
"humidity": 41.55
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1
}
}

"""

@staticmethod
def decode(payload):

# Decode from JSON.
message = json.loads(payload)

# Create list of data dictionaries.
data = []
for timestamp, item in message.items():
item["time"] = timestamp
data.append(item)

return data
1 change: 1 addition & 0 deletions test/settings/mqttkit.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class TestSettings:
mqtt_topic3_json = 'mqttkit-1/itest3/foo/bar/data.json'
mqtt_topic_event = 'mqttkit-1/itest/foo/bar/event.json'
mqtt_topic_homie = 'mqttkit-1/itest/foo/bar/data/__json__'
mqtt_topic_json_compact = 'mqttkit-1/itest/foo/bar/tc.json'
mqtt_topic_json_legacy = 'mqttkit-1/itest/foo/bar/message-json'

# HTTP channel settings.
Expand Down
34 changes: 34 additions & 0 deletions test/test_daq_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,40 @@ def test_mqtt_to_influxdb_json_bulk(machinery, create_influxdb, reset_influxdb):
assert record == {u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
def test_mqtt_to_influxdb_json_compact_bulk(machinery, create_influxdb, reset_influxdb):
"""
Publish multiple readings in compact JSON format to MQTT broker
and proof they are stored in the InfluxDB database.

https://github.com/daq-tools/kotori/issues/39
"""

# Submit multiple measurements, with timestamp.
data = {
"1611082554": {
"temperature": 21.42,
"humidity": 41.55,
},
"1611082568": {
"temperature": 42.84,
"humidity": 83.1,
},
}
yield threads.deferToThread(mqtt_json_sensor, settings.mqtt_topic_json_compact, data)

# Wait for some time to process the message.
yield sleep(PROCESS_DELAY_MQTT)

# Proof that data arrived in InfluxDB.
record = influx_sensors.get_record(index=0)
assert record == {u'time': '2021-01-19T18:55:54Z', u'temperature': 21.42, u'humidity': 41.55}

record = influx_sensors.get_record(index=1)
assert record == {u'time': '2021-01-19T18:56:08Z', u'temperature': 42.84, u'humidity': 83.1}


@pytest_twisted.inlineCallbacks
@pytest.mark.mqtt
@pytest.mark.legacy
Expand Down