A simple service in Rust

As EVA ICS itself is written in Rust, the primary and recommended SDK is the Rust one.

The task

Let us create a simple service which monitors temperature sensors and sends an email notification when a sensor temperature rises above a threshold.

Creating a project

Create a new Rust project:

cargo new svc-example-temp

The service requires two EVA ICS crates:

  • eva-sdk - provides the primary service SDK methods and structures

  • eva-common - the common crate, used both in EVA ICS and 3rd party services

plus a few additional ones. The resulting Cargo.toml (note that the package is renamed to eva-svc-example-temp):

[package]
name = "eva-svc-example-temp"
version = "0.0.1"
edition = "2021"
publish = false

[dependencies]
eva-common = { version = "0.3", features = ["events", "common-payloads", "payload", "acl"] }
eva-sdk = { version = "0.3" }
tokio = { version = "1.36.0", features = ["full"] }
async-trait = { version = "0.1.77" }
serde = { version = "1.0.196", features = ["derive", "rc"] }
log = "0.4.20"
jemallocator = { version = "0.5.4" }
once_cell = "1.19.0"

[features]
std-alloc = []

Open src/main.rs and let us continue.

Preparing the system

Create two sensor items with eva-shell:

eva item create sensor:sdktest/temp1
eva item create sensor:sdktest/temp2

In this example, the sensors are not mapped to real equipment, but their state values can be changed manually with eva-shell, as follows (the command sets status 1 and value 20):

eva item set sensor:sdktest/temp1 1 -v20

Service code

Here is the service code, guided with comments:

// the structure contains a set of EVA ICS OIDs, which are going to be monitored
use eva_common::acl::OIDMaskList;
// a log helper for quickly logging std::result::Result error messages
use eva_common::err_logger;
// only local bus states are going to be monitored (states replicated from other EVA ICS nodes are
// not processed)
use eva_common::events::LOCAL_STATE_TOPIC;
// EVA ICS bus uses MessagePack as the primary serialization format, the following methods are
// aliases for rmp_serde ones
use eva_common::payload::{pack, unpack};
// import common EVA ICS structures and methods, such as the standard Error, result (EResult), OID,
// Value (represents any valid MessagePack value) etc.
use eva_common::prelude::*;
// import common SDK methods, required for services
use eva_sdk::prelude::*;
// import basic item state structure
use eva_sdk::types::State;
// it is recommended to keep EVA ICS services small and simple and to make most shared variables
// static, hence once_cell (lazy_static would work as well)
use once_cell::sync::Lazy;
// serde, used to process bus structures
use serde::{Deserialize, Serialize};
// and a few others
use std::collections::HashMap;
use std::sync::atomic;
use std::sync::Mutex;

// this macro extends Result with additional methods:
// log_ef() - log error and forget the result
// log_efd() - log error as debug and forget the result
// log_err() - log error and keep the result
// log_ed() - log error as debug and keep the result
//
// Why isn't a trait imported? The reason is simple: the macro defines the trait inside THE CURRENT
// module, which allows messages to be logged on behalf of that module.
err_logger!();

// Typical service constants, used in bus EAPI etc.
const AUTHOR: &str = "Bohemia Automation";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const DESCRIPTION: &str = "Temperature monitor";

// Let us define a hysteresis: a sensor is considered back to normal only when its temperature
// drops below threshold - HYSTERESIS. Values in between do not change the notification state.
const HYSTERESIS: f64 = 2.0;

// Let us count how many email notifications are sent
//
// The value can be obtained later using the console command:
//
// eva svc call my.svc.alarm_temp get_counter
static COUNTER: atomic::AtomicUsize = atomic::AtomicUsize::new(0);

// It is recommended to use Jemalloc, which is found to provide the best functionality in
// industrial automation tasks. But tasks may vary, so let us keep a way to switch back to the
// standard one.
#[cfg(not(feature = "std-alloc"))]
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

// The notification map, which keeps track of the sensors that have already been reported as hot
static NOTIFIED: Lazy<Mutex<HashMap<OID, bool>>> = Lazy::new(<_>::default);

// BUS/RT RPC handlers
struct Handlers {
    // EVA ICS service info structure
    info: ServiceInfo,
    // Let us put some variables inside the handlers object directly:
    // e-mail recipient
    rcpt: String,
    // EVA ICS mailer service id
    mailer_svc: String,
    // The temperature threshold
    threshold: f64,
}

// Let us describe BUS/RT RPC handlers methods
#[async_trait::async_trait]
impl RpcHandlers for Handlers {
    // Handle RPC call
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        // The following macro automatically returns Error::not_ready if the service is not fully
        // initialized yet
        svc_rpc_need_ready!();
        let method = event.parse_method()?;
        match method {
            // There is only one RPC method - "get_counter", which returns the number of email
            // messages sent
            "get_counter" => {
                #[derive(Serialize)]
                struct Reply {
                    count: usize,
                }
                Ok(Some(pack(&Reply {
                    count: COUNTER.load(atomic::Ordering::SeqCst),
                })?))
            }
            // This SDK function automatically provides the following standard EVA ICS service RPC
            // methods: "test", "info" and "stop", which must be included in all service
            // implementations
            _ => svc_handle_default_rpc(method, &self.info),
        }
    }
    // handle BUS/RT frames
    async fn handle_frame(&self, frame: Frame) {
        // The following macro automatically makes the function return if the service is not fully
        // initialized yet
        svc_need_ready!();
        // Only frames with topic (bus publications) are processed
        //
        // When the service instance is started, it is not required to load telemetry from EVA ICS
        // core, as the core automatically sends bus notifications at startup to subscribers, after
        // all services are started
        if let Some(topic) = frame.topic() {
            // Only local state topics are processed
            if let Some(item_topic) = topic.strip_prefix(LOCAL_STATE_TOPIC) {
                self.process_state(item_topic, frame.payload())
                    .await
                    .log_ef();
            }
        }
    }
}

impl Handlers {
    // This function processes states of subscribed sensors
    async fn process_state(&self, item_topic: &str, payload: &[u8]) -> EResult<()> {
        // Letter payload, required for the mailer service "send" RPC method
        #[derive(Serialize)]
        struct Letter<'a> {
            rcp: &'a str,
            subject: String,
            text: String,
        }
        // Parse sensor OID from the topic
        let oid = OID::from_path(item_topic)?;
        // Parse sensor state
        let state: State = unpack(payload)?;
        let mut letter_c = None;
        // The next lines represent common Rust code, so no comments are provided
        if let Some(value) = state.value {
            if let Ok(temperature) = f64::try_from(value) {
                let mut notified = NOTIFIED.lock().unwrap();
                let was_sensor_notified = notified.get(&oid).copied().unwrap_or_default();
                if temperature > self.threshold && !was_sensor_notified {
                    let text = format!("{} temperature is {}", oid, temperature);
                    warn!("{}", text);
                    letter_c = Some(Letter {
                        rcp: &self.rcpt,
                        subject: format!("{} is hot", oid),
                        text,
                    });
                    notified.insert(oid, true);
                } else if temperature < self.threshold - HYSTERESIS && was_sensor_notified {
                    let text = format!("{} temperature is {}", oid, temperature);
                    info!("{}", text);
                    letter_c = Some(Letter {
                        rcp: &self.rcpt,
                        subject: format!("{} is back to normal", oid),
                        text,
                    });
                    notified.insert(oid, false);
                }
            }
        };
        if let Some(letter) = letter_c {
            COUNTER.fetch_add(1, atomic::Ordering::SeqCst);
            eapi_bus::call(&self.mailer_svc, "send", pack(&letter)?.into()).await?;
        }
        Ok(())
    }
}

// The service configuration
#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
struct Config {
    #[serde(default)]
    // Array (set) of sensor OID masks
    sensors: OIDMaskList,
    // E-mail recipient
    rcpt: String,
    // Mailer service ID
    mailer_svc: String,
    // The temperature threshold
    threshold: f64,
}

#[svc_main]
// The proc attribute renames the function to eva_service_main and generates the following main
// instead:
//
// fn main() -> eva_common::EResult<()> { svc_launch(eva_service_main) }
//
// The structure Initial contains the initial payload, which is generated by EVA ICS service
// launcher and pushed to STDIN of the service process. This is also automatically handled by the
// proc attribute.
async fn main(mut initial: Initial) -> EResult<()> {
    // Get config from initial and deserialize it
    let config: Config = Config::deserialize(
        initial
            .take_config()
            .ok_or_else(|| Error::invalid_data("config not specified"))?,
    )?;
    // init RPC
    let mut info = ServiceInfo::new(AUTHOR, VERSION, DESCRIPTION);
    // It is not required to include all available service RPC methods in ServiceInfo, however
    // including methods and their parameters provides an additional interface (the data is
    // obtained by calling the "info" RPC method), e.g. auto-completion for eva-shell
    info.add_method(ServiceMethod::new("get_counter"));
    // The initial payload structure provides several methods to init the bus RPC, the primary
    // ones are: "init_rpc" and "init_rpc_blocking".
    //
    // The difference is that "init_rpc_blocking" initializes the bus RPC in blocking mode, meaning
    // that "handle_frame" calls are always processed one-by-one to keep strict event ordering.
    //
    // In the current example it is highly unlikely that temperatures change rapidly, and fieldbus
    // controllers usually do not poll such data more often than e.g. every 100-200ms, so it is
    // fine to go with "init_rpc", which is easier to handle.
    //
    // Blocking mode makes the code more complex: it requires a secondary RPC instance to perform
    // bus calls inside the frame handler, and the handler must work fast enough, because as soon
    // as the service bus queue is full, the service is automatically disconnected by the BUS/RT
    // broker.
    //
    // Starting from EVA ICS SDK 0.3.33 it is recommended to use the high-level EAPI
    // eva_sdk::eapi_bus interface instead of BUS/RT directly
    let handlers = Handlers {
        info,
        rcpt: config.rcpt,
        mailer_svc: config.mailer_svc,
        threshold: config.threshold,
    };
    eapi_bus::init(&initial, handlers).await?;
    // Services are usually started under root and should drop privileges after the bus socket is
    // connected
    initial.drop_privileges()?;
    // A helper function, which subscribes the BUS/RT client to all state events for OIDs matching
    // the given OIDMaskList
    eapi_bus::subscribe_oids(&config.sensors, eva_sdk::service::EventKind::Local).await?;
    // Init service logs (bus logger)
    eapi_bus::init_logs(&initial)?;
    // Start sigterm handler
    svc_start_signal_handlers();
    // Mark the instance ready and active
    eapi_bus::mark_ready().await?;
    info!("{} started ({})", DESCRIPTION, initial.id());
    // The service is blocked until one of the following:
    // * RPC client is disconnected from the bus
    // * The service gets a termination signal
    eapi_bus::block().await;
    // Tell the core and other services that the service is terminating
    eapi_bus::mark_terminating().await?;
    Ok(())
}
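The threshold and hysteresis rule inside process_state can also be isolated into a pure function, which makes it testable without a bus connection. The following is a minimal sketch using only the standard library; the names Transition and evaluate are hypothetical and are not part of the EVA ICS SDK.

```rust
/// Outcome of evaluating a single temperature sample (hypothetical helper type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transition {
    /// The temperature rose above the threshold: an "is hot" letter is due.
    Alarm,
    /// The temperature fell below threshold - hysteresis: a "back to normal" letter is due.
    Recovered,
    /// Nothing changed, no letter is due.
    Unchanged,
}

/// Applies the same rule as process_state:
/// * above the threshold and not yet notified -> Alarm
/// * below threshold - hysteresis and previously notified -> Recovered
/// * anything else (including the band in between) -> Unchanged
pub fn evaluate(
    temperature: f64,
    threshold: f64,
    hysteresis: f64,
    was_notified: bool,
) -> Transition {
    if temperature > threshold && !was_notified {
        Transition::Alarm
    } else if temperature < threshold - hysteresis && was_notified {
        Transition::Recovered
    } else {
        Transition::Unchanged
    }
}
```

With threshold 25 and hysteresis 2, a sensor which has already triggered an alarm stays in the notified state at 24 degrees and recovers only below 23 degrees, so a value oscillating around the threshold does not produce a flood of letters.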

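To illustrate what handle_frame and OID::from_path do with a bus topic, here is a standalone sketch which uses only the standard library. It assumes that LOCAL_STATE_TOPIC equals "ST/LOC/" and that the remainder of the topic has the form kind/group/name; both are assumptions to be checked against the eva-common crate. The function topic_to_oid is a hypothetical stand-in, a real service should keep using OID::from_path.

```rust
/// Assumed value of eva_common::events::LOCAL_STATE_TOPIC (verify against the crate).
const LOCAL_STATE_TOPIC: &str = "ST/LOC/";

/// Hypothetical stand-in for the topic handling in handle_frame / process_state:
/// converts a local state topic into the textual OID form "kind:full/id".
/// Returns None for topics which are not local state topics or are malformed.
pub fn topic_to_oid(topic: &str) -> Option<String> {
    // Only local state topics are processed, everything else is ignored
    let item_path = topic.strip_prefix(LOCAL_STATE_TOPIC)?;
    // The first path segment is the item kind, the rest is the full item id
    let (kind, full_id) = item_path.split_once('/')?;
    if kind.is_empty() || full_id.is_empty() {
        return None;
    }
    Some(format!("{kind}:{full_id}"))
}
```

Under these assumptions the topic ST/LOC/sensor/sdktest/temp1 maps to the OID sensor:sdktest/temp1, while topics with any other prefix are skipped.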
Service template

The following template can be used to quickly create a service instance with eva-shell:

eva svc create my.svc.alarm_temp svc-tpl.yml

The contents of svc-tpl.yml:

command: path/to/eva-svc-example-temp
bus:
  path: var/bus.ipc
config:
  sensors:
    - sensor:sdktest/#
  rcpt: me@domain
  mailer_svc: eva.svc.mailer
  threshold: 25
user: nobody
workers: 1
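The sensors list in the template contains the OID mask sensor:sdktest/#. The sketch below shows how such a mask could select items, assuming MQTT-style wildcards where + stands for exactly one path segment and # for the rest of the path. The function oid_matches is a hypothetical, simplified stand-in written with the standard library only; the actual matching is performed by OIDMaskList from eva-common and may differ in edge cases.

```rust
/// Hypothetical helper: returns true if `oid` (e.g. "sensor:sdktest/temp1")
/// matches `mask` (e.g. "sensor:sdktest/#").
/// Assumption: "+" matches exactly one path segment, "#" matches one or more
/// remaining segments. Item kinds must be equal.
pub fn oid_matches(mask: &str, oid: &str) -> bool {
    // Split "kind:path" into two parts; a malformed string never matches
    let (Some((mask_kind, mask_path)), Some((oid_kind, oid_path))) =
        (mask.split_once(':'), oid.split_once(':'))
    else {
        return false;
    };
    if mask_kind != oid_kind {
        return false;
    }
    let mut mask_parts = mask_path.split('/');
    let mut oid_parts = oid_path.split('/');
    loop {
        match (mask_parts.next(), oid_parts.next()) {
            // "#" accepts whatever is left of the OID path
            (Some("#"), Some(_)) => return true,
            // "+" accepts any single segment
            (Some("+"), Some(_)) => continue,
            // Literal segments must be equal
            (Some(m), Some(o)) if m == o => continue,
            // Both paths ended at the same time: full match
            (None, None) => return true,
            // Different segments or different lengths
            _ => return false,
        }
    }
}
```

With this sketch, the template mask selects both sensor:sdktest/temp1 and sensor:sdktest/temp2, created earlier, and rejects items from other groups or of other kinds.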