RPC Cursors

Consider a server process that reads data from an external database or from files and sends it to client processes.

If there are lots of records (e.g. millions) and the data source allows reading them as a stream (e.g. a SQL database), it is useful to let clients read these records either one-by-one or in bulk blocks. This avoids huge, unnecessary memory allocations on both the clients and the server.

To solve this problem, BUS/RT provides a helper module, “cursors”, starting from version 0.4. The module must be enabled with the “cursors” crate feature.

With cursors, processes can exchange data of nearly unlimited size.

When to use cursors

Cursors are recommended when a task is expected to deal with 10-100k data records or more. Prefer bulk requests over one-by-one reads unless the individual data rows are huge.
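The preference for bulk requests comes down to the number of RPC round trips. The sketch below is plain Rust with no BUS/RT dependency; `round_trips` is a hypothetical helper, and it assumes the client stops at the first block shorter than the requested size, as the client example in this section does.

```rust
/// Hypothetical helper: number of RPC calls needed to drain a cursor holding
/// `records` rows when each call requests `bulk_size` rows and the client
/// stops at the first block shorter than `bulk_size`.
/// Reading one-by-one is the special case `bulk_size == 1`.
fn round_trips(records: u64, bulk_size: u64) -> u64 {
    assert!(bulk_size > 0, "bulk size must be positive");
    // Each full block costs one call; the final short (possibly empty) block
    // that signals the end always costs exactly one more.
    records / bulk_size + 1
}

fn main() {
    let records = 1_000_000;
    for bulk_size in [1, 100, 1_000] {
        println!("bulk size {:>5}: {} calls", bulk_size, round_trips(records, bulk_size));
    }
}
```

For a million records this gives 1,000,001 calls when reading one-by-one versus 10,001 calls with blocks of 100.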

If fewer records are typically expected, cursors may add overhead, and there is usually no significant RAM advantage in typical tasks.

Besides RAM savings, cursors may provide slightly better overall performance (up to 10% less CPU load) when data blocks of an optimal size are processed.

Technical background

  • BUS/RT cursors are very similar to database cursors.

  • When a client process calls an RPC method to get data, the server process creates a cursor object that wraps a database, HTTP or file stream, etc. The cursor's unique ID (a counter, UUID, etc.) is returned instead of the data. There is no mandatory ID type or serialization format.

  • The client uses the cursor ID to get data records from the stream either one-by-one or in bulk. There is no mandatory naming for the RPC methods either.

  • The cursor should be dropped automatically when there are no more data records in the source, or when the client fails to process all records within the specified amount of time (cursor time-to-live).
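The lifecycle described above can be modeled without BUS/RT. The following is a minimal std-only sketch, not the busrt::cursors API: `CursorRegistry`, `open`, `next` and `cleanup` are hypothetical names, a plain iterator stands in for the database/HTTP/file stream, and IDs are a simple counter (one of the options mentioned above). The TTL is assumed to count from cursor creation.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical cursor: any iterator (stand-in for a DB/HTTP/file stream)
/// plus the bookkeeping needed to drop it later.
struct Cursor<I: Iterator> {
    source: I,
    finished: bool,
    expires_at: Instant,
}

/// Hypothetical registry mapping cursor IDs (a plain counter) to cursors.
struct CursorRegistry<I: Iterator> {
    next_id: u64,
    ttl: Duration,
    cursors: HashMap<u64, Cursor<I>>,
}

impl<I: Iterator> CursorRegistry<I> {
    fn new(ttl: Duration) -> Self {
        Self { next_id: 0, ttl, cursors: HashMap::new() }
    }

    /// "Open" call: keep the stream on the server, return only its ID.
    fn open(&mut self, source: I) -> u64 {
        self.next_id += 1;
        let cursor = Cursor { source, finished: false, expires_at: Instant::now() + self.ttl };
        self.cursors.insert(self.next_id, cursor);
        self.next_id
    }

    /// "Next" call: one record, or None if the stream is exhausted or the
    /// ID is unknown (already dropped or expired).
    fn next(&mut self, id: u64) -> Option<I::Item> {
        let cursor = self.cursors.get_mut(&id)?;
        let item = cursor.source.next();
        if item.is_none() {
            cursor.finished = true;
        }
        item
    }

    /// Periodic cleanup: drop finished cursors and those past their TTL.
    fn cleanup(&mut self, now: Instant) {
        self.cursors.retain(|_, c| !c.finished && now < c.expires_at);
    }
}

fn main() {
    let mut registry = CursorRegistry::new(Duration::from_secs(30));
    let id = registry.open(vec!["alice", "bob"].into_iter());
    while let Some(name) = registry.next(id) {
        println!("cursor {}: {}", id, name);
    }
    // the finished cursor is removed here
    registry.cleanup(Instant::now());
}
```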

Server example

// Server-side cursor example
//
// Cursors are used to transfer data from streams either one-by-one or in bulk blocks
//
// The source can be a database, an HTTP data stream, etc.
//
// Consider a local PostgreSQL database "tests" with a table "customers" (id bigserial,
// name varchar). The access credentials are tests/xxx
use busrt::broker::{Broker, ServerConfig};
use busrt::rpc::{RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use busrt::{async_trait, cursors};
use futures::{Stream, TryStreamExt};
use serde::Serialize;
use sqlx::{
    postgres::{PgPoolOptions, PgRow},
    Row,
};
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;

// max cursor time-to-live before forcibly dropped
const CURSOR_TTL: Duration = Duration::from_secs(30);

// a database stream type alias
type DbStream = Pin<Box<dyn Stream<Item = Result<PgRow, sqlx::Error>> + Send>>;

// a structure for data rows
#[derive(Serialize)]
struct Customer {
    id: i64,
    name: String,
}

// define a cursor for the database stream
struct CustomerCursor {
    // futures::Stream object must be under a mutex to implement Sync which is required for the RPC
    // server
    stream: Mutex<DbStream>,
    // a special cursor metadata object, must exist in all cursor structures if busrt::cursors::Map
    // helper object is used
    meta: cursors::Meta,
}

// the busrt::cursors::Cursor trait requires the following methods to be implemented
//
// some implementations may omit either "next" or "next_bulk" if it is not required (e.g. by
// putting unimplemented!() inside the function); in this case the method must not be mapped to RPC
#[async_trait]
impl cursors::Cursor for CustomerCursor {
    // the method returns either serialized data (bytes) or None
    // as BUS/RT has no requirements for the data serialization format, any format recognized
    // by both the server and the client can be used
    async fn next(&self) -> Result<Option<Vec<u8>>, RpcError> {
        if let Some(row) = self
            .stream
            .lock()
            .await
            .try_next()
            .await
            .map_err(|_| RpcError::internal(None))?
        {
            let id: i64 = row.try_get(0).map_err(|_| RpcError::internal(None))?;
            let name: String = row.try_get(1).map_err(|_| RpcError::internal(None))?;
            Ok(Some(rmp_serde::to_vec_named(&Customer { id, name })?))
        } else {
            // mark the cursor finished if there are no more records
            self.meta().mark_finished();
            Ok(None)
        }
    }
    // the method always returns a serialized data array (bytes)
    // if there are no more records, an empty array should be returned
    async fn next_bulk(&self, count: usize) -> Result<Vec<u8>, RpcError> {
        let mut result: Vec<Customer> = Vec::with_capacity(count);
        if count > 0 {
            let mut stream = self.stream.lock().await;
            while let Some(row) = stream
                .try_next()
                .await
                .map_err(|_| RpcError::internal(None))?
            {
                let id: i64 = row.try_get(0).map_err(|_| RpcError::internal(None))?;
                let name: String = row.try_get(1).map_err(|_| RpcError::internal(None))?;
                result.push(Customer { id, name });
                if result.len() == count {
                    break;
                }
            }
        }
        if result.len() < count {
            // mark the cursor finished if there are no more records
            self.meta.mark_finished();
        }
        Ok(rmp_serde::to_vec_named(&result)?)
    }
    // the method must return a reference to the cursor meta object
    //
    // can be omitted (e.g. with unimplemented!()) if the busrt::cursors::Map helper is not used
    fn meta(&self) -> &cursors::Meta {
        &self.meta
    }
}

impl CustomerCursor {
    fn new(stream: DbStream) -> Self {
        Self {
            stream: Mutex::new(stream),
            meta: cursors::Meta::new(CURSOR_TTL),
        }
    }
}

struct MyHandlers {
    pool: sqlx::PgPool,
    // a helper object to handle multiple cursors
    cursors: cursors::Map,
}

#[async_trait]
impl RpcHandlers for MyHandlers {
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        let payload = event.payload();
        match event.parse_method()? {
            // the method "Ccustomers" returns a cursor uuid only
            "Ccustomers" => {
                let stream = sqlx::query("select id, name from customers").fetch(&self.pool);
                let cursor = CustomerCursor::new(stream);
                let u = self.cursors.add(cursor).await;
                Ok(Some(rmp_serde::to_vec_named(&cursors::Payload::from(u))?))
            }
            "N" => {
                // handle cursor-next calls. if all cursors properly implement the
                // busrt::cursors::Cursor trait, it is possible to have a single "next" method for
                // all cursor types.
                let p: cursors::Payload = rmp_serde::from_slice(payload)?;
                self.cursors.next(p.uuid()).await
            }
            "NB" => {
                // handle cursor-next-bulk calls. if all cursors properly implement the
                // busrt::cursors::Cursor trait, it is possible to have a single "next-bulk" method
                // for all cursor types.
                let p: cursors::Payload = rmp_serde::from_slice(payload)?;
                self.cursors.next_bulk(p.uuid(), p.bulk_number()).await
            }
            _ => Err(RpcError::method(None)),
        }
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut broker = Broker::new();
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    let client = broker.register_client("db").await?;
    let opts = sqlx::postgres::PgConnectOptions::from_str("postgres://tests:xxx@localhost/tests")?;
    let pool = PgPoolOptions::new().connect_with(opts).await?;
    let handlers = MyHandlers {
        pool,
        cursors: cursors::Map::new(Duration::from_secs(30)),
    };
    let _rpc = RpcClient::new(client, handlers);
    loop {
        sleep(Duration::from_secs(1)).await;
    }
}
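The core rule of `next_bulk` above does not depend on PostgreSQL or MessagePack. This hypothetical std-only function isolates it, with a range standing in for the row stream: take up to `count` records and report the cursor as finished when the block comes back short.

```rust
/// Hypothetical std-only model of the server's `next_bulk` logic: take up to
/// `count` records from a stream and report whether the stream is exhausted.
fn next_bulk<I: Iterator>(stream: &mut I, count: usize) -> (Vec<I::Item>, bool) {
    // `by_ref` lets `take` borrow the stream, so the remaining records stay
    // available for the next call.
    let block: Vec<I::Item> = stream.by_ref().take(count).collect();
    // Same rule as the server example: a block shorter than requested means
    // the source is exhausted and the cursor can be marked finished.
    let finished = block.len() < count;
    (block, finished)
}

fn main() {
    // A range stands in for the database row stream.
    let mut rows = 1..=5;
    loop {
        let (block, finished) = next_bulk(&mut rows, 2);
        println!("{:?} finished={}", block, finished);
        if finished {
            break;
        }
    }
}
```

When the number of rows is an exact multiple of `count`, the last full block is not flagged as finished; the following call returns an empty block, which is why the client treats an empty block as the end too.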

Client example

// Demo of client cursor RPC
//
// use server_cursor example to test client/server
use busrt::ipc::{Client, Config};
use busrt::rpc::{Rpc, RpcClient};
use busrt::{cursors, empty_payload, QoS};
use serde::Deserialize;

#[derive(Deserialize)]
struct Customer {
    id: i64,
    name: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let name = "test.client.123";
    let target = "db";
    // create a new client instance
    let config = Config::new("/tmp/busrt.sock", name);
    let client = Client::connect(&config).await?;
    // create RPC with no handlers
    let rpc = RpcClient::new0(client);
    // get a cursor
    let cursor: cursors::Payload = rmp_serde::from_slice(
        rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
            .await?
            .payload(),
    )?;
    // serialize the cursor once and pass it as a borrowed Cow, to avoid re-serializing it
    // every time the method is called
    let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
    let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
    loop {
        // get customers one-by-one
        let result = rpc
            .call(target, "N", b_cursor.clone(), QoS::Processed)
            .await?;
        let data = result.payload();
        // the payload is empty when there are no more records left
        if data.is_empty() {
            break;
        }
        let customer: Customer = rmp_serde::from_slice(data)?;
        println!("{}: {}", customer.id, customer.name);
    }
    // do the same in bulk
    let bulk_size = 100;
    // get a cursor
    let mut cursor: cursors::Payload = rmp_serde::from_slice(
        rpc.call(target, "Ccustomers", empty_payload!(), QoS::Processed)
            .await?
            .payload(),
    )?;
    cursor.set_bulk_number(bulk_size);
    let packed_cursor = rmp_serde::to_vec_named(&cursor)?;
    let b_cursor = busrt::borrow::Cow::Borrowed(&packed_cursor);
    loop {
        // get customers in bulk
        let result = rpc
            .call(target, "NB", b_cursor.clone(), QoS::Processed)
            .await?;
        let customers: Vec<Customer> = rmp_serde::from_slice(result.payload())?;
        for customer in &customers {
            println!("{}: {}", customer.id, customer.name);
        }
        // stop if the block contains fewer records than the bulk size: that means it is the
        // last block
        if customers.len() < bulk_size {
            break;
        }
    }
    Ok(())
}
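The bulk loop in the client can be factored into a reusable helper. This is a hedged, std-only sketch: `drain_cursor` is a hypothetical name, and the `fetch_block` closure stands in for the "NB" call plus rmp_serde deserialization. Records are passed to a callback block by block instead of being collected, so client memory stays bounded by one block.

```rust
/// Hypothetical client-side helper: request blocks of `bulk_size` records
/// until a short block signals the end, handing each record to `handle`.
/// `fetch_block` stands in for one "NB" RPC call. Returns the number of calls.
fn drain_cursor<T>(
    bulk_size: usize,
    mut fetch_block: impl FnMut(usize) -> Vec<T>,
    mut handle: impl FnMut(T),
) -> usize {
    // with a zero bulk size no block is ever "short", so the loop would spin forever
    assert!(bulk_size > 0, "bulk size must be positive");
    let mut calls = 0;
    loop {
        let block = fetch_block(bulk_size);
        calls += 1;
        let last = block.len() < bulk_size;
        for record in block {
            handle(record);
        }
        if last {
            break calls;
        }
    }
}

fn main() {
    // A range stands in for the server-side cursor.
    let mut source = 1..=250u32;
    let calls = drain_cursor(
        100,
        |n| source.by_ref().take(n).collect(),
        |id: u32| println!("customer {}", id),
    );
    println!("{} calls", calls);
}
```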