Skip to content

Client usage

Fibril has Rust, TypeScript, and Python clients for the same core broker surface:

  • connect with optional auth
  • publish with or without confirmation
  • delayed publish
  • manual acknowledgements
  • auto-ack convenience
  • message helpers for msgpack, JSON, raw bytes, and text payloads

The examples below are source-tree examples for the current pre-alpha API. They are useful for building against this repository, but not yet a stable package contract.

Parity note: all three clients cover the branch’s partition topology, owner-redirect, partition-key routing, exclusive consumer-group surface, and message TTL. The Rust client is the reference implementation. The TypeScript and Python clients mirror its behavior. The Python client also ships a synchronous facade (fibril.blocking.BlockingClient) over the same async core.

  • Rust: the workspace toolchain (current stable Rust).
  • TypeScript: Node.js 18 or newer, ESM.
  • Python: Python 3.11 or newer. The only runtime dependency is the PyPI msgpack package, installed automatically with the client.

For reconnect behavior and current limits, see reconnects.

use fibril_client::ClientOptions;
let client = ClientOptions::new()
.auth("fibril", "fibril")
.connect("127.0.0.1:9876")
.await?;

By default, the clients make one automatic reconnect attempt before a new operation if the previous engine is already closed. Disable that with disable_auto_reconnect() in Rust and Python, or disableAutoReconnect() in TypeScript. After a successful resume, the clients send their known subscription metadata to the broker and read the reconciliation result. Subscriptions that the broker confirms with keep continue on the existing stream. This does not replay in-flight publish operations. See reconnects for the current limits.

The default reconciliation policy is conservative. Rust can opt into restoring missing server-side subscriptions with reconnect_reconcile_policy(ReconcilePolicy::RestoreClientSubscriptions). TypeScript can do the same with withReconnectReconcilePolicy("restore_client_subscriptions"), and Python with ClientOptions(reconnect_reconcile_policy="restore_client_subscriptions").

The conservative default is the safest operational behavior. It keeps matching subscriptions, closes client streams the broker cannot prove are still valid, and drops server-side subscriptions the client no longer reports.

Use restore mode only when you want the client to ask the broker to recreate client-owned subscriptions that are missing after a successful resume. Restored subscriptions may receive a new server subscription id, which the clients remap internally.

use fibril_client::{ClientOptions, ReconcilePolicy};
let client = ClientOptions::new()
.reconnect_reconcile_policy(ReconcilePolicy::RestoreClientSubscriptions)
.connect("127.0.0.1:9876")
.await?;

Plain publish uses the common unconfirmed path. Confirmed publish waits for the broker to acknowledge the stored message and returns the topic offset. For pipelined confirmed publishing, send with a confirmation handle and await those handles later.

let publisher = client.publisher("email.send")?;
publisher
.publish("hello")
.await?;
let offset = publisher
.publish_confirmed("needs an offset")
.await?;
let confirmation = publisher
.publish_with_confirmation("pipeline me")
.await?;
let pipelined_offset = confirmation.confirmed().await?;

Delayed publish uses a distinct protocol frame and stores a not_before Unix-millisecond deadline. It does not add delay bytes to the common publish frame.

use std::time::Duration;
publisher
.publish_delayed("send later", Duration::from_secs(30))
.await?;
let offset = publisher
.publish_delayed_confirmed("send later and return offset", 30u64)
.await?;

Rust numeric delay helpers are seconds. std::time::Duration is also accepted.

Manual-ack consumers can requeue a message after a delay instead of making it ready immediately.

use std::time::Duration;
let msg = sub.recv().await.expect("message");
msg.retry_after(Duration::from_secs(30)).await?;

Rust numeric delay helpers are seconds, matching delayed publish.

Plain values are encoded as msgpack by default. Use explicit message helpers when you want JSON, raw bytes, text, or custom headers.

use fibril_client::NewMessage;
publisher
.publish(NewMessage::json(&serde_json::json!({
"id": 42,
"kind": "welcome",
}))?)
.await?;
publisher
.publish(
NewMessage::content("plain text")
.header("x-trace-id", "abc123"),
)
.await?;
publisher
.publish(NewMessage::raw(vec![1, 2, 3]))
.await?;

Fibril reserves fibril.* and stroma.* headers for system metadata. User code should not set headers with those prefixes. See metadata policy for the development note.

Manual acknowledgement is the primary processing model. Messages are leased while inflight. Consumers settle each message explicitly.

let mut sub = client
.subscribe("email.send")?
.group("workers")?
.prefetch(32)
.sub()
.await?;
while let Some(msg) = sub.recv().await {
let body = msg.content()?;
match send_email(body).await {
Ok(()) => {
msg.complete().await?;
}
Err(_) => {
msg.retry().await?;
}
}
}

retry() requeues immediately. Use retry_after(..) in Rust and Python or retryAfter(..) in TypeScript for delayed retry.

Dead-letter routing has two parts:

  • Operators configure the global DLQ target through the admin UI/API.
  • Applications may declare per-queue retry and DLQ policy through the client or fibrilctl.

For example, first configure a global DLQ target:

PUT /admin/api/global-dlq
{
"expected_version": 0,
"target": {
"tp": "_dlq.email",
"group": null
}
}

Then configure a source queue to use that global target after retries are exhausted:

use fibril_client::QueueConfig;
client
.declare_queue(
QueueConfig::new("email.send")?
.group("workers")?
.use_global_dead_letter_queue()
.max_retries(3),
)
.await?;

Application clients keep using normal publish/consume code:

let publisher = client.publisher_grouped("email.send", "workers")?;
publisher.publish_confirmed(NewMessage::json(&job)?).await?;
let mut sub = client
.subscribe("email.send")?
.group("workers")?
.prefetch(32)
.sub()
.await?;
while let Some(msg) = sub.recv().await {
if process(msg.deserialize()?).await.is_ok() {
msg.complete().await?;
} else {
msg.retry().await?;
}
}

Once the configured retry limit is exhausted, the broker routes the failed message to _dlq.email. Subscribe to that queue like any other queue when you want inspection or replay tooling.

deserialize() chooses a decoder from content-type. Missing content type defaults to msgpack.

#[derive(serde::Deserialize)]
struct Job {
id: u64,
}
let job: Job = msg.deserialize()?;
let raw: &[u8] = msg.raw();
let text: &str = msg.content()?;

Auto-ack subscriptions are a convenience mode that settles each delivery server-side as the broker sends it, so the consumer just reads messages. Use manual ack when processing correctness matters.

let mut sub = client
.subscribe("metrics")?
.prefetch(128)
.sub_auto_ack()
.await?;
while let Some(msg) = sub.recv().await {
observe(msg.deserialize::<Metric>()?);
}

A Plexus stream delivers every record to every consumer. Declare it with declare_plexus, publish with the ordinary publisher (the broker routes by channel kind), and consume with stream(...). Name the subscription to get a durable cursor that resumes after a restart and advances on ack. A stream subscription reads all partitions and fans them in.

use fibril_client::StreamConfig;
client
.declare_plexus(StreamConfig::new("events")?.partitions(4).retain_records(1_000_000))
.await?;
// Publish uses the normal publisher.
client.publisher("events")?.publish("hello").await?;
let mut sub = client
.stream("events")?
.durable("analytics")
.filter("region", "eu-*")
.sub()
.await?;
while let Some(msg) = sub.recv().await {
handle(msg.content()?);
msg.complete().await?; // advances the durable cursor
}

For a live tail without a durable cursor, drop .durable(...) and pick a start position: from_latest() / fromLatest() (the default), from_earliest(), from_offset(..), from_last(..), or from_time(..). Use sub_auto_ack() / subAutoAck() when you do not need explicit settlement.

Discovery is an opt-in surface: call routing() to get a routing view of the same connection, then subscribe_pattern(glob) to fan in across every work queue whose topic matches, or subscribe_stream_pattern(glob) for Plexus streams. The subscription keeps attaching channels that start matching later, so queues or streams declared after you subscribe are picked up without a reconnect. Each delivery is paired with the channel it came from. The glob is the same *-wildcard grammar as the header filter (no regex), and "*" matches everything. Manual (sub / .sub()) and auto-ack (sub_auto_ack / .subAutoAck()) variants both exist, and the routing view still publishes and subscribes normally, so it composes with reliable publishing.

let mut sub = client
.routing()
.subscribe_pattern("events.*")
.sub()
.await?;
while let Some((source, msg)) = sub.recv().await {
handle(&source.topic, msg.content()?);
msg.complete().await?;
}
client.shutdown().await;

For code not built on asyncio, the Python package ships a synchronous facade. It runs the async client on a background event-loop thread and bridges each call, so it is the same client behind one source of truth rather than a second implementation. Subscriptions become ordinary iterators. The Async/Blocking toggle on the examples above shows the synchronous form of each call, and the full shape is:

from fibril.blocking import BlockingClient
with BlockingClient.connect("127.0.0.1:9876") as client:
client.publisher("email.send").publish_confirmed({"body": "hello"})
sub = client.subscribe("email.send").group("workers").sub()
for msg in sub:
process(msg.deserialize())
msg.complete()
sub.close()