TimescaleDB databases state history
Contents
Allows to store item states history in Timescale/PostgreSQL database. Unlike SQL databases state history this service provides a dedicated connector, optimized for speed and database size.
Note that the service can store numeric item state values only.
The service provides unified database EAPI.
Note
Time convention used in EVA ICS time-frames to fill data:
S for seconds (e.g. 5S for 5 seconds)
T for minutes
H for hours
D for days
W for weeks
A to get automatic number of records (e.g. 5A for 5 exactly records)
Time convention used in EVA ICS to specify start/end of a timeframe:
If there are dedicated parameters for start/end, they are filled separately
If there is a single parameter only, it is filled either as START:END or as START only (END is automatically set to the current time)
where values must be:
UNIX timestamps only (if a parameter is strictly specified as a number)
Date in human-readable format (RFC3339 recommended)
Using the same notation as for filling. E.g. setting start=30T sets time-frame start to 30 minutes before now.
START value is always mandatory. If END value is required to be set to the current time, it can be omitted.
Timescale Installation
See https://docs.timescale.com/self-hosted/latest/install/installation-linux/
Initialization
The service creates a table for event data called state_history_events and automatically tries to convert it to hyper-table (see https://docs.timescale.com/api/latest/hypertable/create_hypertable/).
If the service is unable to init the hyper-table, a warning message is written in the node log. The problem may be e.g. the table already contains events data. In this case, connect to the database and perform initialization manually:
Warning
The operation blocks the events table and may consume a lot of time for large databases.
SELECT create_hypertable('state_history_events', 't', migrate_data => true);
ALTER TABLE state_history_events SET
(timescaledb.compress,
timescaledb.compress_orderby = 't DESC',
timescaledb.compress_segmentby = 'oid_id');
SELECT add_compression_policy('state_history_events', INTERVAL '1d');
To check that the events hyper-table has been initialized properly, use the following request:
SELECT * FROM timescaledb_information.hypertables WHERE
hypertable_name='state_history_events';
Modifying compression policy
The default compression policy is set for 1 day. To modify it, connect to the database and use the following commands (the example sets compression policy to 7 days):
SELECT remove_compression_policy('state_history_events');
SELECT add_compression_policy('state_history_events', INTERVAL '7d');
To check compression effectiveness, use the following command:
SELECT before_compression_total_bytes AS before,
after_compression_total_bytes AS after,
(before_compression_total_bytes::DOUBLE PRECISION
/after_compression_total_bytes)::DECIMAL(100,2) || 'x'
AS "compression rate"
FROM hypertable_compression_stats('state_history_events');
Data insertion policy
On conflicts, the service always replaces states for sames items and timestamps to the most recent ones.
Setup
Use the template EVA_DIR/share/svc-tpl/svc-tpl-db-timescale.yml:
# Event-to-TimescaleDB service
command: svc/eva-db-timescale
bus:
path: var/bus.ipc
config:
# database URI
db: postgres://USER:PASSWORD@HOST/DB
# event buffer time-to-live, seconds
buf_ttl_sec: null
# periodically submit all item states
interval: null
# do not submit remote disconnected items (useful for zfrepl or similar)
# ignore real-time events
skip_disconnected: false
ignore_events: false
# keep records (seconds)
keep: 604800
# automatically cleanup obsolete OIDs (can be slow)
#cleanup_oids: true
# event queue size
queue_size: 8192
# panic on critical errors in X seconds after happened (0 - panic immediately)
panic_in: 0
# database pool size
#pool_size: 2 (default = workers)
# item OIDs / OID masks
oids:
- "#"
oids_exclude: []
# enable EVA ICS PostgreSQL extension support (experimental)
#eva_pg: true
# by default null values (status) are written into the database
oids_exclude_null: []
user: nobody
Create the service using eva-shell:
eva svc create eva.db.timescale1 /opt/eva4/share/svc-tpl/svc-tpl-db-timescale.yml
or using the bus CLI client:
cd /opt/eva4
cat DEPLOY.yml | ./bin/yml2mp | \
./sbin/bus ./var/bus.ipc rpc call eva.core svc.deploy -
(see eva.core::svc.deploy for more info)
EAPI methods
See EAPI commons for the common information about the bus, types, errors and RPC calls.
state_announce
Description |
Replays state log rows as state bus publishes (same query as state_log). Note: kind chooses ST/LOC (loc) or ST/RAR (rar); default rar. |
Parameters |
required |
Returns |
nothing |
Name |
Type |
Description |
Required |
i |
String |
Item OID, supports ending masks (e.g. sensor:group/#) |
yes |
t_start |
f64 |
Beginning timestamp (default: last 24 hours) (alias: s) |
no |
t_end |
f64 |
Ending timestamp (default: now) (alias: e) |
no |
limit |
u32 |
Limit records to (alias: n) |
no |
xopts |
Map<String, String> |
Extra: offset=N for query offset, rp=TABLE for custom rp_TABLE (alias: o) |
no |
kind |
String |
loc | rar (default: rar): publish under local or remote-archive state topic prefix |
no |
publish_for |
String/Vec<String> |
Required; bus client id(s); empty list sends nothing (alias: for) |
yes |
state_history
Description |
Gets item state history |
Parameters |
required |
Returns |
State history payload |
Name |
Type |
Description |
Required |
i |
String |
Item OID |
yes |
t_start |
f64 |
Beginning timestamp (default: last 24 hours) |
no |
t_end |
f64 |
Ending timestamp (default: now) |
no |
fill |
String |
Fill (nS/T/H/D/W e.g. 10T for 10-minute) |
no |
precision |
u32 |
Round values to digits after commma |
no |
limit |
u32 |
Limit records to |
no |
prop |
String |
Property: status or value (default: both) |
no |
xopts |
Map<String, String> |
Extra: vfn=fn for value grouping: mean/sum (d: mean), fill_null=none|zero|nan|previous, rp=TABLE for custom rp_TABLE |
no |
compact |
bool |
Pack data in arrays according to type |
no |
Return payload example:
[
{
"status": 1,
"t": 1652059860.0424938,
"value": 15
},
{
"status": 1,
"t": 1652059865.045223,
"value": 15
},
{
"status": 1,
"t": 1652059870.0452943,
"value": 15
},
{
"status": 1,
"t": 1652059875.0443518,
"value": 15
}
]
state_history_combined
Description |
Gets item state history combined (value only) |
Parameters |
required |
Returns |
State history combined payload |
Name |
Type |
Description |
Required |
i |
String/Vec<String> |
Item OID/OIDs |
yes |
t_start |
f64 |
Beginning timestamp (default: last 24 hours) |
no |
t_end |
f64 |
Ending timestamp (default: now) |
no |
fill |
String |
Fill (nS/T/H/D/W e.g. 10T for 10-minute, requires ts_extension) |
yes |
precision |
u32 |
Round values to digits after commma |
no |
xopts |
Map<String, String> |
Extra: vfn=fn for value grouping: mean/sum (d: mean), fill_null=none|zero|nan|previous, rp=TABLE for custom rp_TABLE |
no |
Return payload example:
{
"data": {
"sensor:env/temp": [
20.0,
25.0,
22.0,
18.0,
],
"sensor:env/hum": [
40.0,
45.0,
35.2,
34.0,
]
},
"t": [
1745859600.0,
1745863200.0,
1745866800.0,
1745870400.0,
]
}
state_log
Description |
Gets item state log |
Parameters |
required |
Returns |
State log payload (includes OIDs, as other svcs may support get-by-mask) |
Name |
Type |
Description |
Required |
i |
String |
Item OID, supports ending masks (e.g. sensor:group/#) |
yes |
t_start |
f64 |
Beginning timestamp (default: last 24 hours) |
no |
t_end |
f64 |
Ending timestamp (default: now) |
no |
limit |
u32 |
Limit records to |
no |
xopts |
Map<String, String> |
Extra: offset=N for query offset, rp=TABLE for custom rp_TABLE |
no |
Return payload example:
[
{
"oid": "sensor:tests/temp",
"status": 1,
"t": 1652060175.0443184,
"value": 15
},
{
"oid": "sensor:tests/temp",
"status": 1,
"t": 1652060180.046056,
"value": 15
},
{
"oid": "sensor:tests/temp",
"status": 1,
"t": 1652060185.0454304,
"value": 15
}
]
state_push
Description |
push item states into db, (payload: single item state or list). skips existing states |
Parameters |
required |
Returns |
nothing |
Name |
Type |
Description |
Required |
oid |
String |
Item OID (alias: i) |
yes |
status |
i16 |
Item status (alias: s) |
yes |
value |
Any |
Item value, numeric only (alias: v) |
no |
t |
f64 |
Timestamp, seconds since epoch (required, no default; key must be ‘t’, not ‘set_time’) |
yes |
Return payload example:
{"oid": "sensor:tests/temp", "status": 1, "value": 15.0, "t": 1652060175.044}
Retention policies
In TimescaleDB, retention policies are implemented using materialized views. To let EVA ICS API method pass the view name, it must be called rp_<view_name> in the database (e.g. rp_foo for rp=foo API parameter). This restricts users from selecting data from other views and tables via API calls.
The materialized view must have the same columns as state_history_events table:
t timestamp without time zone
oid_od integer
status smallint
value double precision
In case if there is no plan to query via API status or value columns, the one which is not required can be omitted.
Example, let us create a retention policy to store hour-averages:
CREATE MATERIALIZED VIEW rp_all_hourly_avg
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', t) AS t,
oid_id,
avg(value) AS value
FROM state_history_events AS she
JOIN state_history_oids AS sh ON she.oid_id = sh.id
GROUP BY 1,2;
In case if the view is required for certain items only, use WHERE clause to specify oids by joining state_history_oids table or directly specifying OID ids as recorded.
Then create a continuous aggregate policy to refresh the view every 5 minutes:
SELECT add_continuous_aggregate_policy('rp_all_hourly_avg',
start_offset => INTERVAL '1 day',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '5 minutes');
Note
Materialized views are not purged automatically by EVA ICS and may require additonal cleanup logic.