Broker

The broker is the central busrt instance, which routes messages between applications. Being the broker is the only way to exchange messages between local threads, so usually the broker is embedded into the central heaviest application, while all plug-ins, services and additional components talk with the broker using inter-process communications (UNIX sockets or TCP).

busrt broker is currently implemented in Rust only.

Broker API

When rpc feature is enabled, the following default RPC methods are available at .broker after broker.init_default_core_rpc method is called:

  • test() - broker test (ok: true)

  • info() - broker info (author and version)

  • stats() - broker statistics

  • client.list() - list all connected clients

  • benchmark.test(payload) - test method, returns the payload as-is

The payload exchange format (call params / replies) is MessagePack.

Stand-alone broker server

To build a stand-alone broker server, use the command:

cargo build --features server,rpc

The rpc feature is optional.

Embedded broker

Note

If compiling for “musl” target, it is strongly recommended to replace the default MUSL allocator with 3rd party, e.g. with jemallocator to keep the broker fast.

Example of a broker with inter-thread communications and external clients:

// Demo of inter-thread communication (with no RPC layer) with a UNIX socket for external clients
use busrt::broker::{Broker, ServerConfig};
use busrt::client::AsyncClient;
use busrt::QoS;
use std::time::Duration;
use tokio::time::sleep;

const SLEEP_STEP: Duration = Duration::from_secs(1);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    // init the default broker RPC API, optional
    broker.init_default_core_rpc().await?;
    // spawn unix server for external clients
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    // worker 1 will send to worker2 direct "hello" message
    let mut client1 = broker.register_client("worker.1").await?;
    // worker 2 will listen to incoming frames only
    let mut client2 = broker.register_client("worker.2").await?;
    // worker 3 will send broadcasts to all workers, an external client with a name "worker.N" can
    // connect the broker via unix socket and receive them as well or send a message to "worker.2"
    // to print it
    let mut client3 = broker.register_client("worker.3").await?;
    let rx = client2.take_event_channel().unwrap();
    tokio::spawn(async move {
        loop {
            client1
                .send("worker.2", "hello".as_bytes().into(), QoS::No)
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    tokio::spawn(async move {
        loop {
            client3
                .send_broadcast(
                    "worker.*",
                    "this is a broadcast message".as_bytes().into(),
                    QoS::No,
                )
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    while let Ok(frame) = rx.recv().await {
        println!(
            "{}: {}",
            frame.sender(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}

Security model

An optional simple security model can be implemented in an embedded broker. Note that any security model may slowdown communications, so it is usually a good idea to move semi-trusted clients to a dedicated server socket, as a broker can have multiple ones.

When a model is applied, only clients with the names listed in AAA map can connect. The clients can be restricted to:

  • connect only from specified hosts/networks

  • exchange p2p messages with a specified list of peers only

  • denied to publish

  • denied to subscribe

  • denied to broadcast

Important things to know:

  • busrt::broker::AaaMap is a mutex-protected HashMap, which can be modified on-the-flow

  • when a client is connected, its AAA settings are CLONED and not affected with any modifications, so it is usually a good idea to call Broker::force_disconnect method when AAA settings are altered or removed

Example:

// Demo of a broker with AAA
//
// The broker listens on 0.0.0.0:7777
//
// Accepted client names: test (from localhost only), test2 (from any)
//
// test is allowed to do anything
//
// test2 is allowed to send direct messages to "test" only and publish to subtopics of "news"
//
// The broker force-disconnects the client named "test2" every 5 seconds
use busrt::broker::{AaaMap, Broker, ClientAaa, ServerConfig};
use ipnetwork::IpNetwork;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    broker.init_default_core_rpc().await?;
    // create AAA map
    let aaa_map = AaaMap::default();
    {
        let mut map = aaa_map.lock();
        map.insert(
            "test".to_owned(),
            ClientAaa::new().hosts_allow(vec![IpNetwork::V4("127.0.0.0/8".parse()?)]),
        );
        map.insert(
            "test2".to_owned(),
            ClientAaa::new()
                .allow_publish_to(&["news/#"])
                .deny_subscribe()
                .deny_broadcast()
                .allow_p2p_to(&["test"]),
        );
    }
    // put AAA map to the server config
    let config = ServerConfig::new().aaa_map(aaa_map);
    // spawn tcp server for external clients
    broker.spawn_tcp_server("0.0.0.0:7777", config).await?;
    // the map can be modified later at any time, however access controls are cached for clients
    // which are already connected
    loop {
        sleep(Duration::from_secs(5)).await;
        println!("forcing test2 disconnect");
        if let Err(e) = broker.force_disconnect("test2") {
            eprintln!("{}", e);
        }
    }
}