Client usage
Fibril has Rust, TypeScript, and Python clients for the same core broker surface:
- connect with optional auth
- publish with or without confirmation
- delayed publish
- manual acknowledgements
- auto-ack convenience
- message helpers for msgpack, JSON, raw bytes, and text payloads
The examples below are source-tree examples for the current pre-alpha API. They are useful for building against this repository, but not yet a stable package contract.
Parity note: all three clients cover the branch’s partition topology,
owner-redirect, partition-key routing, exclusive consumer-group surface, and
message TTL. The Rust client is the reference implementation. The TypeScript and
Python clients mirror its behavior. The Python client also ships a synchronous
facade (fibril.blocking.BlockingClient) over the same async core.
Requirements
- Rust: the workspace toolchain (current stable Rust).
- TypeScript: Node.js 18 or newer, ESM.
- Python: Python 3.11 or newer. The only runtime dependency is the PyPI
msgpack package, installed automatically with the client.
For reconnect behavior and current limits, see reconnects.
Connect
Rust:

    use fibril_client::ClientOptions;

    let client = ClientOptions::new()
        .auth("fibril", "fibril")
        .connect("127.0.0.1:9876")
        .await?;

TypeScript:

    import { ClientOptions } from "@fibril/client";

    const client = await new ClientOptions()
      .withAuth("fibril", "fibril")
      .connect("127.0.0.1:9876");

Python (async):

    from fibril import ClientOptions

    client = await ClientOptions().with_auth("fibril", "fibril").connect("127.0.0.1:9876")

Python (blocking):

    from fibril import ClientOptions
    from fibril.blocking import BlockingClient

    client = BlockingClient.connect(
        "127.0.0.1:9876", ClientOptions().with_auth("fibril", "fibril")
    )

By default, the clients make one automatic reconnect attempt before a new
operation if the previous engine is already closed. Disable that with
disable_auto_reconnect() in Rust and Python, or disableAutoReconnect() in
TypeScript. After a successful resume, the clients send their known subscription
metadata to the broker and read the reconciliation result. Subscriptions that the
broker confirms with a keep result continue on the existing stream. This does not replay
in-flight publish operations. See reconnects
for the current limits.
The default reconciliation policy is conservative. Rust can opt into restoring
missing server-side subscriptions with
reconnect_reconcile_policy(ReconcilePolicy::RestoreClientSubscriptions).
TypeScript can do the same with
withReconnectReconcilePolicy("restore_client_subscriptions"), and Python with
ClientOptions(reconnect_reconcile_policy="restore_client_subscriptions").
Reconnect Policy
The conservative default is the safest operational behavior. It keeps matching subscriptions, closes client streams the broker cannot prove are still valid, and drops server-side subscriptions the client no longer reports.
Use restore mode only when you want the client to ask the broker to recreate client-owned subscriptions that are missing after a successful resume. Restored subscriptions may receive a new server subscription id, which the clients remap internally.
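To make the two policies concrete, here is a minimal sketch of the reconciliation decision as plain set arithmetic. It is not the Fibril implementation or wire protocol: ReconcileResult, reconcile, and the policy string "conservative" are hypothetical names. The sketch also assumes that a client stream counts as "not provably valid" exactly when the broker no longer lists it, and that restore mode still drops server-side subscriptions the client does not report.

```python
from dataclasses import dataclass, field


@dataclass
class ReconcileResult:
    """Hypothetical outcome of one reconciliation pass after a resume."""
    keep: set[str] = field(default_factory=set)          # continue on the existing stream
    close_client: set[str] = field(default_factory=set)  # client streams that get closed
    drop_server: set[str] = field(default_factory=set)   # server subscriptions that get dropped
    restore: set[str] = field(default_factory=set)       # subscriptions recreated on the broker


def reconcile(client_subs: set[str], server_subs: set[str],
              policy: str = "conservative") -> ReconcileResult:
    """Compare what the client reports with what the broker still holds."""
    result = ReconcileResult()
    result.keep = client_subs & server_subs
    # Assumption: both policies drop subscriptions the client no longer reports.
    result.drop_server = server_subs - client_subs
    missing_on_server = client_subs - server_subs
    if policy == "restore_client_subscriptions":
        result.restore = missing_on_server
    else:
        result.close_client = missing_on_server
    return result
```

With client subscriptions {"a", "b"} and server subscriptions {"b", "c"}, the conservative policy keeps "b", closes "a" on the client, and drops "c" on the server. Restore mode asks the broker to recreate "a" instead of closing it.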
Rust:

    use fibril_client::{ClientOptions, ReconcilePolicy};

    let client = ClientOptions::new()
        .reconnect_reconcile_policy(ReconcilePolicy::RestoreClientSubscriptions)
        .connect("127.0.0.1:9876")
        .await?;

TypeScript:

    import { ClientOptions, type ReconcilePolicy } from "@fibril/client"

    const policy: ReconcilePolicy = "restore_client_subscriptions"
    const client = await new ClientOptions()
      .withReconnectReconcilePolicy(policy)
      .connect("127.0.0.1:9876")

Python (async):

    from fibril import ClientOptions

    client = await ClientOptions(
        reconnect_reconcile_policy="restore_client_subscriptions"
    ).connect("127.0.0.1:9876")

Python (blocking):

    from fibril import ClientOptions
    from fibril.blocking import BlockingClient

    client = BlockingClient.connect(
        "127.0.0.1:9876",
        ClientOptions(reconnect_reconcile_policy="restore_client_subscriptions"),
    )

Publish
Plain publish uses the common unconfirmed path. Confirmed publish waits for the broker to acknowledge the stored message and returns the topic offset. For pipelined confirmed publishing, send with a confirmation handle and await those handles later.
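The pipelining idea can be sketched without a broker. The example below is a stand-in, not the Fibril client: FakePublisher, Confirmation, and publish_pipelined are hypothetical, and the fake assigns offsets locally and simulates broker latency with a sleep. It only illustrates why sending first and awaiting the handles afterwards lets the confirmations overlap.

```python
import asyncio


class Confirmation:
    """Hypothetical handle that resolves to the stored offset."""

    def __init__(self, task: asyncio.Task):
        self._task = task

    async def confirmed(self) -> int:
        return await self._task


class FakePublisher:
    """In-memory stand-in for a publisher; offsets are assigned locally."""

    def __init__(self, latency: float = 0.01):
        self._latency = latency
        self._next_offset = 0

    async def _store(self, offset: int) -> int:
        await asyncio.sleep(self._latency)  # simulated broker round trip
        return offset

    async def publish_with_confirmation(self, payload) -> Confirmation:
        offset = self._next_offset
        self._next_offset += 1
        # The store runs in the background while the caller keeps sending.
        return Confirmation(asyncio.create_task(self._store(offset)))


async def publish_pipelined(publisher, payloads) -> list[int]:
    # Phase 1: send everything and collect the handles.
    handles = [await publisher.publish_with_confirmation(p) for p in payloads]
    # Phase 2: await the confirmations, in send order.
    return [await handle.confirmed() for handle in handles]
```

Calling asyncio.run(publish_pipelined(FakePublisher(), ["a", "b", "c"])) yields one offset per payload. Because all three simulated stores run concurrently, the total wait is close to one latency period, not three.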
Rust:

    let publisher = client.publisher("email.send")?;

    publisher
        .publish("hello")
        .await?;

    let offset = publisher
        .publish_confirmed("needs an offset")
        .await?;

    let confirmation = publisher
        .publish_with_confirmation("pipeline me")
        .await?;
    let pipelined_offset = confirmation.confirmed().await?;

TypeScript:

    const publisher = client.publisher("email.send");

    await publisher.publish("hello");

    const offset = await publisher.publishConfirmed("needs an offset");

    const confirmation = await publisher.publishWithConfirmation("pipeline me");
    const pipelinedOffset = await confirmation.confirmed();

Python (async):

    publisher = client.publisher("email.send")

    await publisher.publish("hello")

    offset = await publisher.publish_confirmed("needs an offset")

    confirmation = await publisher.publish_with_confirmation("pipeline me")
    pipelined_offset = await confirmation.confirmed()

Python (blocking):

    publisher = client.publisher("email.send")

    publisher.publish("hello")

    offset = publisher.publish_confirmed("needs an offset")

    confirmation = publisher.publish_with_confirmation("pipeline me")
    pipelined_offset = confirmation.confirmed()

Delayed Publish
Delayed publish uses a distinct protocol frame and stores a not_before Unix-millisecond deadline. It does not add delay bytes to the common publish frame.

Rust:

    use std::time::Duration;

    publisher
        .publish_delayed("send later", Duration::from_secs(30))
        .await?;

    let offset = publisher
        .publish_delayed_confirmed("send later and return offset", 30u64)
        .await?;

Rust numeric delay helpers are seconds. std::time::Duration is also accepted.

TypeScript:

    await publisher.publishDelayed("send later", 30_000);

    const offset = await publisher.publishDelayedConfirmed(
      "send later and return offset",
      new Date(Date.now() + 30_000),
    );

TypeScript numeric delays are milliseconds. Passing a Date uses that absolute deadline.

Python (async):

    await publisher.publish_delayed("send later", 30)

    offset = await publisher.publish_delayed_confirmed(
        "send later and return offset", 30
    )

Python (blocking):

    publisher.publish_delayed("send later", 30)

    offset = publisher.publish_delayed_confirmed("send later and return offset", 30)

Python delays are seconds (a float or a datetime.timedelta).
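Because the units differ per client (seconds in Rust and Python, milliseconds in TypeScript), it can help to see how a relative delay maps onto the stored not_before deadline. The sketch below is an assumption about that mapping, not the client's actual code: not_before_ms and EPOCH are hypothetical names, and it follows the Python convention of a number of seconds or a datetime.timedelta.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def not_before_ms(delay, now=None):
    """Turn a relative delay into an absolute Unix-millisecond deadline.

    `delay` is a number of seconds or a timedelta. `now` is injectable so
    the result is reproducible in tests.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    if not isinstance(delay, timedelta):
        delay = timedelta(seconds=float(delay))
    # Dividing timedeltas keeps the arithmetic in integers (no float rounding).
    return (now + delay - EPOCH) // timedelta(milliseconds=1)
```

A 30-second delay and timedelta(seconds=30) produce the same deadline, which is 30_000 larger than the deadline for a zero delay. That matches the 30_000 passed in the TypeScript example.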
Delayed Retry
Manual-ack consumers can requeue a message after a delay instead of making it ready immediately.

Rust:

    use std::time::Duration;

    let msg = sub.recv().await.expect("message");
    msg.retry_after(Duration::from_secs(30)).await?;

Rust numeric delay helpers are seconds, matching delayed publish.

TypeScript:

    const msg = await sub.recv();
    await msg.retryAfter(30_000);

TypeScript numeric delays are milliseconds. Passing a Date uses that absolute retry deadline.

Python (async):

    msg = await sub.recv()
    await msg.retry_after(30)

Python (blocking):

    msg = sub.recv()
    msg.retry_after(30)

Python delays are seconds (a float or a datetime.timedelta).
Message Payloads
Plain values are encoded as msgpack by default. Use explicit message helpers when you want JSON, raw bytes, text, or custom headers.

Rust:

    use fibril_client::NewMessage;

    publisher
        .publish(NewMessage::json(&serde_json::json!({
            "id": 42,
            "kind": "welcome",
        }))?)
        .await?;

    publisher
        .publish(
            NewMessage::content("plain text")
                .header("x-trace-id", "abc123"),
        )
        .await?;

    publisher
        .publish(NewMessage::raw(vec![1, 2, 3]))
        .await?;

TypeScript:

    import { NewMessage } from "@fibril/client";

    await publisher.publish(
      NewMessage.json({
        id: 42,
        kind: "welcome",
      }),
    );

    await publisher.publish(
      NewMessage.content("plain text")
        .header("x-trace-id", "abc123"),
    );

    await publisher.publish(NewMessage.raw(new Uint8Array([1, 2, 3])));

Python (async):

    from fibril import NewMessage

    await publisher.publish(NewMessage.json({"id": 42, "kind": "welcome"}))

    await publisher.publish(
        NewMessage.content("plain text").header("x-trace-id", "abc123")
    )

    await publisher.publish(NewMessage.raw(bytes([1, 2, 3])))

Python (blocking):

    from fibril import NewMessage

    publisher.publish(NewMessage.json({"id": 42, "kind": "welcome"}))

    publisher.publish(
        NewMessage.content("plain text").header("x-trace-id", "abc123")
    )

    publisher.publish(NewMessage.raw(bytes([1, 2, 3])))

Fibril reserves fibril.* and stroma.* headers for system metadata. User code should not set headers with those prefixes. See metadata policy for the development note.
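Application code that builds headers dynamically may want to guard against the reserved prefixes before publishing. The following is a minimal sketch of such a guard. The helper names is_reserved and check_user_headers are hypothetical and not part of any Fibril client, and the prefix match is case-sensitive, which is an assumption.

```python
RESERVED_PREFIXES = ("fibril.", "stroma.")


def is_reserved(name: str) -> bool:
    """True when a header name falls under a system-reserved prefix."""
    return name.startswith(RESERVED_PREFIXES)


def check_user_headers(headers: dict[str, str]) -> dict[str, str]:
    """Return the headers unchanged, or raise if any name is reserved."""
    reserved = sorted(name for name in headers if is_reserved(name))
    if reserved:
        raise ValueError(f"reserved header names: {', '.join(reserved)}")
    return headers
```

Running check_user_headers({"x-trace-id": "abc123"}) passes the headers through, while any name starting with fibril. or stroma. raises ValueError before the message reaches a publisher.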
Manual Acknowledgements
Manual acknowledgement is the primary processing model. Messages are leased while inflight. Consumers settle each message explicitly.

Rust:

    let mut sub = client
        .subscribe("email.send")?
        .group("workers")?
        .prefetch(32)
        .sub()
        .await?;

    while let Some(msg) = sub.recv().await {
        let body = msg.content()?;

        match send_email(body).await {
            Ok(()) => {
                msg.complete().await?;
            }
            Err(_) => {
                msg.retry().await?;
            }
        }
    }

TypeScript:

    const sub = await client
      .subscribe("email.send")
      .group("workers")
      .prefetch(32)
      .sub();

    for await (const msg of sub) {
      const body = msg.content();

      try {
        await sendEmail(body);
        await msg.complete();
      } catch {
        await msg.retry();
      }
    }

Python (async):

    sub = await client.subscribe("email.send").group("workers").prefetch(32).sub()

    async for msg in sub:
        body = msg.text()

        try:
            await send_email(body)
            await msg.complete()
        except Exception:
            await msg.retry()

Python (blocking):

    sub = client.subscribe("email.send").group("workers").prefetch(32).sub()

    for msg in sub:
        body = msg.text()

        try:
            send_email(body)
            msg.complete()
        except Exception:
            msg.retry()

retry() requeues immediately. Use retry_after(..) in Rust and Python or retryAfter(..) in TypeScript for delayed retry.
Dead Letter Workflow
Dead-letter routing has two parts:
- Operators configure the global DLQ target through the admin UI/API.
- Applications may declare per-queue retry and DLQ policy through the client or
fibrilctl.
For example, first configure a global DLQ target:
    PUT /admin/api/global-dlq
    {
      "expected_version": 0,
      "target": {
        "tp": "_dlq.email",
        "group": null
      }
    }

Then configure a source queue to use that global target after retries are exhausted:

Rust:

    use fibril_client::QueueConfig;

    client
        .declare_queue(
            QueueConfig::new("email.send")?
                .group("workers")?
                .use_global_dead_letter_queue()
                .max_retries(3),
        )
        .await?;

TypeScript:

    import { QueueConfig } from "@fibril/client";

    await client.declareQueue(
      new QueueConfig("email.send")
        .group("workers")
        .useGlobalDeadLetterQueue()
        .maxRetries(3),
    );

Python (async):

    from fibril import QueueConfig

    await client.declare_queue(
        QueueConfig("email.send")
        .group("workers")
        .use_global_dead_letter_queue()
        .max_retries(3)
    )

Python (blocking):

    from fibril import QueueConfig

    client.declare_queue(
        QueueConfig("email.send")
        .group("workers")
        .use_global_dead_letter_queue()
        .max_retries(3)
    )

Application clients keep using normal publish/consume code:

Rust:

    let publisher = client.publisher_grouped("email.send", "workers")?;
    publisher.publish_confirmed(NewMessage::json(&job)?).await?;

    let mut sub = client
        .subscribe("email.send")?
        .group("workers")?
        .prefetch(32)
        .sub()
        .await?;

    while let Some(msg) = sub.recv().await {
        if process(msg.deserialize()?).await.is_ok() {
            msg.complete().await?;
        } else {
            msg.retry().await?;
        }
    }

TypeScript:

    const publisher = client.publisherGrouped("email.send", "workers");
    await publisher.publishConfirmed(NewMessage.json(job));

    const sub = await client
      .subscribe("email.send")
      .group("workers")
      .prefetch(32)
      .sub();

    for await (const msg of sub) {
      if (await process(msg.deserialize())) {
        await msg.complete();
      } else {
        await msg.retry();
      }
    }

Python (async):

    publisher = client.publisher_grouped("email.send", "workers")
    await publisher.publish_confirmed(NewMessage.json(job))

    sub = await client.subscribe("email.send").group("workers").prefetch(32).sub()

    async for msg in sub:
        if await process(msg.deserialize()):
            await msg.complete()
        else:
            await msg.retry()

Python (blocking):

    publisher = client.publisher_grouped("email.send", "workers")
    publisher.publish_confirmed(NewMessage.json(job))

    sub = client.subscribe("email.send").group("workers").prefetch(32).sub()

    for msg in sub:
        if process(msg.deserialize()):
            msg.complete()
        else:
            msg.retry()

Once the configured retry limit is exhausted, the broker routes the failed message to _dlq.email. Subscribe to that queue like any other queue when you want inspection or replay tooling.
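The retry-then-dead-letter flow can be modelled in a few lines. This is a toy in-memory simulation rather than broker behavior: run_with_dlq is a hypothetical name, and it assumes that max_retries(3) means one initial delivery plus up to three retries before the message is routed to the dead-letter target. The exact counting rule is not stated in this page.

```python
from collections import deque


def run_with_dlq(jobs, handler, max_retries=3):
    """Process jobs until each one completes or exhausts its retries.

    `handler(job)` returns True on success. Returns (completed, dead_lettered).
    """
    queue = deque((job, 0) for job in jobs)  # (job, retries used so far)
    completed, dead_lettered = [], []
    while queue:
        job, retries = queue.popleft()
        if handler(job):
            completed.append(job)            # corresponds to msg.complete()
        elif retries < max_retries:
            queue.append((job, retries + 1)) # corresponds to msg.retry()
        else:
            dead_lettered.append(job)        # routed to the DLQ target
    return completed, dead_lettered
```

Under that assumption, a job whose handler always fails is attempted four times in total and then lands in the dead-letter list, while successful jobs are unaffected.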
Decode Messages
deserialize() chooses a decoder from content-type. Missing content type defaults to msgpack.
Rust:

    #[derive(serde::Deserialize)]
    struct Job {
        id: u64,
    }

    let job: Job = msg.deserialize()?;
    let raw: &[u8] = msg.raw();
    let text: &str = msg.content()?;

TypeScript:

    type Job = {
      id: number;
    };

    const job = msg.deserialize<Job>();
    const raw = msg.raw();
    const text = msg.content();

Python:

    job = msg.deserialize()
    raw = msg.raw()
    text = msg.text()

Auto Ack
Auto-ack subscriptions are a convenience mode that settles each delivery server-side as the broker sends it, so the consumer just reads messages. Use manual ack when processing correctness matters.

Rust:

    let mut sub = client
        .subscribe("metrics")?
        .prefetch(128)
        .sub_auto_ack()
        .await?;

    while let Some(msg) = sub.recv().await {
        observe(msg.deserialize::<Metric>()?);
    }

TypeScript:

    const sub = await client
      .subscribe("metrics")
      .prefetch(128)
      .subAutoAck();

    for await (const msg of sub) {
      observe(msg.deserialize<Metric>());
    }

Python (async):

    sub = await client.subscribe("metrics").prefetch(128).sub_auto_ack()

    async for msg in sub:
        observe(msg.deserialize())

Python (blocking):

    sub = client.subscribe("metrics").prefetch(128).sub_auto_ack()

    for msg in sub:
        observe(msg.deserialize())

Plexus streams (fan-out)
A Plexus stream delivers every record to
every consumer. Declare it with declare_plexus, publish with the ordinary
publisher (the broker routes by channel kind), and consume with stream(...).
Name the subscription to get a durable cursor that resumes after a restart and
advances on ack. A stream subscription reads all partitions and fans them in.
Rust:

    use fibril_client::StreamConfig;

    client
        .declare_plexus(StreamConfig::new("events")?.partitions(4).retain_records(1_000_000))
        .await?;

    // Publish uses the normal publisher.
    client.publisher("events")?.publish("hello").await?;

    let mut sub = client
        .stream("events")?
        .durable("analytics")
        .filter("region", "eu-*")
        .sub()
        .await?;

    while let Some(msg) = sub.recv().await {
        handle(msg.content()?);
        msg.complete().await?; // advances the durable cursor
    }

TypeScript:

    import { StreamConfig } from "@fibril/client";

    await client.declarePlexus(
      new StreamConfig("events").partitions(4).retainRecords(1_000_000),
    );

    await client.publisher("events").publish("hello");

    const sub = await client
      .stream("events")
      .durable("analytics")
      .filter("region", "eu-*")
      .sub();

    for await (const msg of sub) {
      handle(msg.content());
      await msg.complete(); // advances the durable cursor
    }

Python (async):

    from fibril import StreamConfig

    await client.declare_plexus(StreamConfig("events").partitions(4).retain_records(1_000_000))

    await client.publisher("events").publish("hello")

    sub = await client.stream("events").durable("analytics").filter("region", "eu-*").sub()

    async for msg in sub:
        handle(msg.text())
        await msg.complete()  # advances the durable cursor

Python (blocking):

    from fibril import StreamConfig

    client.declare_plexus(StreamConfig("events").partitions(4).retain_records(1_000_000))

    client.publisher("events").publish("hello")

    sub = client.stream("events").durable("analytics").filter("region", "eu-*").sub()

    for msg in sub:
        handle(msg.text())
        msg.complete()  # advances the durable cursor

For a live tail without a durable cursor, drop .durable(...) and pick a start
position: from_latest() / fromLatest() (the default), from_earliest(),
from_offset(..), from_last(..), or from_time(..). Use sub_auto_ack() /
subAutoAck() when you do not need explicit settlement.
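The difference between fan-out with durable cursors and a work queue is easiest to see in a toy model. The sketch below is a single-partition, in-memory simulation with hypothetical names (PlexusSim, poll, ack). It assumes a new durable name starts at the earliest record, which this page does not state, and it ignores partitions, filters, and retention.

```python
class PlexusSim:
    """Toy fan-out log: one shared record list, one cursor per durable name."""

    def __init__(self):
        self.records = []
        self.cursors = {}  # durable name -> offset of the next record to deliver

    def publish(self, record):
        self.records.append(record)
        return len(self.records) - 1  # offset of the stored record

    def poll(self, name):
        """Return (offset, record) at this consumer's cursor, or None if caught up."""
        offset = self.cursors.setdefault(name, 0)
        if offset < len(self.records):
            return offset, self.records[offset]
        return None

    def ack(self, name, offset):
        """Advance the cursor only when the acked offset is the current one."""
        if self.cursors.get(name, 0) == offset:
            self.cursors[name] = offset + 1
```

Two durable names polling the same stream each see every record, and a record keeps being returned to a name until that name acks it. Acking under "analytics" moves only the "analytics" cursor.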
Pattern subscribe and discovery
Discovery is an opt-in surface: call routing() to get a routing view of the
same connection, then subscribe_pattern(glob) to fan in across every work queue
whose topic matches, or subscribe_stream_pattern(glob) for Plexus streams. The
subscription keeps attaching channels that start matching later, so queues or
streams declared after you subscribe are picked up without a reconnect. Each
delivery is paired with the channel it came from. The glob is the same
*-wildcard grammar as the header filter (no regex), and "*" matches everything.
Manual (sub / .sub()) and auto-ack (sub_auto_ack / .subAutoAck())
variants both exist, and the routing view still publishes and subscribes
normally, so it composes with reliable publishing.
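For readers who want to predict which topics a pattern will pick up, a star-only glob matcher is short enough to write out. This sketch is not the broker's matcher: glob_match and matching_topics are hypothetical names, and it assumes * matches any run of characters, including dots and the empty string, with every other character matched literally.

```python
def glob_match(pattern: str, text: str) -> bool:
    """Match `text` against a pattern where `*` is the only wildcard."""
    parts = pattern.split("*")
    if len(parts) == 1:
        return pattern == text  # no wildcard: exact match only
    first, *middle, last = parts
    if not text.startswith(first):
        return False
    pos = len(first)
    end = len(text) - len(last)
    # The literal prefix and suffix must not overlap.
    if end < pos or not text.endswith(last):
        return False
    # Each middle literal must appear, in order, between prefix and suffix.
    for part in middle:
        idx = text.find(part, pos, end)
        if idx == -1:
            return False
        pos = idx + len(part)
    return True


def matching_topics(pattern: str, topics):
    """Topics a pattern subscription would attach to, under the assumptions above."""
    return [topic for topic in topics if glob_match(pattern, topic)]
```

With topics events.orders, events.users, and metrics.cpu, the pattern events.* selects the first two and * selects all three. Rerunning matching_topics after a new topic appears mirrors how a pattern subscription attaches to channels declared later.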
Rust:

    let mut sub = client
        .routing()
        .subscribe_pattern("events.*")
        .sub()
        .await?;

    while let Some((source, msg)) = sub.recv().await {
        handle(&source.topic, msg.content()?);
        msg.complete().await?;
    }

TypeScript:

    const sub = await client.routing().subscribePattern("events.*").sub();

    for await (const { source, message } of sub) {
      handle(source.topic, message.content());
      await message.complete();
    }

Python (async):

    sub = await client.routing().subscribe_pattern("events.*").sub()

    async for item in sub:
        handle(item.source.topic, item.message.text())
        await item.message.complete()

Python (blocking):

    sub = client.routing().subscribe_pattern("events.*").sub()

    for item in sub:
        handle(item.source.topic, item.message.text())
        item.message.complete()

Shutdown
Rust:

    client.shutdown().await;

TypeScript:

    await client.shutdown();

Blocking Python client
For code not built on asyncio, the Python package ships a synchronous facade. It runs the async client on a background event-loop thread and bridges each call, so it is the same client behind one source of truth rather than a second implementation. Subscriptions become ordinary iterators. The Async/Blocking toggle on the examples above shows the synchronous form of each call, and the full shape is:

    from fibril.blocking import BlockingClient

    with BlockingClient.connect("127.0.0.1:9876") as client:
        client.publisher("email.send").publish_confirmed({"body": "hello"})

        sub = client.subscribe("email.send").group("workers").sub()
        for msg in sub:
            process(msg.deserialize())
            msg.complete()
        sub.close()
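The background-loop bridge described above is a general asyncio pattern and can be shown with the standard library alone. The sketch below is not the fibril.blocking source: AsyncCounter stands in for the async client and BlockingFacade for the facade, and both names are hypothetical. It uses asyncio.run_coroutine_threadsafe to submit each call to a loop running on a daemon thread and then blocks on the returned future.

```python
import asyncio
import threading


class AsyncCounter:
    """Stand-in for an async client with one awaitable operation."""

    def __init__(self):
        self.value = 0

    async def add(self, n: int) -> int:
        await asyncio.sleep(0)  # pretend to do I/O
        self.value += n
        return self.value


class BlockingFacade:
    """Synchronous wrapper that runs the async object on a background loop."""

    def __init__(self, inner: AsyncCounter):
        self._inner = inner
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def _run(self, coro):
        # Schedule on the loop thread, then block the caller until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def add(self, n: int) -> int:
        return self._run(self._inner.add(n))

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
```

Used as a context manager, each add call looks synchronous to the caller while the state lives in the single async object, which is the "one source of truth" property the facade relies on.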