Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Pub/Sub

speare includes a built-in publish/subscribe system shared across the entire Node tree. It lets actors communicate through named topics without needing direct Handle references. Publishers and subscribers are fully decoupled – a publisher does not know who is listening, and a subscriber does not know who is publishing.

Subscribing to a Topic

Call ctx.subscribe::<T>(topic) inside init or handle to receive messages published to that topic. The type parameter T is the message type for the topic. Published messages are cloned, converted via From<T> into the actor’s Msg type, and delivered to its mailbox.

use speare::*;
use derive_more::From;

#[derive(Clone)]
struct OrderPlaced {
    id: u32,
}

struct OrderSubscriber;

#[derive(From)]
enum OrderSubMsg {
    Order(OrderPlaced),
}

impl Actor for OrderSubscriber {
    type Props = ();
    type Msg = OrderSubMsg;
    type Err = ();

    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        ctx.subscribe::<OrderPlaced>("orders").unwrap();
        Ok(OrderSubscriber)
    }

    async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            OrderSubMsg::Order(o) => println!("Received order {}", o.id),
        }

        Ok(())
    }
}

The From derive is required – it is how speare converts the topic’s message type into the actor’s Msg enum. This is the same From trick described in the Communication chapter.

Publishing to a Topic

Call ctx.publish(topic, msg) to send a message to all subscribers of a topic. The message is cloned for each subscriber.

ctx.publish("orders", OrderPlaced { id: 42 }).unwrap();

Publishing to a topic with no subscribers is a no-op and returns Ok(()).

You can also publish from a Node directly:

node.publish("orders", OrderPlaced { id: 42 }).unwrap();

Type Safety

Each topic is locked to a single message type. The first subscribe or publish call on a topic sets its type. Any subsequent call with a different type returns Err(PubSubError::TypeMismatch { .. }).

// First subscribe locks "orders" to OrderPlaced
ctx.subscribe::<OrderPlaced>("orders").unwrap();

// This fails -- "orders" is already locked to OrderPlaced, not String
let result = ctx.subscribe::<String>("orders");
assert!(matches!(result, Err(PubSubError::TypeMismatch { .. })));

The same applies to publish:

// Topic "orders" is locked to OrderPlaced by a subscriber
let result = ctx.publish("orders", "not an OrderPlaced".to_string());
assert!(matches!(result, Err(PubSubError::TypeMismatch { .. })));

Multiple Topics

An actor can subscribe to multiple topics. Each subscription needs a corresponding variant in the actor’s Msg enum with a From implementation.

#[derive(Clone)]
struct PaymentProcessed {
    amount: u32,
}

#[derive(From)]
enum MultiMsg {
    Order(OrderPlaced),
    Payment(PaymentProcessed),
}

impl Actor for MultiSubscriber {
    type Props = ();
    type Msg = MultiMsg;
    type Err = ();

    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        ctx.subscribe::<OrderPlaced>("orders").unwrap();
        ctx.subscribe::<PaymentProcessed>("payments").unwrap();
        Ok(MultiSubscriber)
    }

    async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            MultiMsg::Order(o) => println!("Order {}", o.id),
            MultiMsg::Payment(p) => println!("Payment ${}", p.amount),
        }

        Ok(())
    }
}

Dynamic Subscription

Subscribing is not limited to init. You can subscribe from handle as well, letting an actor join a topic in response to a message:

async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
    match msg {
        Msg::StartListening => {
            ctx.subscribe::<OrderPlaced>("orders").unwrap();
        }

        Msg::Order(o) => {
            println!("Got order {}", o.id);
        }
    }

    Ok(())
}

Automatic Cleanup

Subscriptions are automatically cleaned up when an actor stops – whether by error, parent shutdown, or an explicit handle.stop() call. Once stopped, a subscriber will no longer receive messages from any topic it was subscribed to.

Subscriptions are also cleaned up before a restart. When an actor with Supervision::Restart fails and restarts, its old subscriptions are removed before init runs again. This means init can re-subscribe without causing duplicate deliveries.

Error Handling

All pub/sub operations return Result<(), PubSubError>. The enum has two variants:

pub enum PubSubError {
    /// Topic exists with a different message type.
    TypeMismatch { topic: String },
    /// Internal lock poisoned (rare -- indicates a panic elsewhere).
    PoisonErr,
}

PubSubError implements Display and Error, so it works with ? and standard error handling.

In practice, TypeMismatch is the variant you will encounter. PoisonErr only occurs if another thread panicked while holding the pub/sub lock, which should not happen under normal operation.