Pub/Sub (MQTT) controller gateway
Enables communication with Pub/Sub (MQTT, PSRT) equipment and third-party platforms.
Blocks
Block kinds
The controller supports 3 kinds of blocks:
input blocks set EVA ICS item states and execute local actions from pub/sub messages.
output blocks export EVA ICS item states and static payloads.
action_map blocks map EVA ICS actions. When an action is executed, the mapped payload is sent to a pub/sub server topic.
Parsing/creating payloads
Payloads are parsed and created with a lightweight JSONPath implementation:
$. - block root (the default path value, can be omitted)
$.[0] - the first element if the root block is an array
$.name - block field “name”
$.name.value - block field “name”, subfield “value”
$.name[0] - the first element of block field “name” if the field is an array
Array ranges are not supported. If a payload, or a part of it, is created as an array and the elements before the target index have not been set, those elements are set to null.
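The path rules above can be sketched in a few lines of Python. This is only an illustration of the described behaviour, not the controller's implementation; the helper names parse_path, get_value and set_value are hypothetical.

```python
import re

# One path step: a field name, or an [index] into an array.
_STEP = re.compile(r"([^.\[\]]+)|\[(\d+)\]")


def parse_path(path="$."):
    """Split '$.name[0].value' into ['name', 0, 'value']; '$.' gives []."""
    if not path.startswith("$"):
        raise ValueError("a path must start with '$'")
    return [name if name else int(index) for name, index in _STEP.findall(path[1:])]


def get_value(root, path="$."):
    """Read the value at the given path."""
    node = root
    for step in parse_path(path):
        node = node[step]
    return node


def set_value(root, path, value):
    """Write a value at the given path and return the (possibly new) root."""
    steps = parse_path(path)
    if not steps:
        return value  # '$.' replaces the whole payload
    if root is None:
        root = [] if isinstance(steps[0], int) else {}
    node = root
    for i, step in enumerate(steps):
        last = i == len(steps) - 1
        # A missing intermediate container is a list if the next step is an index.
        child = value if last else ([] if isinstance(steps[i + 1], int) else {})
        if isinstance(step, int):
            while len(node) <= step:
                node.append(None)  # unset leading elements become null
            if last or node[step] is None:
                node[step] = child
        elif last or node.get(step) is None:
            node[step] = child
        node = node[step]
    return root
```

For instance, set_value(None, "$.data[2]", 7) yields a structure that serializes to {"data": [null, null, 7]}, which matches the null-padding rule described above.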
Payloads can be packed/unpacked as JSON (json), MessagePack (msgpack) or as-is for strings/numbers (no, the default one).
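As a rough illustration of the packer options, the sketch below covers json and no using only the Python standard library. The function names are hypothetical, msgpack is left out because it needs a third-party package, and the number parsing for as-is payloads follows the remark in the deployment template comments.

```python
import json


def unpack(raw: bytes, packer: str = "no"):
    """Decode an incoming message body according to the packer name."""
    if packer == "json":
        return json.loads(raw)
    if packer == "no":
        text = raw.decode()
        # As-is payloads: try to read the text as a number first.
        for cast in (int, float):
            try:
                return cast(text)
            except ValueError:
                pass
        return text
    raise ValueError(f"packer not covered by this sketch: {packer}")


def pack(value, packer: str = "no") -> bytes:
    """Encode an outgoing value according to the packer name."""
    if packer == "json":
        return json.dumps(value).encode()
    if packer == "no":
        return str(value).encode()
    raise ValueError(f"packer not covered by this sketch: {packer}")
```

With the default packer a body of "25" becomes the number 25 and "ON" stays a string, while json is needed as soon as the payload is a structure.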
See all the available parameters in the deployment template comments.
Input
For example, suppose a remote device sends JSON-packed data to the topic “lab/env” in the following format:
{
  "device": {
    "deviceStatus": "OK",
    "data": {
      "temp": {
        "deviceKind": "temperature sensor",
        "deviceValue": 25
      },
      "hum": {
        "deviceKind": "humidity sensor",
        "deviceValue": 50
      }
    }
  }
}
where deviceStatus can be “OK” or “ERROR”. Let us map the payload to the local items sensor:env/temp and sensor:env/hum, so that deviceStatus goes to the item status and deviceValue goes to the item value.
input:
  - topic: lab/env
    packer: json
    map:
      - path: $.device.deviceStatus
        value_map:
          "OK": 1
          "ERROR": -1
        oid: sensor:env/temp
        process: status
      - path: $.device.data.temp.deviceValue
        oid: sensor:env/temp
        #process: value # the default, can be omitted
      - path: $.device.deviceStatus
        value_map:
          "OK": 1
          "ERROR": -1
        oid: sensor:env/hum
        process: status
      - path: $.device.data.hum.deviceValue
        oid: sensor:env/hum
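To make the effect of this mapping concrete, here is a small Python sketch that applies the same four rules to the sample payload. It is not the controller's code: the lookup and apply_rules helpers are hypothetical, and passing unmapped values through unchanged is an assumption.

```python
import json

PAYLOAD = """{"device": {"deviceStatus": "OK", "data": {
  "temp": {"deviceKind": "temperature sensor", "deviceValue": 25},
  "hum": {"deviceKind": "humidity sensor", "deviceValue": 50}}}}"""

STATUS_MAP = {"OK": 1, "ERROR": -1}

# The same rules as in the map section: (path, oid, process, value_map).
RULES = [
    ("$.device.deviceStatus", "sensor:env/temp", "status", STATUS_MAP),
    ("$.device.data.temp.deviceValue", "sensor:env/temp", "value", None),
    ("$.device.deviceStatus", "sensor:env/hum", "status", STATUS_MAP),
    ("$.device.data.hum.deviceValue", "sensor:env/hum", "value", None),
]


def lookup(data, path):
    """Resolve a dotted path without array indexes, e.g. '$.a.b'."""
    node = data
    for key in path[2:].split("."):
        if key:
            node = node[key]
    return node


def apply_rules(raw, rules):
    """Return {oid: {"status": ..., "value": ...}} for one incoming message."""
    data = json.loads(raw)
    states = {}
    for path, oid, process, value_map in rules:
        value = lookup(data, path)
        if value_map is not None:
            # Assumption: values missing from value_map pass through unchanged.
            value = value_map.get(value, value)
        states.setdefault(oid, {})[process] = value
    return states


print(apply_rules(PAYLOAD, RULES))
```

For the sample message both items end up with status 1, sensor:env/temp gets the value 25 and sensor:env/hum gets the value 50.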
Output
Output payloads are sent in two cases:
if the payload maps at least one item OID, it is sent as soon as the state of such an item changes, unless ignore_events is set to true
if the interval property is set, the payload is sent periodically at the specified interval (in seconds)
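The two conditions can be expressed as a small decision function. This is a sketch of the rules as stated here, with a hypothetical send_triggers helper that takes one output block as a Python dict.

```python
def send_triggers(block: dict) -> set:
    """Return which triggers ("event", "interval") cause the block to be sent."""
    triggers = set()
    # Real-time sending needs at least one mapped item OID.
    has_oid = any("oid" in entry for entry in block.get("map", []))
    if has_oid and not block.get("ignore_events", False):
        triggers.add("event")
    # Periodic sending needs the interval property.
    if block.get("interval") is not None:
        triggers.add("interval")
    return triggers
```

A block with mapped OIDs and interval: 5 is sent on both triggers, while adding ignore_events: true leaves only the periodic one.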
Let us now reverse the previous task: send the payload whenever a local state changes and, additionally, every 5 seconds:
output:
  - topic: lab/env
    interval: 5
    packer: json
    map:
      # as the equipment is a single physical sensor, the status register
      # can be taken from any of the mapped items
      - path: $.device.deviceStatus
        value_map:
          "1": "OK"
          "-1": "ERROR"
        oid: sensor:env/temp
        prop: status
      - path: $.device.data.temp.deviceValue
        oid: sensor:env/temp
        #prop: value # the default, can be omitted
      - path: $.device.data.temp.deviceKind
        payload: "temperature sensor"
      - path: $.device.data.hum.deviceValue
        oid: sensor:env/hum
      - path: $.device.data.hum.deviceKind
        payload: "humidity sensor"
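The following Python sketch shows how such an output map could assemble the outgoing structure from item states. It is an illustration only: the STATES snapshot and the store and build_payload helpers are hypothetical, and matching value_map keys against the string form of the value is an assumption based on the quoted keys in the YAML.

```python
import json

# Hypothetical snapshot of local item states.
STATES = {
    "sensor:env/temp": {"status": 1, "value": 25},
    "sensor:env/hum": {"status": 1, "value": 50},
}
STATUS_MAP = {"1": "OK", "-1": "ERROR"}

RULES = [
    {"path": "$.device.deviceStatus", "oid": "sensor:env/temp",
     "prop": "status", "value_map": STATUS_MAP},
    {"path": "$.device.data.temp.deviceValue", "oid": "sensor:env/temp"},
    {"path": "$.device.data.temp.deviceKind", "payload": "temperature sensor"},
    {"path": "$.device.data.hum.deviceValue", "oid": "sensor:env/hum"},
    {"path": "$.device.data.hum.deviceKind", "payload": "humidity sensor"},
]


def store(root, path, value):
    """Set a value at a dotted path (no array indexes), creating dicts."""
    keys = [key for key in path[2:].split(".") if key]
    node = root
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value


def build_payload(rules, states):
    """Build the outgoing structure from item states and static payloads."""
    root = {}
    for rule in rules:
        if "oid" in rule:
            # prop defaults to "value", as in the map above
            value = states[rule["oid"]][rule.get("prop", "value")]
            value_map = rule.get("value_map")
            if value_map:
                # Assumption: keys are compared with the string form of the value.
                value = value_map.get(str(value), value)
        else:
            value = rule["payload"]
        store(root, rule["path"], value)
    return root


print(json.dumps(build_payload(RULES, STATES), indent=2))
```

With the states above, the result has the same shape as the message from the input example, with deviceStatus set to "OK".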
Note
Some platforms/devices require pub/sub announcements (e.g. equipment info) that carry no actual state. For such cases, use regular output blocks that contain no oid, only static payload data.
Action maps
Action maps work like output blocks, with one exception: if neither oid nor payload is specified in a mapping entry, the action value is inserted.
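A minimal sketch of this fallback, not the controller's code. It follows the wording of the deployment template comments, where an entry with neither oid nor payload takes its value from the action. The action_payload helper and the string-keyed value_map lookup are assumptions.

```python
def action_payload(entry: dict, action_value, states=None):
    """Pick the value that one action_map entry puts into the payload."""
    if "payload" in entry:
        return entry["payload"]  # static data, used as-is
    if "oid" in entry:
        value = (states or {})[entry["oid"]][entry.get("prop", "value")]
    else:
        value = action_value  # the fallback: the requested action value
    value_map = entry.get("value_map") or {}
    # Assumption: keys are compared with the string form of the value.
    return value_map.get(str(value), value)


ENTRY = {"path": "$.", "value_map": {"0": "OFF", "1": "ON"}}
print(action_payload(ENTRY, 1))
```

Here an action that requests the value 1 produces "ON", and a request for 0 produces "OFF".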
Extra topics
Extra topics can be processed with an lmacro. See configuration options for more details.
The target lmacro gets the following keyword arguments:
pubsub_topic - message topic
pubsub_payload - message payload
Note
The process lmacro always gets message payloads as-is. JSON and other serialized data SHOULD be deserialized manually.
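As an illustration of manual deserialization, the sketch below shows a plain Python function that could form the body of such an lmacro. How the keyword arguments are exposed inside a macro depends on the lmacro runtime and is not shown here; handle_message is a hypothetical name.

```python
import json


def handle_message(pubsub_topic: str, pubsub_payload):
    """Decode a raw payload, falling back to the as-is value."""
    if isinstance(pubsub_payload, (bytes, bytearray)):
        pubsub_payload = pubsub_payload.decode()
    try:
        data = json.loads(pubsub_payload)
    except (TypeError, ValueError):
        # Not JSON (or not a string at all): keep the payload unchanged.
        data = pubsub_payload
    return pubsub_topic, data
```

The fallback branch keeps non-JSON messages usable, which matters when one macro serves a topic mask such as some/topic/# with mixed payload formats.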
Extra outgoing payloads can be sent by calling the EAPI bus method pubsub.publish on the service instance.
Setup
Use the template EVA_DIR/share/svc-tpl/svc-tpl-controller-pubsub.yml:
# EVA ICS pub/sub controller
command: svc/eva-controller-pubsub
workers: 1
bus:
  path: var/bus.ipc
config:
  # ttl cache for input events (sec, skip non-modified)
  input_cache_sec: 3600
  pubsub:
    # mqtt or psrt
    proto: mqtt
    # path to CA certificate file. Enables SSL if set
    ca_certs: null
    # single or multiple hosts
    host:
      - 127.0.0.1:1883
    # if more than a single host is specified, shuffle the list before connecting
    cluster_hosts_randomize: false
    # user name / password auth
    username: null
    password: null
    ping_interval: 10
    # pub/sub queue size
    queue_size: 1024
    # pub/sub QoS (not required for PSRT)
    qos: 1
  # parsing/formatting values is performed with a lightweight JsonPath syntax:
  # $.some.value - value is in a structure "some", field "value"
  # $.some[1].value[2] - work with array of structures
  # $.[1] - top-level array of values
  # $. - payload top-level (the path can be omitted)
  #
  # inputs are used to get item states from pub/sub
  input:
    # a pub/sub topic (or a topic mask) to watch
    - topic: lab/equipment/sensor1
      # no packer specified - use the topic value as-is (numbers are parsed)
      map:
        - path: $. # top-level (can be omitted)
          oid: sensor:tests/s1 # item OID
          # process as the item value
          # variants are: status (set item status), value (default, can be
          # omitted), action (execute a local action)
          process: value
          # value mapping. map "OFF" to 0, "ON" to 1
          #value_map:
            #"OFF": 0
            #"ON": 1
          #transform:
            #- func: multiply # multiply the value by N
              #params: [ 1000 ]
            #- func: divide # divide the value by N
              #params: [ 1000 ]
            #- func: round # round the value to N digits after comma
              #params: [ 2 ]
            #- func: calc_speed # use the value as calc-speed gauge (with N seconds delta)
              #params: [ 1 ]
            #- func: invert # invert the value between 0/1
              # #params: []
    # process a complex payload
    - topic: lab/equipment/some-light/STATE
      # unpack from JSON. valid values are: no (default), json, msgpack
      packer: json
      map:
        - path: $.Vcc
          oid: sensor:light/vcc
          # value_map and transform can be used
        - path: $.Wifi.RSSI
          oid: sensor:light/wifi/rssi
    # action topic, allows to run EVA ICS actions from pub/sub
    - topic: lab/equipment/door1
      map:
        - oid: unit:tests/door
          process: action
          # path, value_map and transform can be used in action topics as well
  # outputs are used to submit data into pub/sub
  # if no interval is specified and ignore_events is not set to false
  # (default), events are sent in real-time
  output:
    - topic: out/sensor1
      #qos: 1 # override the default QoS
      packer: json
      map:
        - path: $.deviceStatus
          oid: sensor:t1/temp
          # EVA ICS item property to set
          #
          # valid properties are:
          #
          # status - item status
          # value - item value
          # time - item state set time (float)
          # time_sec - item state set time in seconds (integer)
          # time_millis - set time in milliseconds
          # time_micros - set time in microseconds
          # time_nanos - set time in nanoseconds
          prop: status
          # value_map and transform can be used
          value_map:
            "1": Good
            "-1": Error
        - path: $.deviceState
          oid: sensor:t1/temp
          # value_map and transform can be used
          transform:
            - func: multiply
              params: [ 100 ]
        - path: $.time
          oid: sensor:t1/temp
          prop: time
        - path: $.source
          # fill the "source" field of the output structure to a static
          # value. The payload can be a string or a complex one. If the
          # string contains variables, they are automatically replaced with
          # their values. Valid variables are:
          #
          # ${system_name} - EVA ICS node name
          payload: "${system_name}"
        - path: $.version
          # fill the "version" field with a string with no variables
          payload: "1.0"
    - topic: out/sensor2
      # send the output every 5 seconds and ignore real-time events
      interval: 5
      ignore_events: true
      packer: json
      map:
        - path: $.deviceStatus
          oid: sensor:t1/hum
          prop: status
          value_map:
            "1": Good
            "-1": Error
        - path: $.deviceState
          oid: sensor:t1/hum
        - path: $.time
          oid: sensor:t1/temp
          prop: time
        - path: $.source
          payload: "${system_name}"
  # action maps are used to execute EVA ICS unit actions by sending pub/sub
  # payloads
  action_map:
    unit:tests/u1:
      # the mapping format is similar to outputs with the following exception:
      # if neither oid nor payload is specified, the value is taken from the
      # action payload
      #
      # e.g. the example below sends "OFF" when action requests to set the item
      # value to 0 and "ON" for 1
      topic: out/control/x
      #qos: 1
      map:
        - path: $.
          # value_map and transform can be used
          value_map:
            "0": "OFF"
            "1": "ON"
          # as well as outputs, action payloads can be structures with
          # additional data (states of other items, static strings etc.)
  #
  # extra topics to be manually processed with lmacro
  extra:
    topics:
      - some/topic/#
      - some/other/topic
    # the target lmacro gets the following kwargs:
    # pubsub_topic - message topic
    # pubsub_payload - message payload (as-is, serialized data e.g. JSON must
    # be deserialized manually)
    process: lmacro:process
user: nobody
react_to_fail: true
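The transform entries listed in the template comments can be read as a small value pipeline. The Python sketch below mirrors four of them (multiply, divide, round, invert) to show the intended arithmetic. It is not the controller's implementation: applying the steps in list order is an assumption, calc_speed is left out because its exact semantics are not described here, and Python's round may differ from the controller's rounding on exact ties.

```python
def apply_transforms(value, transforms):
    """Apply transform steps one after another and return the result."""
    for step in transforms:
        func = step["func"]
        params = step.get("params", [])
        if func == "multiply":
            value = value * params[0]
        elif func == "divide":
            value = value / params[0]
        elif func == "round":
            value = round(value, params[0])
        elif func == "invert":
            # swap between 0 and 1
            value = 0 if value else 1
        else:
            raise ValueError(f"not covered by this sketch: {func}")
    return value


# Convert a raw reading to thousands and keep two decimal digits.
PIPELINE = [
    {"func": "divide", "params": [1000]},
    {"func": "round", "params": [2]},
]
print(apply_transforms(1234.5678, PIPELINE))
```

Chaining divide and round this way is a common pattern for sensors that report integers in milli-units.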
Create the service using eva-shell:
eva svc create eva.controller.pubsub1 /opt/eva4/share/svc-tpl/svc-tpl-controller-pubsub.yml
or using the bus CLI client:
cd /opt/eva4
cat DEPLOY.yml | ./bin/yml2mp | \
./sbin/bus ./var/bus.ipc rpc call eva.core svc.deploy -
(see eva.core::svc.deploy for more info)
EAPI methods
See EAPI commons for the common information about the bus, types, errors and RPC calls.
pubsub.publish
Description | Publish data to a pub/sub server topic
Parameters | required
Returns | nothing

Name | Type | Description | Required
topic | String | Server topic | yes
payload | Any | Data payload | yes
qos | i32 | Operation QoS | no
packer | String | Data packer: no, json, msgpack (default: no, send data as-is) | no
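The parameters above can be assembled and checked before the call is made. The sketch below only builds the parameter structure. Sending it over the bus requires a bus client and is not shown, and publish_params is a hypothetical helper, not part of any SDK.

```python
VALID_PACKERS = ("no", "json", "msgpack")


def publish_params(topic, payload, qos=None, packer=None):
    """Build the parameter dict for a pubsub.publish call."""
    if not isinstance(topic, str) or not topic:
        raise ValueError("topic must be a non-empty string")
    if packer is not None and packer not in VALID_PACKERS:
        raise ValueError(f"unknown packer: {packer}")
    # topic and payload are required
    params = {"topic": topic, "payload": payload}
    # qos and packer are optional and omitted when not given
    if qos is not None:
        params["qos"] = int(qos)
    if packer is not None:
        params["packer"] = packer
    return params


print(publish_params("out/announce", {"model": "x1"}, packer="json"))
```

Leaving optional fields out, instead of sending null values, lets the service apply its own defaults.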