Introduction
speare is a thin abstraction over tokio tasks and flume channels for actor-based concurrency in Rust. Each actor lives on its own tokio task, owns its data, and communicates exclusively through messages.
Why Actors?
If you have written async Rust before, you have likely run into Arc<Mutex<T>> for sharing state across tasks. Actors offer an alternative:
- No shared mutable state. Each actor owns its data. No Arc<Mutex<T>>, no lock contention, no deadlocks from forgetting lock ordering.
- Natural concurrency boundaries. One actor = one task = one mailbox. Concurrency is modeled as independent units exchanging messages rather than threads contending over locks.
- Fault isolation. When an actor fails, its parent decides what happens: stop, restart, or ignore the error. A bug in one part of the system does not have to bring everything down.
- Modular design. Actors enforce separation of concerns. Each actor has a single responsibility, a defined message protocol, and a clear lifecycle.
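To make the ownership model concrete, here is a minimal sketch that uses only the standard library: a thread stands in for the tokio task and an std::sync::mpsc channel for the mailbox. It is not how speare is implemented, and the names spawn_counter and run_counter_example are invented for this illustration.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical message type for this sketch.
enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

// Spawns a thread that exclusively owns `count`. The only way to change it
// is to send a message; there is no Arc<Mutex<_>> anywhere.
fn spawn_counter() -> (Sender<CounterMsg>, JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel();
    let join = thread::spawn(move || {
        let mut count: u32 = 0; // state owned by this thread alone
        // Messages are handled one at a time, in arrival order.
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::Subtract(n) => count = count.saturating_sub(n),
            }
        }
        count // returned once every sender has been dropped
    });
    (tx, join)
}

// Example usage: sends three messages and returns the final count.
fn run_counter_example() -> u32 {
    let (tx, join) = spawn_counter();
    tx.send(CounterMsg::Add(5)).unwrap();
    tx.send(CounterMsg::Add(1)).unwrap();
    tx.send(CounterMsg::Subtract(2)).unwrap();
    drop(tx); // closing the mailbox ends the loop
    join.join().unwrap()
}
```

Because the counter lives inside the spawned thread, the compiler itself rules out access from anywhere else; senders can only describe what they want done.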
A Minimal Example
Below is a complete Counter actor. It accepts three kinds of messages: add, subtract, and print.
use speare::*;
use std::time::Duration;
use tokio::time;
struct Counter {
count: u32,
}
enum CounterMsg {
Add(u32),
Subtract(u32),
Print,
}
impl Actor for Counter {
type Props = ();
type Msg = CounterMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Counter { count: 0 })
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
CounterMsg::Add(n) => self.count += n,
// saturating_sub avoids an overflow panic if n is larger than count
CounterMsg::Subtract(n) => self.count = self.count.saturating_sub(n),
CounterMsg::Print => println!("Count is {}", self.count),
}
Ok(())
}
}
#[tokio::main]
async fn main() {
let mut node = Node::default();
let counter = node.actor::<Counter>(()).spawn();
counter.send(CounterMsg::Add(5));
counter.send(CounterMsg::Subtract(2));
counter.send(CounterMsg::Print); // prints "Count is 3"
// Give the actor time to process messages before the program exits.
time::sleep(Duration::from_millis(10)).await;
}
What Just Happened?
Two types drive the example above: Node and Handle.
Node is the root of an actor hierarchy, created by calling Node::default(). You spawn actors from it with node.actor::<MyActor>(props).spawn(), where props is whatever data the actor needs at initialization time (here just ()). The Node owns all top-level actors. When dropped, it sends stop signals to its children, but the cleanup runs in a background tokio task that may not complete if the runtime is also shutting down. For guaranteed cleanup, call node.shutdown().await before the Node is dropped.
.spawn() returns a Handle<Msg>. It is a lightweight, cloneable reference to a running actor. You can use it to send messages (handle.send(msg)), stop the actor (handle.stop()), or check if it is still alive (handle.is_alive()). Handles can be passed freely between actors and tasks.
The Actor trait itself requires only one function: init, which constructs the actor. In practice you will almost always implement handle as well, since it processes each incoming message. Everything else – lifecycle hooks, supervision, streams – is optional.
What This Book Covers
- The Actor – the Actor trait, Props, Msg, Err, and Ctx in detail.
- Communication – fire-and-forget messages, request-response, and the From-based send ergonomics.
- Lifecycle & Supervision – init, exit, restart strategies, backoff, and the watch callback.
- Concurrency Patterns – background tasks, streams, intervals, and SourceSet.
- Service Discovery – the actor registry, spawn_registered, spawn_named, and cross-actor lookup.
The Actor
An actor is a struct that lives on its own tokio task, owns its data, and processes messages one at a time. There is no shared mutable state between actors – they communicate exclusively through message passing. speare manages the lifecycle, communication, and supervision of your actors so you can focus on business logic rather than concurrency plumbing.
A Minimal Actor
Here is a complete Greeter actor that prints a hello message for each name it receives:
use speare::*;
struct Greeter;
enum GreeterMsg {
Greet(String),
}
impl Actor for Greeter {
type Props = ();
type Msg = GreeterMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Greeter)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
GreeterMsg::Greet(name) => println!("Hello, {name}!"),
}
Ok(())
}
}
The Actor trait is the core abstraction in speare. You implement it on a struct, define a few associated types, and provide at minimum an init function. That is enough to have a working actor.
init constructs the actor’s state. It runs when the actor is first spawned and again on every restart. handle is called once for each incoming message, and messages are always processed sequentially – there is no concurrent access to &mut self.
Associated Types
The Actor trait requires three associated types.
Props: Send + 'static
Props is immutable configuration or dependencies your actor needs. It is passed in at spawn time and remains available across restarts via ctx.props(). When your actor does not need configuration, use ().
Msg: Send + 'static
Msg is the message type your actor processes. Typically this is an enum with one variant per command the actor understands.
Err: Send + Sync + 'static
Err is the error type. Returning Err from init or handle triggers the supervision strategy configured by the parent.
Trait Functions
init (required)
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err>;
The only required function. Called when the actor is first spawned and again on every restart. Its job is to construct and return the actor’s initial state. You have access to the full Ctx here, so you can read props, clear stale messages from a previous life, or even spawn child actors.
If init returns Err, the actor never enters the message loop. The supervision strategy is consulted and, if applicable, init will be called again on restart.
handle (optional)
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err>;
Called once for every message the actor receives. Messages are processed sequentially – the next message is not dequeued until the current handle call completes. This means you never need a lock on the actor’s state.
Returning Err triggers the supervision strategy. With Supervision::Stop, the actor shuts down. With Supervision::Restart, it re-runs init. With Supervision::Resume, it simply continues to the next message.
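The three outcomes can be modeled as a plain loop. The following is a std-only sketch of the decision logic, not speare's implementation; Strategy, Summer, and run are hypothetical names, and the restart limit is folded into the Restart variant for brevity.

```rust
// Hypothetical stand-ins for this sketch; not speare's real types.
#[derive(Clone, Copy)]
enum Strategy {
    Stop,
    Resume,
    Restart { max_restarts: u32 },
}

// "Actor" state: a running sum that rejects the value 0.
struct Summer {
    total: i32,
}

impl Summer {
    fn init() -> Self {
        Summer { total: 0 }
    }

    fn handle(&mut self, n: i32) -> Result<(), String> {
        if n == 0 {
            return Err("zero is not allowed".to_string());
        }
        self.total += n;
        Ok(())
    }
}

// Runs the message loop and returns the final total, or None if the
// strategy ended the actor before all messages were processed.
fn run(strategy: Strategy, msgs: &[i32]) -> Option<i32> {
    let mut actor = Summer::init();
    let mut restarts = 0;
    for &m in msgs {
        if actor.handle(m).is_err() {
            match strategy {
                Strategy::Stop => return None,
                Strategy::Resume => continue, // keep state, take next message
                Strategy::Restart { max_restarts } => {
                    if restarts == max_restarts {
                        return None; // limit exhausted: behaves like Stop
                    }
                    restarts += 1;
                    actor = Summer::init(); // fresh state, same message queue
                }
            }
        }
    }
    Some(actor.total)
}
```

With the messages 1, 0, 2, Resume ends with a total of 3 (the error is skipped, state is kept), while Restart ends with 2 (the state built before the error is discarded).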
exit (optional)
async fn exit(this: Option<Self>, reason: ExitReason<Self>, ctx: &mut Ctx<Self>);
A cleanup hook called when the actor stops, restarts, or fails to initialize. The this parameter is Some if the actor was successfully constructed and None if init itself failed.
ExitReason tells you why the actor is exiting:
- ExitReason::Handle – the actor was stopped manually via handle.stop().
- ExitReason::Parent – the parent actor stopped or restarted this actor.
- ExitReason::Err(e) – the actor returned an error from init or handle.
sources (optional)
async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err>;
Sets up additional message sources such as streams and intervals. This is covered in Concurrency Patterns.
The Context (Ctx)
Every trait function receives a reference to Ctx<Self>, which provides access to the actor’s environment. Some commonly used methods are:
ctx.props()
Returns &Props. This is the immutable configuration passed at spawn time. It persists across restarts, so you can always rely on it to rebuild your actor’s state inside init.
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Counter {
count: ctx.props().initial_count,
})
}
ctx.this()
Returns &Handle<Self::Msg>, a handle to the current actor. This is useful for sending messages to yourself (for example, scheduling a delayed self-message) or for passing your handle to a child actor so it can communicate back.
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
let my_handle = ctx.this().clone();
// pass my_handle to a child, or send yourself a message:
my_handle.send(MyMsg::Bootstrap);
Ok(MyActor)
}
ctx.actor()
Spawns a child actor supervised by the current actor. The child type is passed as a generic parameter and its props as the argument. Returns a SpawnBuilder that lets you configure a supervision strategy before calling .spawn(), which returns a Handle<Child::Msg> you can use to send messages to the child.
Children are automatically stopped when the parent stops.
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
let worker_handle = ctx.actor::<Worker>(WorkerProps { id: 1 })
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.spawn();
Ok(MyActor { worker_handle })
}
ctx.clear_mailbox()
Drains all pending messages from the actor’s mailbox. This is most useful inside init during a restart – if the actor crashed because of a bad message, you may want to throw away whatever was queued up and start fresh.
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
ctx.clear_mailbox();
Ok(MyActor::default())
}
Full Example
The following example ties everything together. A Counter actor uses custom CounterProps for configuration, a CounterErr for domain errors, and implements init, handle, and exit.
use speare::*;
struct Counter {
count: u32,
}
struct CounterProps {
initial_count: u32,
max_count: u32,
}
enum CounterMsg {
Add(u32),
Subtract(u32),
}
#[derive(Debug)]
enum CounterErr {
MaxCountExceeded,
}
impl Actor for Counter {
type Props = CounterProps;
type Msg = CounterMsg;
type Err = CounterErr;
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
println!("Counter starting with {}", ctx.props().initial_count);
Ok(Counter {
count: ctx.props().initial_count,
})
}
async fn exit(this: Option<Self>, reason: ExitReason<Self>, _ctx: &mut Ctx<Self>) {
match &reason {
ExitReason::Err(e) => println!("Counter failed: {e:?}"),
ExitReason::Handle => println!("Counter stopped manually"),
ExitReason::Parent => println!("Counter stopped by parent"),
}
if let Some(counter) = this {
println!("Final count was: {}", counter.count);
}
}
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
CounterMsg::Add(n) => {
self.count += n;
if self.count > ctx.props().max_count {
return Err(CounterErr::MaxCountExceeded);
}
}
CounterMsg::Subtract(n) => {
self.count = self.count.saturating_sub(n);
}
}
Ok(())
}
}
init reads initial_count from props to set the starting state. handle performs arithmetic and returns CounterErr::MaxCountExceeded if the count goes over the configured max_count – this will trigger whatever supervision strategy the parent has set. exit logs why the actor stopped and, if the actor was successfully created, prints the final count.
Props survive restarts. If this actor is supervised with Supervision::Restart, init will be called again with the same CounterProps, producing a fresh Counter with the original initial_count. The broken state is gone, but the configuration is preserved.
Communication
This chapter covers how to create an actor system, spawn actors, send messages, and get replies.
Node
Every actor system starts with a Node. It is the root supervisor – all actors you spawn from it become its children.
let mut node = Node::default(); // or Node::new()
Spawn an actor by calling node.actor::<MyActor>(props).spawn(), which returns a Handle<Msg>:
let counter = node.actor::<Counter>(()).spawn();
When a Node is dropped, it sends a stop signal to all its children in a fire-and-forget fashion – it does not wait for them to finish. If you need to wait for actors to finish processing their remaining messages, call shutdown():
node.shutdown().await;
Handle
Handle<Msg> is what you get back from spawning an actor. It is your only way to interact with that actor from the outside. Handle is Clone, so you can share it freely.
| Method | Description |
|---|---|
| handle.send(msg) | Fire-and-forget. Silently fails if the actor is dead. |
| handle.send_in(msg, duration) | Sends msg after the given Duration. |
| handle.stop() | Requests the actor to stop (non-blocking). |
| handle.restart() | Requests the actor to restart (non-blocking). |
| handle.is_alive() | Returns true if the actor is still running. |
Here is a complete example spawning a Counter and using its handle:
use speare::*;
use tokio::task; // needed for task::yield_now() below
// (assume the Counter actor from the Introduction, with Props = () and a Print message)
#[tokio::main]
async fn main() {
let mut node = Node::default();
let counter = node.actor::<Counter>(()).spawn();
counter.send(CounterMsg::Add(10));
counter.send(CounterMsg::Print);
assert!(counter.is_alive());
counter.stop();
task::yield_now().await;
assert!(!counter.is_alive());
node.shutdown().await;
}
send is non-blocking. Messages are queued and processed one at a time by the actor. If the actor has already stopped, send does nothing – it will not panic or return an error.
The From Trick
Handle::send accepts any type M where Msg: From<M>. If you derive From on your message enum (via derive_more), you can send inner types directly without wrapping them:
use derive_more::From;
#[derive(From)]
enum Msg {
Increment(u32),
SetName(String),
}
// Both work:
handle.send(Msg::Increment(1));
handle.send(1u32); // auto-converted via From<u32>
handle.send("Alice".to_string()); // auto-converted via From<String>
This keeps call sites clean, especially when variants are simple newtypes. It also plays a central role in request-response, as shown next.
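If the conversion feels like magic, the following std-only sketch shows the mechanism with hand-written From impls, which is roughly what a From derive generates. Outbox is a hypothetical stand-in for a handle; it only records what was sent.

```rust
// Hypothetical message enum with hand-written From impls.
#[derive(Debug, PartialEq)]
enum Msg {
    Increment(u32),
    SetName(String),
}

impl From<u32> for Msg {
    fn from(n: u32) -> Self {
        Msg::Increment(n)
    }
}

impl From<String> for Msg {
    fn from(s: String) -> Self {
        Msg::SetName(s)
    }
}

// Stand-in for a handle: it just records what was sent.
struct Outbox {
    sent: Vec<Msg>,
}

impl Outbox {
    // Accepts any M that converts into Msg, including Msg itself
    // (every type implements From<Self>).
    fn send<M>(&mut self, msg: M)
    where
        Msg: From<M>,
    {
        self.sent.push(Msg::from(msg));
    }
}

// Example usage: three sends, two of them relying on the conversion.
fn run_from_example() -> Vec<Msg> {
    let mut outbox = Outbox { sent: Vec::new() };
    outbox.send(Msg::Increment(1));
    outbox.send(2u32);
    outbox.send("Alice".to_string());
    outbox.sent
}
```

One consequence of this design is that two variants cannot wrap the same inner type, because the compiler would have two candidate From impls for it.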
Request-Response
Sometimes fire-and-forget is not enough – you need an answer back. speare provides Request<Req, Res> for this.
The pattern has two sides:
Sender side – use handle.req(payload).await, which returns Result<Res, ReqErr>:
let count: u32 = handle.req(()).await?;
Receiver side – match on the Request variant inside handle() and call req.reply(value):
KvMsg::Get(req) => {
let value = self.data.get(req.data()).cloned();
req.reply(value);
}
req.data() gives you a reference to the request payload. req.reply(value) sends the response back to the caller.
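Conceptually, a request is a payload bundled with a one-shot return channel. The sketch below models that idea with std::sync::mpsc and a thread; it is an illustration under that assumption, not speare's actual Request type, and lookup is a hypothetical helper.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread;

// Hypothetical request type: a payload plus a channel for the answer.
struct Request<Req, Res> {
    data: Req,
    reply_to: Sender<Res>,
}

impl<Req, Res> Request<Req, Res> {
    fn data(&self) -> &Req {
        &self.data
    }

    // Consumes the request, so each request is answered at most once.
    fn reply(self, value: Res) {
        let _ = self.reply_to.send(value); // the caller may have gone away
    }
}

fn lookup(key: &str) -> Option<String> {
    let (tx, rx) = mpsc::channel::<Request<String, Option<String>>>();

    // Receiver side: owns the map and answers requests one at a time.
    let store = thread::spawn(move || {
        let mut data = HashMap::new();
        data.insert("name".to_string(), "Alice".to_string());
        for req in rx {
            let value = data.get(req.data()).cloned();
            req.reply(value);
        }
    });

    // Sender side: send the request, then wait for the answer.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Request { data: key.to_string(), reply_to: reply_tx })
        .unwrap();
    let answer = reply_rx.recv().unwrap();

    drop(tx); // close the mailbox so the store thread exits
    store.join().unwrap();
    answer
}
```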
Defining request variants
Add Request<Req, Res> as a variant in your message enum:
use speare::*;
use derive_more::From;
#[derive(From)]
enum Msg {
GetCount(Request<(), u32>),
}
Because Msg derives From, the req method on Handle can automatically wrap a Request<(), u32> into Msg::GetCount. If you cannot derive From for a particular variant, use reqw instead:
let count: u32 = handle.reqw(Msg::GetCount, ()).await?;
reqw takes a wrapper function (here the enum variant constructor Msg::GetCount) and the request payload as separate arguments.
Timeouts
By default, req waits indefinitely. Use req_timeout to set a deadline:
let count: u32 = handle.req_timeout((), Duration::from_secs(1)).await?;
A reqw_timeout variant is also available:
let count: u32 = handle.reqw_timeout(Msg::GetCount, (), Duration::from_secs(1)).await?;
Error handling
req and its variants return Result<Res, ReqErr>. There are two failure cases:
- ReqErr::Dropped – the actor died (or the Request was dropped) before calling reply.
- ReqErr::Timeout – the deadline passed before a response arrived.
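The difference between the two cases is easiest to see with a plain channel. In this std-only sketch, a local ReqErr enum mirrors the two variants above, and Responder and request are hypothetical names; the mapping from channel errors is an analogy rather than speare's internals.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Local mirror of the two failure cases described above.
#[derive(Debug, PartialEq)]
enum ReqErr {
    Dropped,
    Timeout,
}

enum Responder {
    Replies,      // answers right away
    DropsRequest, // discards the reply channel without answering
    TooSlow,      // answers only after the deadline has passed
}

fn request(responder: Responder, deadline: Duration) -> Result<u32, ReqErr> {
    let (reply_tx, reply_rx) = mpsc::channel::<u32>();
    let worker = thread::spawn(move || match responder {
        Responder::Replies => {
            let _ = reply_tx.send(42);
        }
        Responder::DropsRequest => drop(reply_tx),
        Responder::TooSlow => {
            thread::sleep(deadline * 4);
            let _ = reply_tx.send(42);
        }
    });
    // A closed channel means nobody can ever reply; a timeout means
    // somebody still might, just not in time.
    let result = reply_rx.recv_timeout(deadline).map_err(|e| match e {
        RecvTimeoutError::Disconnected => ReqErr::Dropped,
        RecvTimeoutError::Timeout => ReqErr::Timeout,
    });
    worker.join().unwrap();
    result
}
```

In practice, Dropped usually signals a bug or a dead actor, while Timeout may simply mean the actor is busy, so callers often retry only the latter.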
Full example: a key-value store
Putting it all together with a KV store actor that supports both fire-and-forget writes and request-response reads:
use speare::*;
use std::collections::HashMap;
use derive_more::From;
struct KvStore {
data: HashMap<String, String>,
}
#[derive(From)]
enum KvMsg {
Set(SetCmd),
Get(Request<String, Option<String>>),
}
struct SetCmd {
key: String,
value: String,
}
impl Actor for KvStore {
type Props = ();
type Msg = KvMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(KvStore { data: HashMap::new() })
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
KvMsg::Set(cmd) => {
self.data.insert(cmd.key, cmd.value);
}
KvMsg::Get(req) => {
let value = self.data.get(req.data()).cloned();
req.reply(value);
}
}
Ok(())
}
}
#[tokio::main]
async fn main() {
let mut node = Node::default();
let kv = node.actor::<KvStore>(()).spawn();
kv.send(SetCmd { key: "name".into(), value: "Alice".into() });
let name = kv.req("name".to_string()).await.unwrap();
println!("{name:?}"); // Some("Alice")
node.shutdown().await;
}
Notice how kv.send(SetCmd { ... }) works without wrapping in KvMsg::Set – the From derive handles the conversion. Likewise, kv.req("name".to_string()) automatically wraps the Request<String, Option<String>> into KvMsg::Get.
Lifecycle & Supervision
Every actor in speare follows a predictable lifecycle, and when things go wrong, its parent decides what happens next. This chapter covers both: how actors live and die, how to build parent-child hierarchies, and how to configure supervision strategies for automatic recovery.
Actor Lifecycle
An actor goes through the following stages:
spawn → init() → [process messages via handle()] → exit()
- init() constructs the actor. This is where you set up initial state, spawn children, or open connections.
- handle() is called once per incoming message, for as long as the actor is alive.
- exit() runs when the actor stops for any reason – error, manual stop, or parent shutdown.
On restart, exit() is called first, then init() runs again. The Handle stays the same across restarts, so any code holding a reference to it does not need to update.
When does an actor stop?
An actor lives until one of these happens:
- Manual stop – someone calls handle.stop().
- Parent stops – when a parent actor stops, all of its children are stopped first.
- Unrecoverable error – the actor returns an Err and its supervision strategy is Supervision::Stop (or it has exhausted its restart limit).
ExitReason
The exit() callback receives an ExitReason so you can react accordingly:
pub enum ExitReason<P: Actor> {
Handle, // stopped manually via handle.stop()
Parent, // parent actor stopped this child
Err(P::Err), // actor's code returned an error
}
async fn exit(this: Option<Self>, reason: ExitReason<Self>, _ctx: &mut Ctx<Self>) {
match reason {
ExitReason::Handle => println!("stopped by handle"),
ExitReason::Parent => println!("parent shut us down"),
ExitReason::Err(e) => println!("failed with error: {e:?}"),
}
}
Note that this is Option<Self> – it will be None if init() itself failed.
Parent-Child Actors
Inside an actor’s init or handle, you can spawn child actors via ctx.actor::<Child>(props).spawn(). The child is supervised by the actor that spawned it.
use speare::*;
struct Worker;
enum WorkerMsg { Process(String) }
impl Actor for Worker {
type Props = ();
type Msg = WorkerMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
println!("Worker started");
Ok(Worker)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
WorkerMsg::Process(job) => println!("Processing: {job}"),
}
Ok(())
}
async fn exit(_this: Option<Self>, _reason: ExitReason<Self>, _ctx: &mut Ctx<Self>) {
println!("Worker stopped");
}
}
struct Manager {
worker: Handle<WorkerMsg>,
}
enum ManagerMsg { Dispatch(String) }
impl Actor for Manager {
type Props = ();
type Msg = ManagerMsg;
type Err = ();
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
let worker = ctx.actor::<Worker>(()).spawn();
Ok(Manager { worker })
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
ManagerMsg::Dispatch(job) => self.worker.send(WorkerMsg::Process(job)),
}
Ok(())
}
}
// When the Manager is stopped, the Worker is automatically stopped too.
When a parent stops, all of its children are stopped first. This cascades down the entire tree – if the Manager has workers, and those workers have their own children, everything shuts down in order from the leaves up.
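The "leaves up" order is a post-order traversal of the actor tree. The sketch below computes that order for a small hypothetical tree; TreeNode, leaf, and stop_order are illustrative names, and the order among siblings is an arbitrary choice made here, not something speare is documented to guarantee.

```rust
// Hypothetical actor tree used only for this sketch.
struct TreeNode {
    name: &'static str,
    children: Vec<TreeNode>,
}

fn leaf(name: &'static str) -> TreeNode {
    TreeNode { name, children: Vec::new() }
}

// Post-order traversal: every child is recorded before its parent.
fn stop_order(node: &TreeNode, out: &mut Vec<&'static str>) {
    for child in &node.children {
        stop_order(child, out);
    }
    out.push(node.name);
}

// Example usage: a manager with two workers, one of which has a helper.
fn example_stop_order() -> Vec<&'static str> {
    let tree = TreeNode {
        name: "manager",
        children: vec![
            TreeNode { name: "worker_a", children: vec![leaf("helper")] },
            leaf("worker_b"),
        ],
    };
    let mut order = Vec::new();
    stop_order(&tree, &mut order);
    order
}
```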
You can also stop all children manually without stopping the parent:
ctx.stop_children().await;
This stops every child actor the current actor has spawned and waits for each to fully terminate before returning.
Similarly, you can restart all children at once:
ctx.restart_children();
This sends a restart signal to every child. Each child will re-run exit() then init(), resetting its state while keeping its Handle valid. Unlike stop_children, this is fire-and-forget – it does not wait for the children to finish restarting. The restart bypasses each child’s supervision strategy (it always restarts, regardless of Supervision::Stop or restart limits).
Supervision Strategies
When you spawn a child actor, you can configure what should happen if it returns an error. Set the strategy with .supervision() on the spawn builder:
let handle = ctx.actor::<Child>(props)
.supervision(strategy)
.spawn();
There are three strategies:
Supervision::Stop
The actor terminates on error. exit() is called, and the actor is done.
ctx.actor::<Worker>(()).supervision(Supervision::Stop).spawn();
Supervision::Resume
The actor ignores the error and continues processing the next message. The actor is not restarted – exit() and init() are not called. The actor keeps its current state and moves on.
ctx.actor::<Worker>(()).supervision(Supervision::Resume).spawn();
Supervision::Restart (default)
The actor is restarted: exit() is called, then init() runs again. The Handle stays the same, so senders do not need to update their references.
This is the default – if you call .spawn() without .supervision(), you get Restart with unlimited restarts and no backoff.
ctx.actor::<Worker>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.spawn();
Restart Limits
The max field on Supervision::Restart controls how many times the actor can be restarted before giving up:
- Limit::None – unlimited restarts. The actor will be restarted every time it errors, forever. Use with caution.
- Limit::Amount(n) – restart at most n times. Once the limit is reached, the actor terminates for real, just as if the strategy were Supervision::Stop.
Note: Limit implements From<u64>, but 0 maps to Limit::None (unlimited), not zero restarts. If you want zero restarts (i.e., never restart), use Supervision::Stop instead.
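The zero case is easy to get wrong, so here is a small sketch of the mapping. The Limit enum below is a local stand-in that mirrors the behavior described in the note (including the assumption that Amount holds a u64); it is not speare's definition, and may_restart is a hypothetical helper.

```rust
// Local stand-in mirroring the documented behavior.
#[derive(Debug, PartialEq)]
enum Limit {
    None,
    Amount(u64),
}

impl From<u64> for Limit {
    fn from(n: u64) -> Self {
        match n {
            0 => Limit::None, // zero means "unlimited", not "never"
            n => Limit::Amount(n),
        }
    }
}

// True if another restart is allowed after `restarts_so_far` restarts.
fn may_restart(limit: &Limit, restarts_so_far: u64) -> bool {
    match limit {
        Limit::None => true,
        Limit::Amount(max) => restarts_so_far < *max,
    }
}
```

A limit computed at runtime (for example, read from configuration) therefore needs an explicit check for zero before it is converted.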
// Restart up to 5 times, then give up
Supervision::Restart {
max: Limit::Amount(5),
backoff: Backoff::None,
}
// Restart forever
Supervision::Restart {
max: Limit::None,
backoff: Backoff::None,
}
Backoff Strategies
The backoff field on Supervision::Restart controls how long to wait between restart attempts:
- Backoff::None – restart immediately, no delay.
- Backoff::Static(Duration) – fixed delay between each restart.
- Backoff::Incremental { min, max, step } – delay increases linearly by step per restart, clamped between min and max.
Here is a complete example. First, define an actor that always fails:
use speare::*;
use std::time::Duration;
struct Flaky;
enum FlakyMsg { DoWork }
#[derive(Debug)]
struct FlakyErr;
impl Actor for Flaky {
type Props = ();
type Msg = FlakyMsg;
type Err = FlakyErr;
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
println!("Flaky actor (re)starting");
Ok(Flaky)
}
async fn handle(&mut self, _msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
Err(FlakyErr) // always fails
}
}
Then spawn it with a restart strategy and incremental backoff:
// In a parent actor's init():
let flaky = ctx.actor::<Flaky>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::Incremental {
min: Duration::from_millis(100),
max: Duration::from_secs(5),
step: Duration::from_millis(500),
},
})
.spawn();
Each time Flaky errors, it will be restarted after a growing delay: 500ms, then 1000ms, then 1500ms (clamped between 100ms and 5s). After the third restart, it terminates permanently.
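The delays quoted above follow from a simple formula. The function below is a sketch that assumes the delay is step multiplied by the restart number, clamped into [min, max]; that formula is inferred from the description here, not taken from speare's source, and incremental_delay is a hypothetical name.

```rust
use std::time::Duration;

// Delay before the `attempt`-th restart (1-based), assuming
// delay = step * attempt, clamped into [min, max].
// Note: `clamp` panics if min > max.
fn incremental_delay(attempt: u32, min: Duration, max: Duration, step: Duration) -> Duration {
    step.saturating_mul(attempt).clamp(min, max)
}
```

With min = 100ms, max = 5s, and step = 500ms, attempts 1 to 3 give 500ms, 1000ms, and 1500ms. A hypothetical 20th attempt would be capped at 5s, and a step smaller than min would be raised to min.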
Watching Children
Sometimes a parent needs to know when a child has permanently failed. The .watch() method on the spawn builder lets you register a callback that fires when the child terminates due to an unrecoverable error.
Watch fires when:
- The strategy is Supervision::Stop and the child errors.
- The strategy is Supervision::Restart and the child exhausts all its allowed restarts.
Watch does not fire when:
- The child is restarted successfully (it has retries remaining).
- The child is stopped manually via handle.stop().
- The strategy is Supervision::Resume.
The callback maps the child’s error into a message for the parent:
use speare::*;
struct Supervisor;
enum SupervisorMsg {
Start,
WorkerDied(String),
}
impl Actor for Supervisor {
type Props = ();
type Msg = SupervisorMsg;
type Err = ();
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
// Spawn a worker with restart supervision and a watch callback
ctx.actor::<Flaky>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.watch(|err| SupervisorMsg::WorkerDied(format!("{err:?}")))
.spawn();
Ok(Supervisor)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
SupervisorMsg::Start => { /* ... */ }
SupervisorMsg::WorkerDied(reason) => {
println!("Worker permanently failed: {reason}");
// Could spawn a replacement, alert, etc.
}
}
Ok(())
}
}
After Flaky fails 3 times and exhausts its restart limit, the watch callback fires and sends SupervisorMsg::WorkerDied to the Supervisor. The parent can then decide what to do – spawn a replacement, escalate, log the failure, or shut itself down.
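Stripped of the actor machinery, a watch is a function from the child's error to a parent message that is applied only on permanent failure. The following std-only sketch models just that rule; ChildEnd, WorkerErr, and notify_parent are hypothetical, and whether speare hands the callback the error by reference or by value is not assumed here.

```rust
// Hypothetical types for this sketch only.
struct WorkerErr(&'static str);

#[derive(Debug, PartialEq)]
enum ParentMsg {
    WorkerDied(String),
}

// Why a child ended, reduced to the cases that matter for watch.
enum ChildEnd {
    StoppedManually,
    FailedForGood(WorkerErr), // Stop strategy, or restarts exhausted
}

// Applies the watch callback only for permanent failures.
fn notify_parent<F>(end: ChildEnd, watch: F) -> Option<ParentMsg>
where
    F: Fn(&WorkerErr) -> ParentMsg,
{
    match end {
        ChildEnd::StoppedManually => None,
        ChildEnd::FailedForGood(err) => Some(watch(&err)),
    }
}
```

Keeping the callback a pure mapping means the parent handles the failure in its normal message loop, with the same sequential guarantees as any other message.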
Replicating BEAM Supervision Strategies
If you are coming from Elixir or Erlang, you may be familiar with three supervisor strategies:
- one_for_one – if a child fails, only that child is restarted.
- one_for_all – if any child fails, all children are stopped and restarted.
- rest_for_one – if a foundational child fails, all children that depend on it are restarted too.
In speare, supervision is configured per-child rather than per-supervisor. Supervision::Restart gives you one_for_one out of the box. The other two can be built by combining .watch() with stop_children() or restart_children().
one_for_one
This is speare’s default behavior. Each child gets its own Supervision::Restart:
ctx.actor::<WorkerA>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.spawn();
ctx.actor::<WorkerB>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.spawn();
If WorkerA fails, only WorkerA is restarted. WorkerB is unaffected.
one_for_all
Use .watch() to detect when any child permanently fails, then stop all remaining children and re-spawn the entire group:
struct Supervisor;
enum Msg {
ChildFailed,
}
impl Actor for Supervisor {
type Props = ();
type Msg = Msg;
type Err = ();
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
spawn_all(ctx);
Ok(Supervisor)
}
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
Msg::ChildFailed => {
ctx.stop_children().await;
spawn_all(ctx);
}
}
Ok(())
}
}
fn spawn_all(ctx: &mut Ctx<Supervisor>) {
ctx.actor::<WorkerA>(())
.watch(|_| Msg::ChildFailed)
.spawn();
ctx.actor::<WorkerB>(())
.watch(|_| Msg::ChildFailed)
.spawn();
ctx.actor::<WorkerC>(())
.watch(|_| Msg::ChildFailed)
.spawn();
}
When any worker terminates for good, the supervisor stops all remaining children and re-spawns the entire group. The stop_children call does not trigger .watch() on the surviving children – watch only fires on error termination – so there are no spurious ChildFailed messages from the teardown.
You can also give each child individual restart attempts before escalating to the group restart. Just add a supervision strategy alongside the watch:
ctx.actor::<WorkerA>(())
.supervision(Supervision::Restart {
max: Limit::Amount(3),
backoff: Backoff::None,
})
.watch(|_| Msg::ChildFailed)
.spawn();
Now WorkerA gets 3 restart attempts on its own. Only if it exhausts those does the watch fire and trigger the full group restart.
rest_for_one
Watch the foundational actor that others depend on. If it dies, the children that depend on it need to restart too. Suppose WorkerB and WorkerC both depend on WorkerA:
enum Msg {
WorkerAFailed,
}
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
ctx.actor::<WorkerA>(())
.watch(|_| Msg::WorkerAFailed)
.spawn();
ctx.actor::<WorkerB>(()).spawn();
ctx.actor::<WorkerC>(()).spawn();
Ok(Supervisor)
}
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
Msg::WorkerAFailed => {
// Pick ONE of the two options below; they are alternatives, not sequential steps.
// Option 1: tear down everything and start fresh
// ctx.stop_children().await;
// (then re-spawn A, B, C...)
// Option 2: keep B and C's mailboxes and handles
ctx.restart_children(); // restarts the surviving B and C
// re-spawn A (since it's already dead)
ctx.actor::<WorkerA>(())
.watch(|_| Msg::WorkerAFailed)
.spawn();
}
}
Ok(())
}
Option 1 is the clean slate – stop everything and re-spawn in order. Option 2 preserves the mailbox and Handle<_> of the surviving children by restarting them (re-running their init with the same props) while only re-spawning the dead actor.
Concurrency Patterns
This chapter covers two mechanisms for doing work alongside an actor’s main message loop: background tasks and message sources.
Background Tasks
ctx.task(async { ... }) spawns an async task that runs concurrently with the actor. The task returns Result<Msg, Err>. When it completes:
- Ok(msg) – the message is delivered to the actor’s handle() method, just like any other message.
- Err(e) – the actor’s supervision strategy kicks in (restart, stop, or resume, depending on how the parent configured it).
Tasks are automatically aborted when the actor stops. However, tasks survive restarts – if an actor is restarted (via supervision or restart_children()), any in-flight tasks from the previous incarnation will continue running and their results will still be delivered to the restarted actor’s handle(). This is by design: it avoids losing work that was already in progress. You can spawn tasks from init or handle.
use speare::*;
struct Fetcher {
results: Vec<String>,
}
enum FetcherMsg {
Fetch(String),
Fetched(String),
}
impl Actor for Fetcher {
type Props = ();
type Msg = FetcherMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Fetcher { results: vec![] })
}
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
FetcherMsg::Fetch(url) => {
ctx.task(async move {
// some async work (e.g., HTTP request)
let body = format!("Response from {url}");
Ok(FetcherMsg::Fetched(body))
});
}
FetcherMsg::Fetched(body) => {
println!("Got: {body}");
self.results.push(body);
}
}
Ok(())
}
}
The actor stays responsive to other messages while the task runs in the background. When the task completes, its Ok value is fed back into handle() as a regular message – the actor processes it in turn, just like a message sent via handle.send().
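The essence of the pattern is that a background job reports back by sending an ordinary message to the mailbox of the actor that started it. Here is a std-only sketch of that loop, with threads in place of tokio tasks; run_fetcher is a hypothetical helper and nothing here reflects how ctx.task is implemented.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch.
enum FetcherMsg {
    Fetch(String),
    Fetched(String),
}

// Processes messages until one result per URL has arrived.
fn run_fetcher(urls: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<FetcherMsg>();
    for url in urls {
        tx.send(FetcherMsg::Fetch(url.to_string())).unwrap();
    }

    let mut results = Vec::new();
    while results.len() < urls.len() {
        match rx.recv().unwrap() {
            FetcherMsg::Fetch(url) => {
                // Hand the slow work to a background thread. It reports
                // back by sending a message to the same mailbox.
                let mailbox = tx.clone();
                thread::spawn(move || {
                    let body = format!("Response from {url}");
                    let _ = mailbox.send(FetcherMsg::Fetched(body));
                });
            }
            FetcherMsg::Fetched(body) => results.push(body),
        }
    }
    results.sort(); // completion order is not deterministic
    results
}
```

The loop never blocks on a single fetch, so a Fetched result for one URL can be handled while later Fetch messages are still being dispatched.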
Message Sources
The sources trait function declares additional message sources that run alongside the actor’s main channel. It is called once, right after init succeeds. It returns a SourceSet – a composable collection of intervals and streams. All sources and the main message channel are polled concurrently via tokio::select!, so the actor can react to whichever fires first.
async fn sources(&self, ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
Ok(SourceSet::new()
.interval(time::interval(Duration::from_secs(1)), || Msg::Tick))
}
The Sources trait must be in scope. It is imported automatically when you write use speare::*.
Intervals
SourceSet::new().interval(tokio_interval, || Msg) adds a recurring timer. Each time the interval fires, the closure produces a message that is delivered to handle().
use speare::*;
use std::time::Duration;
use tokio::time;
struct Heartbeat {
tick_count: u64,
}
enum HeartbeatMsg {
Tick,
}
impl Actor for Heartbeat {
type Props = ();
type Msg = HeartbeatMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Heartbeat { tick_count: 0 })
}
async fn sources(&self, _ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
Ok(SourceSet::new()
.interval(time::interval(Duration::from_secs(1)), || HeartbeatMsg::Tick))
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
HeartbeatMsg::Tick => {
self.tick_count += 1;
println!("Heartbeat #{}", self.tick_count);
}
}
Ok(())
}
}
The interval is created with tokio::time::interval, which means the first tick fires immediately. If you want an initial delay, use tokio::time::interval_at instead.
Streams
SourceSet::new().stream(my_stream) adds a stream as a message source. The stream must implement Stream<Item = Msg> + Send + Unpin + 'static, where Msg is the actor’s message type.
That covers a WebSocket connection, a file watcher, a Kafka consumer, or any other async data source. The actor does not need to know or care where the messages come from; they all arrive through handle().
Combining Sources
You can chain multiple .interval() and .stream() calls to build a SourceSet with several sources at once:
async fn sources(&self, _ctx: &Ctx<Self>) -> Result<impl Sources<Self>, Self::Err> {
Ok(SourceSet::new()
.interval(time::interval(Duration::from_secs(1)), || Msg::Tick)
.stream(my_websocket_stream)
.interval(time::interval(Duration::from_secs(30)), || Msg::Heartbeat))
}
All sources are merged and polled together. Messages from intervals, streams, and regular handle.send() calls all arrive in handle() – the actor processes them one at a time in the order they become ready.
Polling priority: sources added earlier in the chain have higher priority. If an earlier source is consistently ready, later sources may be starved. In practice this rarely matters for intervals and moderate-throughput streams, but keep it in mind when combining a high-throughput stream with other sources – place it last in the chain.
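The starvation effect is easy to see in a toy model. The sketch below assumes strict priority order, as described above, and represents each source as a queue of messages that are already ready. It does not use speare or tokio, and the names poll_by_priority and drain are hypothetical.

```rust
use std::collections::VecDeque;

// Returns the next message from the first source that has one ready.
// Sources earlier in the slice always win over later ones.
fn poll_by_priority(sources: &mut [VecDeque<&'static str>]) -> Option<&'static str> {
    sources.iter_mut().find_map(|queue| queue.pop_front())
}

// Handles up to `steps` messages and records the order in which they arrived.
fn drain(sources: &mut [VecDeque<&'static str>], steps: usize) -> Vec<&'static str> {
    (0..steps).filter_map(|_| poll_by_priority(sources)).collect()
}
```

With a first source holding three ticks and a second holding ws-1 and ws-2, four steps yield tick, tick, tick, ws-1. The second source is only served once the first has nothing ready, which is why a source that is always ready belongs last.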
Service Discovery
speare has a global actor registry shared across the entire Node tree. Actors can register themselves at spawn time and be looked up by other actors without needing direct Handle references. This enables loose coupling between actors that don’t know about each other at compile time.
Registering by Type
Use spawn_registered() to register an actor under its type name. This enforces a singleton pattern – only one actor of each type can be registered at a time.
ctx.actor::<Logger>(()).spawn_registered()?;
The return type is Result<Handle<Msg>, RegistryError>. If a Logger is already registered, this returns Err(RegistryError::NameTaken(...)).
Here is a full Logger actor that we will register as a singleton:
use speare::*;
struct Logger;
enum LogMsg {
Info(String),
Error(String),
}
impl Actor for Logger {
type Props = ();
type Msg = LogMsg;
type Err = ();
async fn init(_ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
Ok(Logger)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
LogMsg::Info(m) => println!("[INFO] {m}"),
LogMsg::Error(m) => eprintln!("[ERROR] {m}"),
}
Ok(())
}
}
A parent actor spawns and registers it during init:
// In some parent actor's init:
ctx.actor::<Logger>(()).spawn_registered()?;
Registering by Name
Use spawn_named() to register an actor under a custom string key. This allows multiple actors of the same type to coexist in the registry with different names.
ctx.actor::<Worker>(props1).spawn_named("worker-1")?;
ctx.actor::<Worker>(props2).spawn_named("worker-2")?;
Like spawn_registered(), this returns Result<Handle<Msg>, RegistryError> and fails with Err(RegistryError::NameTaken(...)) if the name is already taken.
Looking Up Actors
Once registered, any actor in the tree can look up a handle without having received one directly.
By type:
let logger: Handle<LogMsg> = ctx.get_handle_for::<Logger>()?;
get_handle_for::<A>() returns Result<Handle<A::Msg>, RegistryError>. It infers the message type from the actor type.
By name:
let w1: Handle<WorkerMsg> = ctx.get_handle::<WorkerMsg>("worker-1")?;
get_handle::<Msg>(name) returns Result<Handle<Msg>, RegistryError>. You must specify the message type explicitly, since the registry only knows the name string.
Both methods return Err(RegistryError::NotFound(...)) if no actor is registered under that type or name.
Sending to Registered Actors
For fire-and-forget messages, Ctx provides convenience methods that combine lookup and send in one call, so there is no need to store the Handle.
By type:
ctx.send::<Logger>(LogMsg::Info("System started".into()))?;
By name:
ctx.send_to::<WorkerMsg>("worker-1", WorkerMsg::Process(data))?;
Both return Result<(), RegistryError>, failing with NotFound if the actor is not in the registry.
If you need to send multiple messages, grab the handle once and reuse it:
let logger = ctx.get_handle_for::<Logger>()?;
logger.send(LogMsg::Info("Starting up".into()));
logger.send(LogMsg::Error("Something went wrong".into()));
Putting It Together
Any actor anywhere in the tree can send to the registered Logger without ever receiving its handle directly:
// Any actor anywhere in the tree can send logs:
ctx.send::<Logger>(LogMsg::Info("System started".into()))?;
// Or get the handle for repeated use:
let logger = ctx.get_handle_for::<Logger>()?;
logger.send(LogMsg::Error("Something went wrong".into()));
Named workers follow the same pattern:
// Spawn workers with unique names:
ctx.actor::<Worker>(props1).spawn_named("worker-1")?;
ctx.actor::<Worker>(props2).spawn_named("worker-2")?;
// Send by name:
ctx.send_to::<WorkerMsg>("worker-1", WorkerMsg::Process(data))?;
// Or get the handle:
let w2 = ctx.get_handle::<WorkerMsg>("worker-2")?;
w2.send(WorkerMsg::Process(more_data));
Auto-Deregistration
When an actor stops – whether by error, parent shutdown, or an explicit handle.stop() call – it is automatically removed from the registry. Subsequent lookups for that type or name will return Err(RegistryError::NotFound(...)).
This means the registry always reflects which actors are actually alive. You do not need to manually clean up entries.
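The registry's observable behavior (register, look up, remove on stop) can be summarized with a small stand-alone model. This is a sketch only: the Registry type, its methods, and the use of a numeric actor id instead of a Handle are all hypothetical, and the RegistryError here is a reduced stand-in for the real enum.

```rust
use std::collections::HashMap;

// Stand-in for speare's RegistryError, reduced to the two common variants.
#[derive(Debug, PartialEq)]
enum RegistryError {
    NameTaken(String),
    NotFound(String),
}

// Hypothetical registry: maps a name to an opaque actor id instead of a Handle.
#[derive(Default)]
struct Registry {
    entries: HashMap<String, u64>,
}

impl Registry {
    // Mirrors spawn_named(): fails if the name is already registered.
    fn register(&mut self, name: &str, actor_id: u64) -> Result<(), RegistryError> {
        if self.entries.contains_key(name) {
            return Err(RegistryError::NameTaken(name.to_string()));
        }
        self.entries.insert(name.to_string(), actor_id);
        Ok(())
    }

    // Mirrors get_handle(): fails if nothing is registered under the name.
    fn lookup(&self, name: &str) -> Result<u64, RegistryError> {
        self.entries
            .get(name)
            .copied()
            .ok_or_else(|| RegistryError::NotFound(name.to_string()))
    }

    // Called when an actor stops; in speare this step happens automatically.
    fn on_actor_stopped(&mut self, actor_id: u64) {
        self.entries.retain(|_, id| *id != actor_id);
    }
}
```

In this model, once the actor registered under a name has stopped, the name is free again, so a replacement actor can register under it.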
Error Handling
All registry operations return Result<_, RegistryError>. The enum has three variants:
pub enum RegistryError {
/// Tried to register a name that's already in use.
NameTaken(String),
/// No actor registered under that name or type.
NotFound(String),
/// Internal lock poisoned (rare -- indicates a panic elsewhere).
PoisonErr,
}
RegistryError implements Display and Error, so it works with ? and standard error handling.
In practice, NameTaken and NotFound are the variants you will encounter. PoisonErr only occurs if another thread panicked while holding the registry lock, which should not happen under normal operation.
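Using ? on a registry call inside an actor requires the surrounding function's error type to be convertible from RegistryError. One common way to arrange that is a From implementation on your own error type. The sketch below compiles without speare, so it declares a stand-in RegistryError with invented Display text. AppError, lookup, and init_like are hypothetical names.

```rust
use std::error::Error;
use std::fmt;

// Stand-in for speare's RegistryError so the sketch compiles on its own.
#[derive(Debug)]
enum RegistryError {
    NameTaken(String),
    NotFound(String),
}

impl fmt::Display for RegistryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RegistryError::NameTaken(n) => write!(f, "name already taken: {n}"),
            RegistryError::NotFound(n) => write!(f, "no actor registered as: {n}"),
        }
    }
}

impl Error for RegistryError {}

// Hypothetical actor error type that wraps registry failures.
#[derive(Debug)]
enum AppError {
    Registry(RegistryError),
}

// This conversion is what lets `?` turn a RegistryError into an AppError.
impl From<RegistryError> for AppError {
    fn from(e: RegistryError) -> Self {
        AppError::Registry(e)
    }
}

// Hypothetical lookup that always fails, standing in for ctx.get_handle().
fn lookup(name: &str) -> Result<(), RegistryError> {
    Err(RegistryError::NotFound(name.to_string()))
}

// An init-like function whose error type differs from RegistryError.
fn init_like() -> Result<(), AppError> {
    lookup("worker-1")?; // converted through From<RegistryError>
    Ok(())
}
```

With an actor whose Err type is AppError, the same conversion would apply to ? inside init and handle.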
Pub/Sub
speare includes a built-in publish/subscribe system shared across the entire Node tree. It lets actors communicate through named topics without needing direct Handle references. Publishers and subscribers are fully decoupled – a publisher does not know who is listening, and a subscriber does not know who is publishing.
Subscribing to a Topic
Call ctx.subscribe::<T>(topic) inside init or handle to receive messages published to that topic. The type parameter T is the message type for the topic. Published messages are cloned, converted via From<T> into the actor’s Msg type, and delivered to its mailbox.
use speare::*;
use derive_more::From;
#[derive(Clone)]
struct OrderPlaced {
id: u32,
}
struct OrderSubscriber;
#[derive(From)]
enum OrderSubMsg {
Order(OrderPlaced),
}
impl Actor for OrderSubscriber {
type Props = ();
type Msg = OrderSubMsg;
type Err = ();
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
ctx.subscribe::<OrderPlaced>("orders").unwrap();
Ok(OrderSubscriber)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
OrderSubMsg::Order(o) => println!("Received order {}", o.id),
}
Ok(())
}
}
The From derive is required – it is how speare converts the topic’s message type into the actor’s Msg enum. This is the same From trick described in the Communication chapter.
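If you would rather not depend on derive_more, the conversion can be written by hand. The sketch below shows the equivalent impl for the single variant above, plus a hypothetical deliver function that models the clone-then-convert step described earlier. It is an illustration, not speare's delivery code.

```rust
// Topic message type, as in the example above.
#[derive(Clone, Debug, PartialEq)]
struct OrderPlaced {
    id: u32,
}

// The actor's message enum.
#[derive(Debug, PartialEq)]
enum OrderSubMsg {
    Order(OrderPlaced),
}

// Hand-written equivalent of what #[derive(From)] provides for this variant.
impl From<OrderPlaced> for OrderSubMsg {
    fn from(order: OrderPlaced) -> Self {
        OrderSubMsg::Order(order)
    }
}

// Models delivery to one subscriber: clone the published value, then convert it.
fn deliver<M: From<OrderPlaced>>(published: &OrderPlaced) -> M {
    M::from(published.clone())
}
```

The Clone bound on the topic type and the From bound on the actor's Msg type are the two requirements a subscriber has to satisfy in this model.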
Publishing to a Topic
Call ctx.publish(topic, msg) to send a message to all subscribers of a topic. The message is cloned for each subscriber.
ctx.publish("orders", OrderPlaced { id: 42 }).unwrap();
Publishing to a topic with no subscribers is a no-op and returns Ok(()).
You can also publish from a Node directly:
node.publish("orders", OrderPlaced { id: 42 }).unwrap();
Type Safety
Each topic is locked to a single message type. The first subscribe or publish call on a topic sets its type. Any subsequent call with a different type returns Err(PubSubError::TypeMismatch { .. }).
// First subscribe locks "orders" to OrderPlaced
ctx.subscribe::<OrderPlaced>("orders").unwrap();
// This fails -- "orders" is already locked to OrderPlaced, not String
let result = ctx.subscribe::<String>("orders");
assert!(matches!(result, Err(PubSubError::TypeMismatch { .. })));
The same applies to publish:
// Topic "orders" is locked to OrderPlaced by a subscriber
let result = ctx.publish("orders", "not an OrderPlaced".to_string());
assert!(matches!(result, Err(PubSubError::TypeMismatch { .. })));
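One plausible way to implement this kind of lock is to remember the TypeId of the first type used with each topic. The sketch below is a stand-alone model under that assumption, not speare's actual implementation. Topics and check are hypothetical names, and PubSubError is a reduced stand-in.

```rust
use std::any::TypeId;
use std::collections::HashMap;

// Stand-in for the TypeMismatch variant of speare's PubSubError.
#[derive(Debug, PartialEq)]
enum PubSubError {
    TypeMismatch { topic: String },
}

// Hypothetical topic table: remembers the first type used with each topic.
#[derive(Default)]
struct Topics {
    locked: HashMap<String, TypeId>,
}

impl Topics {
    // Checks `T` against the topic's type, locking the topic on first use.
    fn check<T: 'static>(&mut self, topic: &str) -> Result<(), PubSubError> {
        let wanted = TypeId::of::<T>();
        let locked = *self.locked.entry(topic.to_string()).or_insert(wanted);
        if locked == wanted {
            Ok(())
        } else {
            Err(PubSubError::TypeMismatch { topic: topic.to_string() })
        }
    }
}
```

In this model both subscribe and publish would call check::<T>(topic) first, which is why whichever call comes first decides the topic's type.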
Multiple Topics
An actor can subscribe to multiple topics. Each subscription needs a corresponding variant in the actor’s Msg enum with a From implementation.
#[derive(Clone)]
struct PaymentProcessed {
amount: u32,
}
struct MultiSubscriber;
#[derive(From)]
enum MultiMsg {
Order(OrderPlaced),
Payment(PaymentProcessed),
}
impl Actor for MultiSubscriber {
type Props = ();
type Msg = MultiMsg;
type Err = ();
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
ctx.subscribe::<OrderPlaced>("orders").unwrap();
ctx.subscribe::<PaymentProcessed>("payments").unwrap();
Ok(MultiSubscriber)
}
async fn handle(&mut self, msg: Self::Msg, _ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
MultiMsg::Order(o) => println!("Order {}", o.id),
MultiMsg::Payment(p) => println!("Payment ${}", p.amount),
}
Ok(())
}
}
Dynamic Subscription
Subscribing is not limited to init. You can subscribe from handle as well, letting an actor join a topic in response to a message:
async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
Msg::StartListening => {
ctx.subscribe::<OrderPlaced>("orders").unwrap();
}
Msg::Order(o) => {
println!("Got order {}", o.id);
}
}
Ok(())
}
Automatic Cleanup
Subscriptions are automatically cleaned up when an actor stops – whether by error, parent shutdown, or an explicit handle.stop() call. Once stopped, a subscriber will no longer receive messages from any topic it was subscribed to.
Subscriptions are also cleaned up before a restart. When an actor with Supervision::Restart fails and restarts, its old subscriptions are removed before init runs again. This means init can re-subscribe without causing duplicate deliveries.
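To see why the cleanup order matters, consider a toy subscription table. The sketch below is a hypothetical model (Subscriptions, remove_actor, deliveries, and restart are invented names, and actors are plain numeric ids). It only illustrates that removing the old subscriptions before init runs again keeps the delivery count at one.

```rust
use std::collections::HashMap;

// Hypothetical subscription table: topic name -> ids of subscribed actors.
#[derive(Default)]
struct Subscriptions {
    by_topic: HashMap<String, Vec<u64>>,
}

impl Subscriptions {
    fn subscribe(&mut self, topic: &str, actor_id: u64) {
        self.by_topic.entry(topic.to_string()).or_default().push(actor_id);
    }

    // Cleanup step: drop every subscription held by one actor.
    fn remove_actor(&mut self, actor_id: u64) {
        for subscribers in self.by_topic.values_mut() {
            subscribers.retain(|id| *id != actor_id);
        }
    }

    // Number of copies of a published message that would be delivered.
    fn deliveries(&self, topic: &str) -> usize {
        self.by_topic.get(topic).map_or(0, |s| s.len())
    }

    // Restart as described above: clean up first, then let init subscribe again.
    fn restart(&mut self, topic: &str, actor_id: u64) {
        self.remove_actor(actor_id);
        self.subscribe(topic, actor_id); // what init would do
    }
}
```

Without the remove_actor call in restart, the same actor id would appear twice under the topic and each published message would reach it twice.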
Error Handling
All pub/sub operations return Result<(), PubSubError>. The enum has two variants:
pub enum PubSubError {
/// Topic exists with a different message type.
TypeMismatch { topic: String },
/// Internal lock poisoned (rare -- indicates a panic elsewhere).
PoisonErr,
}
PubSubError implements Display and Error, so it works with ? and standard error handling.
In practice, TypeMismatch is the variant you will encounter. PoisonErr only occurs if another thread panicked while holding the pub/sub lock, which should not happen under normal operation.