What is a Process?
A Process is really just a loop that lives in a tokio::task together with some state. The loop yields to the tokio runtime while it waits for a message on a flume channel, then uses that message to modify its state. All speare does is provide mechanisms to manage the lifecycle, communication and supervision of processes.
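The loop described above can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not speare's implementation: it swaps tokio::task for std::thread and the flume channel for std::sync::mpsc so that it runs without extra crates, and the names Msg and run_process are made up for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch.
enum Msg {
    Add(u32),
    Subtract(u32),
}

// Runs a "process": a loop that owns its state and blocks on a channel
// until a message arrives, then uses the message to update the state.
// Returns the final state once the channel has been closed.
fn run_process(initial: u32, messages: Vec<Msg>) -> u32 {
    let (tx, rx) = mpsc::channel::<Msg>();

    // Stand-in for the tokio::task: a thread that owns the state.
    let worker = thread::spawn(move || {
        let mut count = initial;
        // `recv` blocks until a message arrives and returns Err once
        // every sender has been dropped, which ends the loop.
        while let Ok(msg) = rx.recv() {
            match msg {
                Msg::Add(n) => count = count.saturating_add(n),
                Msg::Subtract(n) => count = count.saturating_sub(n),
            }
        }
        count
    });

    // Messages are delivered, and therefore handled, in the order sent.
    for msg in messages {
        if tx.send(msg).is_err() {
            break; // the worker has already stopped
        }
    }
    drop(tx); // closing the channel lets the loop finish

    worker.join().expect("worker thread panicked")
}
```

Calling run_process(5, vec![Msg::Add(5), Msg::Subtract(2)]) returns 8. In an async runtime the blocking recv would be replaced by an awaited receive, so the task yields instead of blocking a thread.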
A Process is made up of a few parts:
Trait Implementor
- State: the main struct which implements the Process trait. The state is mutable and can be modified every time a message is received by the Process.
Associated types
- Props: dependencies needed by the struct that implements Process for instantiation, configuration or anything in between.
- Msg: data received and processed by the Process.
- Err: the error that can be returned by the process when it fails.
Trait Functions
- async fn init: (ctx: &mut Ctx<Self>) -> Result<Self, Self::Err>
  Required. Used by speare to create new instances of your Process whenever it is spawned or restarted.
- async fn exit: (&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>)
  Optional. Called every time your Process is stopped, be it manually through its Handle<_> or by the parent through its supervision strategy.
- async fn handle: (&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err>
  Optional. Called when the Process receives a message through its channel. Messages are always processed sequentially.
- fn supervision: (props: &Self::Props) -> Supervision
  Optional. Called before your Process is spawned. Used to customize the supervision strategy for its children. If not implemented, the default supervision strategy is used: one-for-one, with infinite restarts and no backoff.
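How init, handle and a restart directive interact can be illustrated with a simplified model. The sketch below is an assumption-heavy stand-in, not speare's API: MiniProcess is a hypothetical synchronous trait without Ctx, exit or supervision, and supervise is a made-up helper that rebuilds the state with init whenever handle returns an error.

```rust
// Hypothetical, synchronous stand-in for the trait described above.
trait MiniProcess: Sized {
    type Props;
    type Msg;
    type Err;

    fn init(props: &Self::Props) -> Result<Self, Self::Err>;
    fn handle(&mut self, msg: Self::Msg, props: &Self::Props) -> Result<(), Self::Err>;
}

struct MiniCounter {
    count: u32,
}

struct MiniProps {
    initial_count: u32,
    max_count: u32,
}

#[derive(Debug, PartialEq)]
enum MiniErr {
    MaxCountExceeded,
}

impl MiniProcess for MiniCounter {
    type Props = MiniProps;
    type Msg = u32; // amount to add
    type Err = MiniErr;

    fn init(props: &Self::Props) -> Result<Self, Self::Err> {
        Ok(MiniCounter { count: props.initial_count })
    }

    fn handle(&mut self, n: u32, props: &Self::Props) -> Result<(), Self::Err> {
        let next = self.count.saturating_add(n);
        if next > props.max_count {
            return Err(MiniErr::MaxCountExceeded);
        }
        self.count = next;
        Ok(())
    }
}

// Feeds messages to the process one at a time. When `handle` fails, the
// old state is discarded and a fresh one is built with `init`, which
// models a restart. Returns the final state and the number of restarts.
fn supervise<P: MiniProcess>(props: P::Props, msgs: Vec<P::Msg>) -> Result<(P, usize), P::Err> {
    let mut state = P::init(&props)?;
    let mut restarts = 0;
    for msg in msgs {
        if state.handle(msg, &props).is_err() {
            state = P::init(&props)?;
            restarts += 1;
        }
    }
    Ok((state, restarts))
}
```

With initial_count 5 and max_count 20, the messages 5, 20, 3 leave the counter at 8 after one restart: the second message would push the count to 30, so the state is rebuilt from the props before the third message is handled.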
Example
Below is an example of a process making use of all its associated types and trait functions.
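use speare::*;
use async_trait::async_trait;

// State: the struct that implements Process.
struct Counter {
    count: u32,
}

// Props: configuration handed to the process when it is spawned.
struct CounterProps {
    initial_count: u32,
    max_count: u32,
    min_count: u32,
}

// Msg: the messages the process can receive.
enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

// Err: the ways in which the process can fail.
#[derive(Debug)]
enum CounterErr {
    MaxCountExceeded,
    MinCountExceeded,
}
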
#[async_trait]
impl Process for Counter {
    type Props = CounterProps;
    type Msg = CounterMsg;
    type Err = CounterErr;

    // Builds a fresh Counter every time the process is spawned or restarted.
    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        println!("Counter starting up!");
        Ok(Counter {
            count: ctx.props().initial_count,
        })
    }

    async fn exit(&mut self, reason: ExitReason<Self>, _ctx: &mut Ctx<Self>) {
        println!("Counter exiting due to: {:?}", reason);
    }

    async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            CounterMsg::Add(n) => {
                self.count += n;
                if self.count > ctx.props().max_count {
                    return Err(CounterErr::MaxCountExceeded);
                }
            }
            CounterMsg::Subtract(n) => {
                // saturating_sub avoids a u32 underflow panic when n > count.
                self.count = self.count.saturating_sub(n);
                // Compare against the lower bound, not max_count.
                if self.count < ctx.props().min_count {
                    return Err(CounterErr::MinCountExceeded);
                }
            }
        }
        Ok(())
    }

    // Restart children one at a time when they fail.
    fn supervision(_props: &Self::Props) -> Supervision {
        Supervision::one_for_one().directive(Directive::Restart)
    }
}

#[tokio::main]
async fn main() {
    let mut node = Node::default();
    let counter = node.spawn::<Counter>(CounterProps {
        initial_count: 5,
        max_count: 20,
        min_count: 5,
    });

    counter.send(CounterMsg::Add(5));
    counter.send(CounterMsg::Subtract(2));
}