Pub/Sub (MQTT) controller gateway

Allows EVA ICS to communicate with pub/sub (MQTT, PSRT) equipment and third-party platforms.

Blocks

Block kinds

The controller supports 3 kinds of blocks:

  • input blocks set EVA ICS item states and execute local actions from incoming pub/sub messages.

  • output blocks export EVA ICS item states and static payloads.

  • action_map blocks map EVA ICS actions: when an action is executed, the mapped payload is sent to a pub/sub server topic.

Parsing/creating payloads

Payloads are parsed and created with a lightweight JSONPath implementation:

  • $. - block root (the default path value, can be omitted)

  • $.[0] - the first element if the block root is an array

  • $.name - block field “name”

  • $.name.value - block field “name”, subfield “value”

  • $.name[0] - the first element of block field “name” if the field is an array

Array ranges are not supported. If a payload (or a part of it) is created as an array and the elements before the written index have not been set, those elements are filled with null.

Payloads can be packed/unpacked as JSON (json) or MessagePack (msgpack), or passed as-is for strings/numbers (no, the default).
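
The array rule and the packer setting can be illustrated together. The following is a minimal sketch, not taken from the deployment template: the topic name, the field name "values" and the static text are assumptions chosen for illustration. Only index 2 of the array is written, so the two preceding elements are expected to be filled with null.

```yaml
output:
  - topic: lab/demo          # hypothetical topic
    interval: 5              # send periodically (no item OID is mapped)
    packer: json             # serialize the resulting structure as JSON
    map:
      # only index 2 is written; indexes 0 and 1 are never set
      - path: $.values[2]
        payload: "third"
# according to the array rule, the JSON payload is expected to be:
# {"values": [null, null, "third"]}
```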

See all the available parameters in the deployment template comments.

Input

For example, a remote device sends JSON-packed data to the topic “lab/env” in the following format:

{
   "device": {
       "deviceStatus": "OK",
       "data": {
           "temp": {
               "deviceKind": "temperature sensor",
               "deviceValue": 25
           },
           "hum": {
               "deviceKind": "humidity sensor",
               "deviceValue": 50
           }
       }
   }
}

where deviceStatus can be “OK” or “ERROR”. Let us map the payload to the local items sensor:env/temp and sensor:env/hum, so that deviceStatus goes to the item status and deviceValue goes to the item value.

input:
  - topic: lab/env
    packer: json
    map:
      - path: $.device.deviceStatus
        value_map:
          "OK": 1
          "ERROR": -1
        oid: sensor:env/temp
        process: status
      - path: $.device.data.temp.deviceValue
        oid: sensor:env/temp
        #process: value # the default, can be omitted
      - path: $.device.deviceStatus
        value_map:
          "OK": 1
          "ERROR": -1
        oid: sensor:env/hum
        process: status
      - path: $.device.data.hum.deviceValue
        oid: sensor:env/hum

Output

Output payloads are sent in two cases:

  • if the payload map contains at least one item OID, the payload is sent as soon as that item's state changes, unless ignore_events is set to true

  • if the interval property is set, the payload is sent periodically at the specified interval (in seconds)

Let us reverse the previous task: send the payload whenever a local item state changes and, additionally, every 5 seconds:

output:
  - topic: lab/env
    interval: 5
    packer: json
    map:
      # as the equipment is a single physical sensor, the status can be
      # taken from any of the mapped items
      - path: $.device.deviceStatus
        value_map:
          "1": "OK"
          "-1": "ERROR"
        oid: sensor:env/temp
        prop: status
      - path: $.device.data.temp.deviceValue
        oid: sensor:env/temp
        #prop: value # the default, can be omitted
      - path: $.device.data.temp.deviceKind
        payload: "temperature sensor"
      - path: $.device.data.hum.deviceValue
        oid: sensor:env/hum
      - path: $.device.data.hum.deviceKind
        payload: "humidity sensor"

Note

Some platforms/devices require pub/sub announcements (e.g. equipment info) that carry no actual item state. For such cases, use regular output blocks whose mappings contain no oid, only payload data.
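
A minimal sketch of such an announcement block is given below. The topic, the field names and the interval value are assumptions for illustration; the ${system_name} variable is the one documented in the deployment template. Because no item OID is mapped, no state event can trigger the block, so an interval is set to send it periodically.

```yaml
output:
  - topic: lab/env/info      # hypothetical announcement topic
    interval: 60             # resend every 60 seconds (illustrative value)
    packer: json
    map:
      # static fields only, no oid in any mapping
      - path: $.deviceKind
        payload: "temperature sensor"
      - path: $.node
        payload: "${system_name}"
```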

Action maps

Action maps are configured the same way as output blocks, with one exception: if a mapping specifies neither oid nor payload, the action value is inserted.
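
As a sketch, an action map that sends a small JSON structure could look as follows. The unit OID, the topic and the field names are hypothetical, and the example assumes that action maps accept the packer field in the same way outputs do. The first mapping has no oid, prop or payload, so it receives the action value; the second adds a static field.

```yaml
action_map:
  unit:env/fan:              # hypothetical unit OID
    topic: lab/env/fan/set   # hypothetical topic
    packer: json             # assumption: supported as in output blocks
    map:
      # no oid, prop or payload here: the action value is inserted
      - path: $.command
        value_map:
          "0": "OFF"
          "1": "ON"
      # static field sent along with the command
      - path: $.source
        payload: "${system_name}"
```

Under these assumptions, an action requesting the value 1 would be expected to publish a structure with "command" set to "ON" and "source" set to the node name.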

Extra topics

Extra topics can be processed with an lmacro. See the configuration options in the deployment template for more details.

The target lmacro gets the following keyword arguments:

  • pubsub_topic - message topic

  • pubsub_payload - message payload

Note

The process lmacro always gets message payloads as-is. JSON and other serialized data SHOULD be deserialized manually.

Extra outgoing payloads can be sent by calling the EAPI bus method pubsub.publish on the service instance.

Setup

Use the template EVA_DIR/share/svc-tpl/svc-tpl-controller-pubsub.yml:

# EVA ICS pub/sub controller
command: svc/eva-controller-pubsub
workers: 1
bus:
  path: var/bus.ipc
config:
  # ttl cache for input events (sec, skip non-modified)
  input_cache_sec: 3600
  pubsub:
    # mqtt or psrt
    proto: mqtt
    # path to CA certificate file. Enables SSL if set
    ca_certs: null
    # single or multiple hosts
    host:
      - 127.0.0.1:1883
    # if more than a single host is specified, shuffle the list before connecting
    cluster_hosts_randomize: false
    # user name / password auth
    username: null
    password: null
    ping_interval: 10
    # pub/sub queue size
    queue_size: 1024
    # pub/sub QoS (not required for PSRT)
    qos: 1
  # parsing/formatting values is performed with a lightweight JsonPath syntax:
  # $.some.value - value is in a structure "some", field "value"
  # $.some[1].value[2] - work with array of structures
  # $.[1] - top-level array of values
  # $. - payload top-level (the path can be omitted)
  #
  # inputs are used to get item states from pub/sub
  input:
    # a pub/sub topic (or a topic mask) to watch
    - topic: lab/equipment/sensor1
      # no packer specified - use the topic value as-is (numbers are parsed)
      map:
        - path: $. # top-level (can be omitted)
          oid: sensor:tests/s1 # item OID
          # process as the item value
          # variants are: status (set item status), value (default, can be
          # omitted), action (execute a local action)
          process: value
          # value mapping. map "OFF" to 0, "ON" to 1
          #value_map:
            #"OFF": 0
            #"ON": 1
          #transform:
            #- func: multiply # multiply the value by N
              #params: [ 1000 ]
            #- func: divide # divide the value by N
              #params: [ 1000 ]
            #- func: round # round the value to N digits after the decimal point
              #params: [ 2 ]
            #- func: calc_speed # use the value as calc-speed gauge (with N seconds delta)
              #params: [ 1 ]
            #- func: invert # invert the value between 0/1
            # #params: []
    # process a complex payload
    - topic: lab/equipment/some-light/STATE
      # unpack from JSON. valid values are: no (default), json, msgpack
      packer: json
      map:
        - path: $.Vcc
          oid: sensor:light/vcc
          # value_map and transform can be used
        - path: $.Wifi.RSSI
          oid: sensor:light/wifi/rssi
    # action topic, allows running EVA ICS actions from pub/sub
    - topic: lab/equipment/door1
      map:
        - oid: unit:tests/door
          process: action
          # path, value_map and transform can be used in action topics as well
  # outputs are used to submit data into pub/sub
  # unless ignore_events is set to true (the default is false), events are
  # sent in real-time; if an interval is specified, payloads are also sent
  # periodically
  output:
    - topic: out/sensor1
      #qos: 1 # override the default QoS
      packer: json
      map:
        - path: $.deviceStatus
          oid: sensor:t1/temp
          # EVA ICS item property to set
          #
          # valid properties are:
          #
          # status - item status
          # value - item value
          # time - item state set time (float)
          # time_sec - item state set time in seconds (integer)
          # time_millis - set time in milliseconds
          # time_micros - set time in microseconds
          # time_nanos - set time in nanoseconds
          prop: status
          # value_map and transform can be used
          value_map:
            "1": Good
            "-1": Error
        - path: $.deviceState
          oid: sensor:t1/temp
          # value_map and transform can be used
          transform:
            - func: multiply
              params: [ 100 ]
        - path: $.time
          oid: sensor:t1/temp
          prop: time
        - path: $.source
          # fill the "source" field of the output structure to a static
          # value. The payload can be a string or a complex one. If the
          # string contains variables, they are automatically replaced with
          # their values. Valid variables are:
          #
          # ${system_name} - EVA ICS node name
          payload: "${system_name}"
        - path: $.version
          # fill the "version" field with a string with no variables
          payload: "1.0"
    - topic: out/sensor2
      # send the output every 5 seconds and ignore real-time events
      interval: 5
      ignore_events: true
      packer: json
      map:
        - path: $.deviceStatus
          oid: sensor:t1/hum
          prop: status
          value_map:
            "1": Good
            "-1": Error
        - path: $.deviceState
          oid: sensor:t1/hum
        - path: $.time
          oid: sensor:t1/temp
          prop: time
        - path: $.source
          payload: "${system_name}"
  # action maps are used to execute EVA ICS unit actions by sending pub/sub
  # payloads
  action_map:
    unit:tests/u1:
      # the mapping format is similar to outputs with the following exception:
      # if neither oid nor payload is specified, the value is taken from the
      # action payload
      #
      # e.g. the example below sends "OFF" when action requests to set the item
      # value to 0 and "ON" for 1
      topic: out/control/x
      #qos: 1
      map:
        - path: $.
          # value_map and transform can be used
          value_map:
            "0": "OFF"
            "1": "ON"
          # as well as outputs, action payloads can be structures with
          # additional data (states of other items, static strings etc.)
          #
  # extra topics to be manually processed with lmacro
  extra:
    topics:
      - some/topic/#
      - some/other/topic
    # the target lmacro gets the following kwargs:
    # pubsub_topic - message topic
    # pubsub_payload - message payload (as-is, serialized data e.g. JSON must
    # be deserialized manually)
    process: lmacro:process
user: nobody
react_to_fail: true

Create the service using eva-shell:

eva svc create eva.controller.pubsub1 /opt/eva4/share/svc-tpl/svc-tpl-controller-pubsub.yml

or using the bus CLI client:

cd /opt/eva4
cat DEPLOY.yml | ./bin/yml2mp | \
    ./sbin/bus ./var/bus.ipc rpc call eva.core svc.deploy -

(see eva.core::svc.deploy for more info)

EAPI methods

See EAPI commons for the common information about the bus, types, errors and RPC calls.

pubsub.publish

Description   Publish data to a pub/sub server topic
Parameters    required
Returns       nothing

Parameters

Name      Type     Description                                                     Required
topic     String   Server topic                                                    yes
payload   Any      Data payload                                                    yes
qos       i32      Operation QoS                                                   no
packer    String   Data packer: no, json, msgpack (default: no, send data as-is)   no
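
As a sketch, the call parameters could be written in YAML as shown below. The field names come from the parameters listed above; the topic and the payload contents are illustrative assumptions. Such a file could presumably be converted with yml2mp and passed to the bus CLI in the same way as in the deployment example in the Setup section, with the service instance as the target and pubsub.publish as the method; this is an assumption that should be verified against the installation.

```yaml
# parameters for pubsub.publish
topic: lab/env/announce      # hypothetical topic
payload:                     # any data; here a small structure
  deviceStatus: OK
packer: json                 # optional: serialize the payload as JSON
qos: 1                       # optional: operation QoS
```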