A simple service in Python

SDK Installation

If EVA ICS v4 venv is already configured (manually or the node is installed with “-a” installer option), Python SDK is already available on the host.

Otherwise Python virtual environment can be installed with the following command:

/opt/eva4/sbin/venvmgr build

or, alternatively, SDK can be installed as system-wide:

pip3 install evaics

The task

Let us create a simple monitoring service, which monitors temperature sensors and sends email notifications when temperature is above a threshold.

Preparing the system

eva item create sensor:sdktest/temp1
eva item create sensor:sdktest/temp2

In this example, the sensors are not mapped to real equipment, but their state values can be changed with eva-shell manually, as the following:

eva item set sensor:sdktest/temp1 1 -v20

Service code

Here is the service code, guided with comments. The following example is created as a single script. To create a dedicated Python module, refer to Python documentation for more info.

Make sure the file shebang points to the correct Python executable path.

#!/opt/eva4/venv/bin/python

__version__ = '0.0.1'

import evaics.sdk as sdk
import busrt

from types import SimpleNamespace
from evaics.sdk import pack, unpack, OID, LOCAL_STATE_TOPIC

import threading

HYSTERESIS = 2.0

# define a global namespace
_d = SimpleNamespace(service=None,
                     counter=0,
                     threshold=0,
                     rcpt=None,
                     mailer_svc=None)

op_lock = threading.RLock()

# the notification map, keeps info which sensors were already processed
notified = {}


# RPC calls handler
def handle_rpc(event):
    _d.service.need_ready()
    # There is only one RPC method - "get_counter", which returns the number of
    # email messages sent
    if event.method == b'get_counter':
        try:
            return pack(dict(count=_d.counter))
        except Exception as e:
            raise busrt.rpc.RpcException(str(e), sdk.ERR_CODE_FUNC_FAILED)
    else:
        sdk.no_rpc_method()


# handle BUS/RT frames
def on_frame(frame):
    if _d.service.is_active():
        if frame.topic and frame.topic.startswith(LOCAL_STATE_TOPIC):
            # Parse sensor OID from the topic
            oid = OID(frame.topic[len(LOCAL_STATE_TOPIC):], from_path=True)
            # Parse sensor state
            state = unpack(frame.payload)
            # The next lines represent common Python code, so no comments are
            # provided
            temperature = float(state['value'])
            letter = None
            with op_lock:
                was_notified = notified.get(oid)
                if temperature > _d.threshold and not was_notified:
                    # notify high
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.warning(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is hot',
                        'text': text
                    }
                    notified[oid] = True
                elif temperature < _d.threshold - HYSTERESIS and was_notified:
                    # notify back to normal
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.info(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is back to normal',
                        'text': text
                    }
                    notified[oid] = False
            if letter and _d.rcpt and _d.mailer_svc:
                _d.counter += 1
                # Call the mailer service
                _d.service.rpc.call(_d.mailer_svc,
                                    busrt.rpc.Request(
                                        'send', pack(letter))).wait_completed()


def run():
    # define ServiceInfo object
    info = sdk.ServiceInfo(author='Bohemia Automation',
                           description='Temperature monitor',
                           version=__version__)
    # it is not obliged to include all available service RPC methods into
    # ServiceInfo, however including methods and their parameters provide
    # additional interface (the data is obtained by calling "info" RPC method),
    # e.g. auto-completion for eva-shell
    info.add_method('get_counter')
    # create a service object
    service = sdk.Service()
    _d.service = service
    # get the service config
    config = service.get_config()
    _d.threshold = config.get('threshold')
    _d.mailer_svc = config.get('mailer_svc')
    _d.rcpt = config.get('rcpt')
    # init the service
    service.init(info, on_frame=on_frame, on_rpc_call=handle_rpc)
    # subscribe sensor OIDs via the helper method
    service.subscribe_oids(config.get('sensors'), event_kind='local')
    # the service is blocked until one of the following:
    # * RPC client is disconnected from the bus
    # * the service gets a termination signal
    service.block()


run()

Service template

The following template can be used to quickly create a service instance with eva-shell:

eva svc create my.svc.alarm_temp svc-tpl.yml
command: path/to/eva-svc-example-temp
bus:
  path: var/bus.ipc
config:
  sensors:
    - sensor:sdktest/#
  rcpt: me@domain
  mailer_svc: eva.svc.mailer
  threshold: 25
user: nobody
workers: 1