Introduction
This guide introduces speare, covering Process creation, message handling, and essential library features. The objective is to provide clear, concise instructions for effectively utilizing speare in your Rust projects.
What is speare?
speare is a Rust library designed to simplify actor-based concurrency. It provides an abstraction over tokio green threads and flume channels, allowing for easier management of tokio threads and efficient message passing between them. speare revolves around one main abstraction, Process, which lives on its own green thread and owns its data, reducing the risk of deadlocks and encouraging a modular design. No more Arc<Mutex<T>> everywhere :)
Quick Look
Below is an example of a very minimal Counter Process.
use speare::*;
use async_trait::async_trait;
use tokio::time;

struct Counter {
    count: u32,
}

enum CounterMsg {
    Add(u32),
    Subtract(u32),
    Print,
}

#[async_trait]
impl Process for Counter {
    type Props = ();
    type Msg = CounterMsg;
    type Err = ();

    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        Ok(Counter { count: 0 })
    }

    async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            CounterMsg::Add(n) => self.count += n,
            CounterMsg::Subtract(n) => self.count -= n,
            CounterMsg::Print => println!("Count is {}", self.count),
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut node = Node::default();
    let counter = node.spawn::<Counter>(());
    counter.send(CounterMsg::Add(5));
    counter.send(CounterMsg::Subtract(2));
    counter.send(CounterMsg::Print); // will print 3

    // We wait so the program doesn't end before we print.
    time::sleep(time::Duration::from_millis(1)).await;
}
What is a Process?
A Process is really just a loop that lives in a tokio::task together with some state, yielding to the tokio runtime while it waits for a message received via a flume channel, which it then uses to modify its state. All speare does is provide mechanisms to manage the lifecycle, communication and supervision of processes.
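The "loop plus state plus channel" idea can be sketched without speare at all. The block below is an illustration only, under stated assumptions: it uses an OS thread and std::sync::mpsc in place of a tokio task and a flume channel, and spawn_counter is a hypothetical helper, not part of speare.

```rust
use std::sync::mpsc;
use std::thread;

// Messages the loop understands.
enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

// Hypothetical helper (not speare API): starts the loop on its own thread and
// returns the sending half of its mailbox plus a handle to its final state.
fn spawn_counter() -> (mpsc::Sender<CounterMsg>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel();
    let join = thread::spawn(move || {
        // The state is owned by this thread alone, so no lock is needed.
        let mut count: u32 = 0;
        // `recv` blocks until a message arrives and fails once every sender is dropped.
        while let Ok(msg) = rx.recv() {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::Subtract(n) => count -= n,
            }
        }
        count
    });
    (tx, join)
}

fn main() {
    let (counter, join) = spawn_counter();
    counter.send(CounterMsg::Add(5)).unwrap();
    counter.send(CounterMsg::Subtract(2)).unwrap();
    // Dropping the only sender closes the mailbox and ends the loop.
    drop(counter);
    println!("Count is {}", join.join().unwrap()); // prints "Count is 3"
}
```

Messages sent from a single sender arrive in order, which is why the final count is deterministic here.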
A Process is comprised of a few parts:
Trait Implementor
- State: the main struct which implements the Process trait. The state is mutable and can be modified every time a message is received by the Process.
Associated types
- Props: dependencies needed by the struct that implements Process for instantiation, configuration or anything in between.
- Msg: data received and processed by the Process.
- Err: the error that can be returned by the process when it fails.
Trait Functions
- async fn init: (ctx: &mut Ctx<Self>) -> Result<Self, Self::Err>
  Required. Used by speare to create new instances of your Process whenever it is spawned or restarted.
- async fn exit: (&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>)
  Optional. Called every time your Process is stopped, be it manually through its Handle<_> or by the parent through its supervision strategy.
- async fn handle: (&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err>
  Optional. Called when the Process receives a message through its channel. Messages are always processed sequentially.
- fn supervision: (props: &Self::Props) -> Supervision
  Optional. Called before your Process is spawned. Used to customize the supervision strategy for its children. If not implemented, the default supervision strategy will be used: one-for-one infinite restarts without backoff.
Example
Below is an example of a process making use of all its associated types and trait functions.
use speare::*;
use async_trait::async_trait;

struct Counter {
    count: u32,
}

struct CounterProps {
    initial_count: u32,
    max_count: u32,
    min_count: u32,
}

enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

#[derive(Debug)]
enum CounterErr {
    MaxCountExceeded,
    MinCountExceeded,
}

#[async_trait]
impl Process for Counter {
    type Props = CounterProps;
    type Msg = CounterMsg;
    type Err = CounterErr;

    // Called on spawn and on every restart.
    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        println!("Counter starting up!");
        Ok(Counter {
            count: ctx.props().initial_count,
        })
    }

    // Called whenever the process is stopped.
    async fn exit(&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
        println!("Counter exiting due to: {:?}", reason);
    }

    async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            CounterMsg::Add(n) => {
                self.count += n;
                if self.count > ctx.props().max_count {
                    return Err(CounterErr::MaxCountExceeded);
                }
            }

            CounterMsg::Subtract(n) => {
                self.count -= n;
                // Compare against the lower bound, not max_count.
                if self.count < ctx.props().min_count {
                    return Err(CounterErr::MinCountExceeded);
                }
            }
        }

        Ok(())
    }

    // Strategy applied to the children of this process.
    fn supervision(props: &Self::Props) -> Supervision {
        Supervision::one_for_one().directive(Directive::Restart)
    }
}

#[tokio::main]
async fn main() {
    let mut node = Node::default();
    let counter = node.spawn::<Counter>(CounterProps {
        initial_count: 5,
        max_count: 20,
        min_count: 5,
    });

    counter.send(CounterMsg::Add(5));
    counter.send(CounterMsg::Subtract(2));
}
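One detail of the example worth keeping in mind: count is a u32, so subtracting more than the current value overflows (a panic in debug builds, wrap-around in release) before any bounds comparison runs. A minimal sketch of a bounds check using checked arithmetic follows. The function names are hypothetical helpers for illustration and are independent of speare; unlike the example, they leave the count untouched when the check fails.

```rust
#[derive(Debug, PartialEq)]
enum CounterErr {
    MaxCountExceeded,
    MinCountExceeded,
}

// Hypothetical helper: returns the new count, or an error if it would exceed `max`
// (or overflow u32).
fn add_bounded(count: u32, n: u32, max: u32) -> Result<u32, CounterErr> {
    match count.checked_add(n) {
        Some(next) if next <= max => Ok(next),
        _ => Err(CounterErr::MaxCountExceeded),
    }
}

// Hypothetical helper: returns the new count, or an error if it would drop below `min`
// (or below zero).
fn sub_bounded(count: u32, n: u32, min: u32) -> Result<u32, CounterErr> {
    match count.checked_sub(n) {
        Some(next) if next >= min => Ok(next),
        _ => Err(CounterErr::MinCountExceeded),
    }
}

fn main() {
    println!("{:?}", add_bounded(5, 5, 20)); // Ok(10)
    println!("{:?}", sub_bounded(10, 2, 5)); // Ok(8)
    println!("{:?}", sub_bounded(3, 7, 0)); // Err(MinCountExceeded)
}
```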
TBD 🏗️
# Working with Processes
- A process can be communicated with through its Handle, which is received once it is spawned.
- A process can be spawned from a node as an unsupervised top-level process (more on that later), or spawned from another process, which becomes its parent and supervises it.
- ctx: what it is, etc.
Communication between processes
- send and send_in to send messages
- req and req_timeout to send requests
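The difference between sending a message and sending a request can be sketched with plain channels. This is not speare's implementation or API: it assumes a request is simply a message that carries a reply channel, uses std::sync::mpsc instead of flume, and request_count is a hypothetical helper name.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

enum Msg {
    // Fire-and-forget: the sender does not wait for anything.
    Add(u32),
    // Request: carries the channel on which the reply is sent back.
    Get(Sender<u32>),
}

fn spawn_counter() -> Sender<Msg> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut count = 0u32;
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Add(n) => count += n,
                // The requester may have given up already, so a failed reply is ignored.
                Msg::Get(reply) => {
                    let _ = reply.send(count);
                }
            }
        }
    });
    tx
}

// Hypothetical helper: sends a request and waits at most `timeout` for the reply.
fn request_count(counter: &Sender<Msg>, timeout: Duration) -> Result<u32, RecvTimeoutError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    // If the loop is gone, the message (and the reply sender inside it) is dropped,
    // so `recv_timeout` below reports `Disconnected` instead of waiting.
    let _ = counter.send(Msg::Get(reply_tx));
    reply_rx.recv_timeout(timeout)
}

fn main() {
    let counter = spawn_counter();
    counter.send(Msg::Add(5)).unwrap();
    counter.send(Msg::Add(2)).unwrap();
    println!("{:?}", request_count(&counter, Duration::from_secs(1))); // Ok(7)
}
```

Because the mailbox is processed in order, the request is answered only after both Add messages have been applied.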
TBD 🏗️
Process lifecycle
- A process is spawned and lives in memory until it is stopped.
- It can be stopped by its parent being stopped, by being stopped manually, or by its parent's supervision strategy.
- When a process is spawned, the init method is called to create the process instance.
- When a process is stopped, the exit method is called.
- When a process is restarted, the exit method is called first, and then init is called to create a new instance of the process. The handle remains the same, though.
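The init, exit and restart ordering can be illustrated with a simplified, synchronous stand-in. The Lifecycle trait and run function below are hypothetical and are not speare's Process trait; the sketch assumes that every error triggers a restart and records each hook call in a log so the order is visible.

```rust
use std::sync::mpsc::{self, Receiver};

// Hypothetical, synchronous stand-in for the lifecycle hooks (not speare's trait).
trait Lifecycle: Sized {
    type Msg;
    fn init(log: &mut Vec<String>) -> Self;
    fn handle(&mut self, msg: Self::Msg) -> Result<(), String>;
    fn exit(&mut self, log: &mut Vec<String>);
}

struct Counter {
    count: u32,
}

impl Lifecycle for Counter {
    type Msg = u32;

    fn init(log: &mut Vec<String>) -> Self {
        log.push("init".to_string());
        Counter { count: 0 }
    }

    fn handle(&mut self, n: u32) -> Result<(), String> {
        self.count += n;
        if self.count > 10 {
            Err("max exceeded".to_string())
        } else {
            Ok(())
        }
    }

    fn exit(&mut self, log: &mut Vec<String>) {
        log.push(format!("exit at {}", self.count));
    }
}

// Runs the process until the mailbox closes, restarting it on every error.
fn run<P: Lifecycle>(mailbox: Receiver<P::Msg>) -> Vec<String> {
    let mut log = Vec::new();
    let mut process = P::init(&mut log);
    for msg in mailbox {
        if process.handle(msg).is_err() {
            // Restart: exit the old instance, then init a fresh one.
            // The mailbox is untouched, so existing senders keep working.
            process.exit(&mut log);
            process = P::init(&mut log);
        }
    }
    // Mailbox closed: the process is stopped for good.
    process.exit(&mut log);
    log
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    for n in vec![4u32, 8, 3] {
        tx.send(n).unwrap();
    }
    drop(tx);
    // Prints: init, exit at 12, init, exit at 3 (one per line).
    for line in run::<Counter>(rx) {
        println!("{}", line);
    }
}
```

The sender created before the restart is the same one used after it, which mirrors the point that the handle stays valid across restarts.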
TBD 🏗️
Supervision
- Top-level processes created by a node are unsupervised.
- Every process created by another process is supervised by its parent.
- Supervision means that the parent can decide what to do with its children when one of them errors.
- There are two main strategies: one-for-one and one-for-all.
- The parent can restart, stop, resume or escalate.
- A process will finish handling whatever message it is currently handling before stopping (or restarting) itself.
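How a strategy and a directive combine can be sketched as a pure function. The types below are local to this sketch and are not speare's Supervision or Directive types. The meanings given to resume (the child keeps its state) and escalate (the failure is passed to the parent's own parent) are assumptions borrowed from other actor systems rather than confirmed speare behaviour.

```rust
// Local stand-ins for illustration only (not speare's types).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Strategy {
    OneForOne,
    OneForAll,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum Directive {
    Restart,
    Stop,
    Resume,
    Escalate,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    // Children (by index) that are exited and re-created.
    Restarted(Vec<usize>),
    // Children (by index) that are exited for good.
    Stopped(Vec<usize>),
    // Assumption: the failing child keeps its state and carries on.
    Resumed(usize),
    // Assumption: the decision is handed to the parent's own parent.
    Escalated,
}

// Decides what happens when child `failed` (out of `child_count` children) errors.
fn supervise(strategy: Strategy, directive: Directive, failed: usize, child_count: usize) -> Outcome {
    // One-for-one touches only the failing child; one-for-all touches every sibling.
    let affected: Vec<usize> = match strategy {
        Strategy::OneForOne => vec![failed],
        Strategy::OneForAll => (0..child_count).collect(),
    };
    match directive {
        Directive::Restart => Outcome::Restarted(affected),
        Directive::Stop => Outcome::Stopped(affected),
        Directive::Resume => Outcome::Resumed(failed),
        Directive::Escalate => Outcome::Escalated,
    }
}

fn main() {
    // Child 1 of 3 fails under each strategy.
    println!("{:?}", supervise(Strategy::OneForOne, Directive::Restart, 1, 3)); // Restarted([1])
    println!("{:?}", supervise(Strategy::OneForAll, Directive::Stop, 1, 3)); // Stopped([0, 1, 2])
}
```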