TimescaleDB databases state history

Allows to store item states history in Timescale/PostgreSQL database. Unlike SQL databases state history this service provides a dedicated connector, optimized for speed and database size.

Note that the service can store numeric item state values only.

The service provides unified database EAPI.

Note

Time convention used in EVA ICS time-frames to fill data:

  • S for seconds (e.g. 5S for 5 seconds)

  • T for minutes

  • H for hours

  • D for days

  • W for weeks

  • A to get automatic number of records (e.g. 5A for 5 exactly records)

Time convention used in EVA ICS to specify start/end of a timeframe:

  • If there are dedicated parameters for start/end, they are filled separately

  • If there is a single parameter only, it is filled either as START:END or as START only (END is automatically set to the current time)

where values must be:

  • UNIX timestamps only (if a parameter is strictly specified as a number)

  • Date in human-readable format (RFC3339 recommended)

  • Using the same notation as for filling. E.g. setting start=30T sets time-frame start to 30 minutes before now.

START value is always mandatory. If END value is required to be set to the current time, it can be omitted.

Timescale Installation

See https://docs.timescale.com/self-hosted/latest/install/installation-linux/

Initialization

The service creates a table for event data called state_history_events and automatically tries to convert it to hyper-table (see https://docs.timescale.com/api/latest/hypertable/create_hypertable/).

If the service is unable to init the hyper-table, a warning message is written in the node log. The problem may be e.g. the table already contains events data. In this case, connect to the database and perform initialization manually:

Warning

The operation blocks the events table and may consume a lot of time for large databases.

SELECT create_hypertable('state_history_events', 't', migrate_data => true);
ALTER TABLE state_history_events SET
    (timescaledb.compress,
    timescaledb.compress_orderby = 't DESC',
    timescaledb.compress_segmentby = 'oid_id');
SELECT add_compression_policy('state_history_events', INTERVAL '1d');

To check that the events hyper-table has been initialized properly, use the following request:

SELECT * FROM timescaledb_information.hypertables WHERE
    hypertable_name='state_history_events';

Modifying compression policy

The default compression policy is set for 1 day. To modify it, connect to the database and use the following commands (the example sets compression policy to 7 days):

SELECT remove_compression_policy('state_history_events');
SELECT add_compression_policy('state_history_events', INTERVAL '7d');

To check compression effectiveness, use the following command:

SELECT before_compression_total_bytes AS before,
     after_compression_total_bytes AS after,
     (before_compression_total_bytes::DOUBLE PRECISION
         /after_compression_total_bytes)::DECIMAL(100,2) || 'x'
     AS "compression rate"
     FROM hypertable_compression_stats('state_history_events');

Data insertion policy

On conflicts, the service always replaces states for sames items and timestamps to the most recent ones.

Setup

Use the template EVA_DIR/share/svc-tpl/svc-tpl-db-timescale.yml:

# Event-to-TimescaleDB service
command: svc/eva-db-timescale
bus:
  path: var/bus.ipc
config:
  # database URI
  db: postgres://USER:PASSWORD@HOST/DB
  # event buffer time-to-live, seconds
  buf_ttl_sec: null
  # periodically submit all item states
  interval: null
  # do not submit remote disconnected items (useful for zfrepl or similar)
  # ignore real-time events
  skip_disconnected: false
  ignore_events: false
  # keep records (seconds)
  keep: 604800
  # automatically cleanup obsolete OIDs (can be slow)
  #cleanup_oids: true
  # event queue size
  queue_size: 8192
  # panic on critical errors in X seconds after happened (0 - panic immediately)
  panic_in: 0
  # database pool size
  #pool_size: 2 (default = workers)
  # item OIDs / OID masks
  oids:
    - "#"
  oids_exclude: []
  # enable EVA ICS PostgreSQL extension support (experimental)
  #eva_pg: true
  # by default null values (status) are written into the database
  oids_exclude_null: []
user: nobody

Create the service using eva-shell:

eva svc create eva.db.timescale1 /opt/eva4/share/svc-tpl/svc-tpl-db-timescale.yml

or using the bus CLI client:

cd /opt/eva4
cat DEPLOY.yml | ./bin/yml2mp | \
    ./sbin/bus ./var/bus.ipc rpc call eva.core svc.deploy -

(see eva.core::svc.deploy for more info)

EAPI methods

See EAPI commons for the common information about the bus, types, errors and RPC calls.

state_announce

Description

Replays state log rows as state bus publishes (same query as state_log). Note: kind chooses ST/LOC (loc) or ST/RAR (rar); default rar.

Parameters

required

Returns

nothing

Parameters

Name

Type

Description

Required

i

String

Item OID, supports ending masks (e.g. sensor:group/#)

yes

t_start

f64

Beginning timestamp (default: last 24 hours) (alias: s)

no

t_end

f64

Ending timestamp (default: now) (alias: e)

no

limit

u32

Limit records to (alias: n)

no

xopts

Map<String, String>

Extra: offset=N for query offset, rp=TABLE for custom rp_TABLE (alias: o)

no

kind

String

loc | rar (default: rar): publish under local or remote-archive state topic prefix

no

publish_for

String/Vec<String>

Required; bus client id(s); empty list sends nothing (alias: for)

yes

state_history

Description

Gets item state history

Parameters

required

Returns

State history payload

Parameters

Name

Type

Description

Required

i

String

Item OID

yes

t_start

f64

Beginning timestamp (default: last 24 hours)

no

t_end

f64

Ending timestamp (default: now)

no

fill

String

Fill (nS/T/H/D/W e.g. 10T for 10-minute)

no

precision

u32

Round values to digits after commma

no

limit

u32

Limit records to

no

prop

String

Property: status or value (default: both)

no

xopts

Map<String, String>

Extra: vfn=fn for value grouping: mean/sum (d: mean), fill_null=none|zero|nan|previous, rp=TABLE for custom rp_TABLE

no

compact

bool

Pack data in arrays according to type

no

Return payload example:

[
    {
        "status": 1,
        "t": 1652059860.0424938,
        "value": 15
    },
    {
        "status": 1,
        "t": 1652059865.045223,
        "value": 15
    },
    {
        "status": 1,
        "t": 1652059870.0452943,
        "value": 15
    },
    {
        "status": 1,
        "t": 1652059875.0443518,
        "value": 15
    }
]

state_history_combined

Description

Gets item state history combined (value only)

Parameters

required

Returns

State history combined payload

Parameters

Name

Type

Description

Required

i

String/Vec<String>

Item OID/OIDs

yes

t_start

f64

Beginning timestamp (default: last 24 hours)

no

t_end

f64

Ending timestamp (default: now)

no

fill

String

Fill (nS/T/H/D/W e.g. 10T for 10-minute, requires ts_extension)

yes

precision

u32

Round values to digits after commma

no

xopts

Map<String, String>

Extra: vfn=fn for value grouping: mean/sum (d: mean), fill_null=none|zero|nan|previous, rp=TABLE for custom rp_TABLE

no

Return payload example:

{
    "data": {
        "sensor:env/temp": [
            20.0,
            25.0,
            22.0,
            18.0,
        ],
        "sensor:env/hum": [
            40.0,
            45.0,
            35.2,
            34.0,
        ]
    },
    "t": [
        1745859600.0,
        1745863200.0,
        1745866800.0,
        1745870400.0,
    ]
}

state_log

Description

Gets item state log

Parameters

required

Returns

State log payload (includes OIDs, as other svcs may support get-by-mask)

Parameters

Name

Type

Description

Required

i

String

Item OID, supports ending masks (e.g. sensor:group/#)

yes

t_start

f64

Beginning timestamp (default: last 24 hours)

no

t_end

f64

Ending timestamp (default: now)

no

limit

u32

Limit records to

no

xopts

Map<String, String>

Extra: offset=N for query offset, rp=TABLE for custom rp_TABLE

no

Return payload example:

[
    {
        "oid": "sensor:tests/temp",
        "status": 1,
        "t": 1652060175.0443184,
        "value": 15
    },
    {
        "oid": "sensor:tests/temp",
        "status": 1,
        "t": 1652060180.046056,
        "value": 15
    },
    {
        "oid": "sensor:tests/temp",
        "status": 1,
        "t": 1652060185.0454304,
        "value": 15
    }
]

state_push

Description

push item states into db, (payload: single item state or list). skips existing states

Parameters

required

Returns

nothing

Parameters

Name

Type

Description

Required

oid

String

Item OID (alias: i)

yes

status

i16

Item status (alias: s)

yes

value

Any

Item value, numeric only (alias: v)

no

t

f64

Timestamp, seconds since epoch (required, no default; key must be ‘t’, not ‘set_time’)

yes

Return payload example:

{"oid": "sensor:tests/temp", "status": 1, "value": 15.0, "t": 1652060175.044}

Retention policies

In TimescaleDB, retention policies are implemented using materialized views. To let EVA ICS API method pass the view name, it must be called rp_<view_name> in the database (e.g. rp_foo for rp=foo API parameter). This restricts users from selecting data from other views and tables via API calls.

The materialized view must have the same columns as state_history_events table:

  • t timestamp without time zone

  • oid_od integer

  • status smallint

  • value double precision

In case if there is no plan to query via API status or value columns, the one which is not required can be omitted.

Example, let us create a retention policy to store hour-averages:

CREATE MATERIALIZED VIEW rp_all_hourly_avg
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', t) AS t,
    oid_id,
    avg(value) AS value
FROM state_history_events AS she
JOIN state_history_oids AS sh ON she.oid_id = sh.id
GROUP BY 1,2;

In case if the view is required for certain items only, use WHERE clause to specify oids by joining state_history_oids table or directly specifying OID ids as recorded.

Then create a continuous aggregate policy to refresh the view every 5 minutes:

SELECT add_continuous_aggregate_policy('rp_all_hourly_avg',
    start_offset => INTERVAL '1 day',
    end_offset => INTERVAL '5 minutes',
    schedule_interval => INTERVAL '5 minutes');

Note

Materialized views are not purged automatically by EVA ICS and may require additonal cleanup logic.