A simple service in Python
SDK Installation
If the EVA ICS v4 venv is already configured (either manually or because the node was installed with the “-a” installer option), the Python SDK is already available on the host.
Otherwise, the Python virtual environment can be built with the following command:
/opt/eva4/sbin/venvmgr build
Alternatively, the SDK can be installed system-wide:
pip3 install evaics
The task
Let us create a simple monitoring service that watches temperature sensors and sends an email notification when a temperature rises above a threshold, and another when it returns to normal.
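The decision logic behind this task can be sketched without any EVA ICS dependency. The following is a minimal illustration, assuming the same 2.0-degree hysteresis band that the service code on this page defines as HYSTERESIS; the function names `decide` and `replay` are hypothetical and are not part of the SDK.

```python
from typing import List, Optional

HYSTERESIS = 2.0  # same band width as in the service code on this page


def decide(temperature: float, threshold: float,
           was_notified: bool) -> Optional[str]:
    """Return 'high', 'normal' or None when no notification is needed."""
    if temperature > threshold and not was_notified:
        return 'high'
    if temperature < threshold - HYSTERESIS and was_notified:
        return 'normal'
    return None


def replay(readings: List[float], threshold: float) -> List[Optional[str]]:
    """Feed readings of one sensor through decide(), tracking its state."""
    notified = False
    events = []
    for temperature in readings:
        event = decide(temperature, threshold, notified)
        if event == 'high':
            notified = True
        elif event == 'normal':
            notified = False
        events.append(event)
    return events
```

The hysteresis band keeps a reading that hovers around the threshold from producing a new email on every small fluctuation: once a sensor is reported as hot, it must cool below `threshold - HYSTERESIS` before it is reported as normal again.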
Preparing the system
Deploy an instance of the Mailer service
Create a couple of sensors with eva-shell:
eva item create sensor:sdktest/temp1
eva item create sensor:sdktest/temp2
In this example, the sensors are not mapped to real equipment, but their state values can be changed manually with eva-shell, as follows:
eva item set sensor:sdktest/temp1 1 -v20
Creating and debugging services with EVA ICS Python SDK
Starting from Python SDK 0.2.33, services can be created and debugged in a more convenient way. The following steps are required:
Deploy a service template on an EVA ICS node and mark the service as disabled
If EVA ICS is running on a remote machine, ensure there is direct access to the IPC bus
Create a service file:
python -m evaics.sdk new myservice
The above command creates a file myservice.py in the current directory.
The service can be run locally with the following command:
python -m evaics.sdk run -b BUS_IP:PORT svc_id myservice.py
If EVA ICS is running on a local machine, the parameter -b is not required.
Service code
Here is the service code, annotated with comments. The example is written as a single script. To package it as a dedicated Python module, refer to the Python documentation.
Make sure the file shebang points to the correct Python executable path.
#!/opt/eva4/venv/bin/python

__version__ = '0.0.1'

import evaics.sdk as sdk
import busrt

from types import SimpleNamespace
from evaics.sdk import pack, unpack, OID, LOCAL_STATE_TOPIC

import threading

HYSTERESIS = 2.0

# define a global namespace
_d = SimpleNamespace(service=None,
                     counter=0,
                     threshold=0,
                     rcpt=None,
                     mailer_svc=None)

op_lock = threading.RLock()

# the notification map, keeps info which sensors were already processed
notified = {}

# RPC calls handler
def handle_rpc(event):
    _d.service.need_ready()
    # There is only one RPC method - "get_counter", which returns the number of
    # email messages sent
    if event.method == b'get_counter':
        try:
            return pack(dict(count=_d.counter))
        except Exception as e:
            raise busrt.rpc.RpcException(str(e), sdk.ERR_CODE_FUNC_FAILED)
    else:
        sdk.no_rpc_method()

# handle BUS/RT frames
def on_frame(frame):
    if _d.service.is_active():
        if frame.topic and frame.topic.startswith(LOCAL_STATE_TOPIC):
            # Parse sensor OID from the topic
            oid = OID(frame.topic[len(LOCAL_STATE_TOPIC):], from_path=True)
            # Parse sensor state
            state = unpack(frame.payload)
            # The next lines represent common Python code, so no comments are
            # provided
            temperature = float(state['value'])
            letter = None
            with op_lock:
                was_notified = notified.get(oid)
                if temperature > _d.threshold and not was_notified:
                    # notify high
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.warning(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is hot',
                        'text': text
                    }
                    notified[oid] = True
                elif temperature < _d.threshold - HYSTERESIS and was_notified:
                    # notify back to normal
                    text = f'{oid} temperature is {temperature}'
                    _d.service.logger.info(text)
                    letter = {
                        'rcp': _d.rcpt,
                        'subject': f'{oid} is back to normal',
                        'text': text
                    }
                    notified[oid] = False
            if letter and _d.rcpt and _d.mailer_svc:
                _d.counter += 1
                # Call the mailer service
                _d.service.rpc.call(_d.mailer_svc,
                                    busrt.rpc.Request(
                                        'send', pack(letter))).wait_completed()

def run():
    # define ServiceInfo object
    info = sdk.ServiceInfo(author='Bohemia Automation',
                           description='Temperature monitor',
                           version=__version__)
    # it is not required to list all available service RPC methods in
    # ServiceInfo, however listing methods and their parameters provides an
    # additional interface (the data is obtained by calling "info" RPC method),
    # e.g. auto-completion for eva-shell
    info.add_method('get_counter')
    # create a service object
    service = sdk.Service()
    _d.service = service
    # get the service config
    config = service.get_config()
    _d.threshold = config.get('threshold')
    _d.mailer_svc = config.get('mailer_svc')
    _d.rcpt = config.get('rcpt')
    # init the service
    service.init(info, on_frame=on_frame, on_rpc_call=handle_rpc)
    # subscribe sensor OIDs via the helper method
    service.subscribe_oids(config.get('sensors'), event_kind='local')
    # the service is blocked until one of the following:
    # * RPC client is disconnected from the bus
    # * the service gets a termination signal
    service.block()


run()
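The service above reads its settings with plain `config.get(...)` calls, so a missing `threshold` would only surface later as a comparison error inside `on_frame`. One possible way to fail early is sketched below. The helper `load_settings` is hypothetical and not part of the SDK; it only assumes that the config is a dictionary with the keys used in the service code.

```python
def load_settings(config: dict) -> dict:
    """Validate the service config and return normalized settings.

    Hypothetical helper: the key names follow the service code on this
    page, the validation rules are an assumption of this sketch.
    """
    sensors = config.get('sensors')
    if not sensors:
        raise ValueError('config: "sensors" must list at least one OID mask')
    if isinstance(sensors, str):
        # accept a single mask given as a plain string
        sensors = [sensors]
    threshold = config.get('threshold')
    if threshold is None:
        raise ValueError('config: "threshold" is required')
    return {
        'sensors': list(sensors),
        'threshold': float(threshold),
        # rcpt and mailer_svc stay optional: without them the service
        # code only logs and never calls the mailer
        'rcpt': config.get('rcpt'),
        'mailer_svc': config.get('mailer_svc'),
    }
```

In `run()`, such a helper could be called right after `service.get_config()`, so that a misconfigured instance stops at startup instead of failing on the first state event.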
Service template
The following template can be used to quickly create a service instance with eva-shell:
eva svc create my.svc.alarm_temp svc-tpl.yml
command: path/to/eva-svc-example-temp
bus:
  path: var/bus.ipc
config:
  sensors:
    - sensor:sdktest/#
  rcpt: me@domain
  mailer_svc: eva.svc.mailer
  threshold: 25
user: nobody
workers: 1
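The `sensors` entry in the template is an OID mask, not a single OID. The sketch below illustrates how a mask such as `sensor:sdktest/#` could select both test sensors. It assumes MQTT-style wildcards, where "+" matches exactly one path segment and "#" matches all remaining segments; the function `oid_matches` is hypothetical, and the real matching is performed by EVA ICS itself when the service subscribes.

```python
def oid_matches(mask: str, oid: str) -> bool:
    """Hypothetical helper: test an OID string against a mask.

    Assumes "+" matches one path segment and "#" matches the rest.
    """
    if mask == '#':
        return True
    mask_kind, _, mask_path = mask.partition(':')
    kind, _, path = oid.partition(':')
    if mask_kind not in ('+', kind):
        return False
    mask_parts = mask_path.split('/')
    parts = path.split('/')
    for i, segment in enumerate(mask_parts):
        if segment == '#':
            # everything after this point is accepted
            return True
        if i >= len(parts):
            return False
        if segment != '+' and segment != parts[i]:
            return False
    # without a trailing "#", the depth must match exactly
    return len(mask_parts) == len(parts)
```

With these assumptions, both `sensor:sdktest/temp1` and `sensor:sdktest/temp2` match the mask from the template, while a sensor in another group does not.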