A simple service in Python

SDK Installation

If EVA ICS v4 venv is already configured (manually or the node is installed with “-a” installer option), Python SDK is already available on the host.

Otherwise Python virtual environment can be installed with the following command:

/opt/eva4/sbin/venvmgr build

or, alternatively, SDK can be installed as system-wide:

pip3 install evaics

The task

Let us create a simple monitoring service, which monitors temperature sensors and sends email notifications when temperature is above a threshold.

Preparing the system

eva item create sensor:sdktest/temp1
eva item create sensor:sdktest/temp2

In this example, the sensors are not mapped to real equipment, but their state values can be changed with eva-shell manually, as the following:

eva item set sensor:sdktest/temp1 1 -v20

Creating and debugging services with EVA ICS Python SDK

Starting from Python SDK 0.2.33, it is possible to create and debug services in more convenient way. The following steps are required to create and debug the service:

  • Deploy a service template on a EVA ICS node, mark the service disabled

  • If EVA ICS is running on a remote machine, ensure there is direct access to the IPC bus

Create a service file:

python -m evaics.sdk new myservice

The above command creates a file myservice.py in the current directory.

The service can be run locally with the following command:

python -m evaics.sdk run -b BUS_IP:PORT svc_id myservice.py

If EVA ICS is running on a local machine, the parameter -b is not required.

Service code

Here is the service code, guided with comments. The following example is created as a single script. To create a dedicated Python module, refer to Python documentation for more info.

Make sure the file shebang points to the correct Python executable path.

#!/opt/eva4/venv/bin/python

__version__ = '0.0.1'

import evaics.sdk as sdk
import busrt

from types import SimpleNamespace
from evaics.sdk import pack, unpack, OID, LOCAL_STATE_TOPIC

import threading

HYSTERESIS = 2.0

# define a global namespace
_d = SimpleNamespace(service=None,
                     counter=0,
                     threshold=0,
                     rcpt=None,
                     mailer_svc=None)

op_lock = threading.RLock()

# the notification map, keeps info which sensors were already processed
notified = {}


# RPC calls handler
def handle_rpc(event):
    _d.service.need_ready()
    # There is only one RPC method - "get_counter", which returns the number of
    # email messages sent
    if event.method == b'get_counter':
        try:
            return pack(dict(count=_d.counter))
        except Exception as e:
            raise busrt.rpc.RpcException(str(e), sdk.ERR_CODE_FUNC_FAILED)
    else:
        sdk.no_rpc_method()


# handle BUS/RT frames
def on_frame(frame):
    if _d.service.is_active():
        if frame.topic and frame.topic.startswith(LOCAL_STATE_TOPIC):
            # Parse sensor OID from the topic
            oid = OID(frame.topic[len(LOCAL_STATE_TOPIC):], from_path=True)
            # Parse sensor state
            state = unpack(frame.payload)
            # The next lines represent common Python code, so no comments are
            # provided
            temperature = float(state['value'])
            letter = None
            with op_lock:
                was_notified = notified.get(oid)
                if temperature > _d.threshold and not was_notified:
                    # notify high
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.warning(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is hot',
                        'text': text
                    }
                    notified[oid] = True
                elif temperature < _d.threshold - HYSTERESIS and was_notified:
                    # notify back to normal
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.info(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is back to normal',
                        'text': text
                    }
                    notified[oid] = False
            if letter and _d.rcpt and _d.mailer_svc:
                _d.counter += 1
                # Call the mailer service
                _d.service.rpc.call(_d.mailer_svc,
                                    busrt.rpc.Request(
                                        'send', pack(letter))).wait_completed()


def run():
    # define ServiceInfo object
    info = sdk.ServiceInfo(author='Bohemia Automation',
                           description='Temperature monitor',
                           version=__version__)
    # it is not obliged to include all available service RPC methods into
    # ServiceInfo, however including methods and their parameters provide
    # additional interface (the data is obtained by calling "info" RPC method),
    # e.g. auto-completion for eva-shell
    info.add_method('get_counter')
    # create a service object
    service = sdk.Service()
    _d.service = service
    # get the service config
    config = service.get_config()
    _d.threshold = config.get('threshold')
    _d.mailer_svc = config.get('mailer_svc')
    _d.rcpt = config.get('rcpt')
    # init the service
    service.init(info, on_frame=on_frame, on_rpc_call=handle_rpc)
    # subscribe sensor OIDs via the helper method
    service.subscribe_oids(config.get('sensors'), event_kind='local')
    # the service is blocked until one of the following:
    # * RPC client is disconnected from the bus
    # * the service gets a termination signal
    service.block()


run()

Service template

The following template can be used to quickly create a service instance with eva-shell:

eva svc create my.svc.alarm_temp svc-tpl.yml
command: path/to/eva-svc-example-temp
bus:
  path: var/bus.ipc
config:
  sensors:
    - sensor:sdktest/#
  rcpt: me@domain
  mailer_svc: eva.svc.mailer
  threshold: 25
user: nobody
workers: 1