Broker
The broker is the central busrt instance, which routes messages between applications. Only the process that hosts the broker can exchange messages between its local threads, so the broker is usually embedded into the central, heaviest application, while all plug-ins, services and additional components talk to the broker using inter-process communication (UNIX sockets or TCP).
The busrt broker is currently implemented in Rust only.
Broker API
When the rpc feature is enabled, the following default RPC methods are available at .broker after the broker.init_default_core_rpc method is called:
test() - broker test (ok: true)
info() - broker info (author and version)
stats() - broker statistics
client.list() - list all connected clients
benchmark.test(payload) - test method, returns the payload as-is
The payload exchange format (call params / replies) is MessagePack.
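To make the wire format concrete, here is a minimal sketch that hand-encodes a small map as MessagePack using only the standard library. It assumes that the reply of test() is a one-entry map {"ok": true}, as the method list suggests. The helper encode_bool_map is hypothetical and not part of busrt. In real code a MessagePack library would normally do this work.

```rust
/// Hypothetical helper (not part of busrt): encodes a small map of
/// string keys to booleans as MessagePack bytes.
/// Supports at most 15 entries and keys of up to 31 bytes, which is the
/// range covered by the compact "fixmap" and "fixstr" encodings.
fn encode_bool_map(entries: &[(&str, bool)]) -> Option<Vec<u8>> {
    if entries.len() > 15 {
        return None;
    }
    // fixmap header: 0x80 | number of entries
    let mut buf = vec![0x80 | entries.len() as u8];
    for (key, value) in entries {
        if key.len() > 31 {
            return None;
        }
        // fixstr header: 0xa0 | length in bytes, followed by the UTF-8 bytes
        buf.push(0xa0 | key.len() as u8);
        buf.extend_from_slice(key.as_bytes());
        // booleans are single bytes: 0xc3 = true, 0xc2 = false
        buf.push(if *value { 0xc3 } else { 0xc2 });
    }
    Some(buf)
}

fn main() {
    // Assumed shape of the test() reply: {"ok": true}
    let payload = encode_bool_map(&[("ok", true)]).expect("map is small enough");
    let hex: Vec<String> = payload.iter().map(|b| format!("{:02x}", b)).collect();
    // prints: 81 a2 6f 6b c3
    println!("{}", hex.join(" "));
}
```

The five bytes are the whole payload: one map header, one string header plus two key bytes, and one boolean.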
Stand-alone broker server
To build a stand-alone broker server, use the command:
cargo build --features server,rpc
The rpc feature is optional.
Embedded broker
Note
If compiling for a “musl” target, it is strongly recommended to replace the default musl allocator with a third-party one, e.g. jemallocator, to keep the broker fast.
Example of a broker with inter-thread communications and external clients:
// Demo of inter-thread communication (with no RPC layer) with a UNIX socket for external clients
use busrt::broker::{Broker, ServerConfig};
use busrt::client::AsyncClient;
use busrt::QoS;
use std::time::Duration;
use tokio::time::sleep;

const SLEEP_STEP: Duration = Duration::from_secs(1);

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    // init the default broker RPC API, optional
    broker.init_default_core_rpc().await?;
    // spawn a UNIX socket server for external clients
    broker
        .spawn_unix_server("/tmp/busrt.sock", ServerConfig::default())
        .await?;
    // worker 1 will send a direct "hello" message to worker 2
    let mut client1 = broker.register_client("worker.1").await?;
    // worker 2 will listen to incoming frames only
    let mut client2 = broker.register_client("worker.2").await?;
    // worker 3 will send broadcasts to all workers; an external client named "worker.N" can
    // connect to the broker via the UNIX socket and receive them as well, or send a message
    // to "worker.2" to have it printed
    let mut client3 = broker.register_client("worker.3").await?;
    let rx = client2.take_event_channel().unwrap();
    // worker 1 task: send a direct message every SLEEP_STEP
    tokio::spawn(async move {
        loop {
            client1
                .send("worker.2", "hello".as_bytes().into(), QoS::No)
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    // worker 3 task: send a broadcast every SLEEP_STEP
    tokio::spawn(async move {
        loop {
            client3
                .send_broadcast(
                    "worker.*",
                    "this is a broadcast message".as_bytes().into(),
                    QoS::No,
                )
                .await
                .unwrap();
            sleep(SLEEP_STEP).await;
        }
    });
    // worker 2: print every incoming frame
    while let Ok(frame) = rx.recv().await {
        println!(
            "{}: {}",
            frame.sender(),
            std::str::from_utf8(frame.payload()).unwrap_or("something unreadable")
        );
    }
    Ok(())
}
Security model
An optional simple security model can be implemented in an embedded broker. Note that any security model may slow down communications, so it is usually a good idea to move semi-trusted clients to a dedicated server socket, as a broker can have multiple ones.
When a model is applied, only clients whose names are listed in the AAA map can connect. Each listed client can additionally be:
allowed to connect only from specified hosts/networks
allowed to exchange p2p messages only with a specified list of peers
denied publishing
denied subscribing
denied broadcasting
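The host/network restriction boils down to a CIDR match on the peer address. The sketch below shows the idea for IPv4 using only the standard library. The function in_network is a hypothetical illustration and not busrt code; the AAA example in this section relies on ipnetwork::IpNetwork for the same purpose.

```rust
use std::net::Ipv4Addr;

/// Hypothetical helper: returns true if `addr` belongs to the network
/// `net`/`prefix` (for example 127.0.0.0/8).
fn in_network(addr: Ipv4Addr, net: Ipv4Addr, prefix: u32) -> bool {
    if prefix > 32 {
        return false;
    }
    // Build the netmask; a /0 network matches every address
    let mask: u32 = if prefix == 0 {
        0
    } else {
        u32::MAX << (32 - prefix)
    };
    (u32::from(addr) & mask) == (u32::from(net) & mask)
}

fn main() {
    let loopback = Ipv4Addr::new(127, 0, 0, 0);
    // prints: true
    println!("{}", in_network(Ipv4Addr::new(127, 0, 0, 1), loopback, 8));
    // prints: false
    println!("{}", in_network(Ipv4Addr::new(192, 168, 1, 10), loopback, 8));
}
```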
Important things to know:
busrt::broker::AaaMap is a mutex-protected HashMap, which can be modified on the fly
when a client connects, its AAA settings are cloned and are not affected by any later modifications of the map, so it is usually a good idea to call the Broker::force_disconnect method when AAA settings are altered or removed
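The snapshot behaviour described above can be illustrated with plain standard-library types. This is a simplified model and not busrt's implementation: Settings, SharedMap, connect and the field allow_publish are hypothetical names. It shows that a value cloned out of a mutex-protected map at connect time is unaffected by later edits to the map, which is why a forced reconnect is needed before new rules apply.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for per-client AAA settings
#[derive(Clone, Debug, PartialEq)]
struct Settings {
    allow_publish: bool,
}

// Hypothetical stand-in for a shared, mutex-protected AAA map
type SharedMap = Arc<Mutex<HashMap<String, Settings>>>;

/// Models a client connecting: the settings are cloned out of the map,
/// or None is returned if the name is not listed.
fn connect(map: &SharedMap, name: &str) -> Option<Settings> {
    map.lock().unwrap().get(name).cloned()
}

fn main() {
    let map: SharedMap = Arc::new(Mutex::new(HashMap::new()));
    map.lock()
        .unwrap()
        .insert("test2".to_owned(), Settings { allow_publish: true });

    // The client connects and receives its own copy of the settings
    let session = connect(&map, "test2").expect("test2 is listed");

    // The map is modified while the client is still connected
    map.lock().unwrap().get_mut("test2").unwrap().allow_publish = false;

    // The existing session still holds the old rules (prints "session: true")
    println!("session: {}", session.allow_publish);

    // A new connection picks up the new rules (prints "fresh: false")
    let fresh = connect(&map, "test2").unwrap();
    println!("fresh: {}", fresh.allow_publish);
}
```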
Example:
// Demo of a broker with AAA
//
// The broker listens on 0.0.0.0:7777
//
// Accepted client names: test (from localhost only), test2 (from any)
//
// test is allowed to do anything
//
// test2 is allowed to send direct messages to "test" only and publish to subtopics of "news"
//
// The broker force-disconnects the client named "test2" every 5 seconds
use busrt::broker::{AaaMap, Broker, ClientAaa, ServerConfig};
use ipnetwork::IpNetwork;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // create a new broker instance
    let mut broker = Broker::new();
    broker.init_default_core_rpc().await?;
    // create AAA map
    let aaa_map = AaaMap::default();
    {
        let mut map = aaa_map.lock();
        map.insert(
            "test".to_owned(),
            ClientAaa::new().hosts_allow(vec![IpNetwork::V4("127.0.0.0/8".parse()?)]),
        );
        map.insert(
            "test2".to_owned(),
            ClientAaa::new()
                .allow_publish_to(&["news/#"])
                .deny_subscribe()
                .deny_broadcast()
                .allow_p2p_to(&["test"]),
        );
    }
    // put AAA map to the server config
    let config = ServerConfig::new().aaa_map(aaa_map);
    // spawn tcp server for external clients
    broker.spawn_tcp_server("0.0.0.0:7777", config).await?;
    // the map can be modified later at any time, however access controls are cached for clients
    // which are already connected
    loop {
        sleep(Duration::from_secs(5)).await;
        println!("forcing test2 disconnect");
        if let Err(e) = broker.force_disconnect("test2") {
            eprintln!("{}", e);
        }
    }
}