Introduction

This guide introduces speare: how to create a Process, how to handle messages, and which library features you need to get started. It aims to give clear, concise instructions for using speare in your Rust projects.

What is speare?

speare is a Rust library that simplifies actor-based concurrency. It provides an abstraction over tokio green threads and flume channels, making it easier to manage those threads and to pass messages efficiently between them. speare revolves around one main abstraction, the Process, which lives on its own green thread and owns its data. This reduces the risk of deadlocks and encourages a modular design. No more Arc<Mutex<T>> everywhere :)

Quick Look

Below is an example of a very minimal Counter Process.

use speare::*;
use async_trait::async_trait;
use tokio::time::{self, Duration};

struct Counter {
    count: u32,
}

enum CounterMsg {
    Add(u32),
    Subtract(u32),
    Print
}

#[async_trait]
impl Process for Counter {
    type Props = ();
    type Msg = CounterMsg;
    type Err = ();

    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        Ok(Counter { count: 0 })
    }

    async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            CounterMsg::Add(n) => self.count += n,
            CounterMsg::Subtract(n) => self.count -= n,
            CounterMsg::Print => println!("Count is {}", self.count)
        }

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let mut node = Node::default();
    let counter = node.spawn::<Counter>(());
    counter.send(CounterMsg::Add(5));
    counter.send(CounterMsg::Subtract(2));
    counter.send(CounterMsg::Print); // will print 3

    // We wait so the program doesn't end before we print.
    time::sleep(Duration::from_millis(1)).await;
}

What is a Process?

A Process is really just a loop that lives in a tokio::task together with some state. It yields to the tokio runtime while it waits for a message on a flume channel, then uses that message to modify its state. All speare does is provide mechanisms to manage the lifecycle, communication, and supervision of processes.
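The "loop plus state plus channel" idea can be sketched without speare at all. The block below is only an illustration under stated assumptions: it uses a std thread and std::sync::mpsc in place of a tokio task and a flume channel, and the names spawn_counter and run_counter are hypothetical helpers, not part of speare.

```rust
use std::sync::mpsc;
use std::thread;

/// Messages understood by the counter loop.
enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

/// Spawns a thread that owns `count`, applies each message in order,
/// and returns the final count once every sender has been dropped.
/// (Hypothetical helper: a std-only stand-in for a spawned Process.)
fn spawn_counter() -> (mpsc::Sender<CounterMsg>, thread::JoinHandle<u32>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut count: u32 = 0; // state owned exclusively by this thread
        // `recv` blocks until a message arrives and fails once all senders are gone.
        while let Ok(msg) = rx.recv() {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::Subtract(n) => count = count.saturating_sub(n),
            }
        }
        count
    });
    (tx, handle)
}

/// Sends a batch of messages and waits for the loop to finish.
fn run_counter(msgs: Vec<CounterMsg>) -> u32 {
    let (tx, handle) = spawn_counter();
    for msg in msgs {
        tx.send(msg).expect("counter thread is still running");
    }
    drop(tx); // closing the channel ends the loop
    handle.join().expect("counter thread panicked")
}
```

Because only the spawned thread ever touches count, no lock is needed. Messages sent on the same channel are applied in the order they were sent, so run_counter(vec![CounterMsg::Add(5), CounterMsg::Subtract(2)]) returns 3.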

A Process consists of a few parts:

Trait Implementor

  • State: the main struct which implements the Process trait. The state is mutable and can be modified every time a message is received by the Process.

Associated types

  • Props: dependencies needed by the struct that implements Process for instantiation, configuration or anything in between.
  • Msg: data received and processed by the Process.
  • Err: the error that can be returned by the process when it fails.

Trait Functions

  • async fn init: (ctx: &mut Ctx<Self>) -> Result<Self, Self::Err>

    Required. Used by speare to create new instances of your Process whenever it is spawned or restarted.

  • async fn exit: (&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>)

    Optional. Called every time your Process is stopped, be it manually through its Handle<_> or by the parent through its supervision strategy.

  • async fn handle: (&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err>

    Optional. Called when the Process receives a message through its channel. Messages are always processed sequentially.

  • fn supervision: (props: &Self::Props) -> Supervision

    Optional. Called before your Process is spawned. Used to customize the supervision strategy for its children. If not implemented, the default supervision strategy will be used: one-for-one infinite restarts without backoff.
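How init, handle, exit, and a restart-on-error strategy fit together can be sketched with a simplified, synchronous stand-in for the trait. Everything here is an assumption made for illustration: MiniProcess and run_with_restarts are hypothetical names, the failing message is dropped on restart, and exit is called before each restart. speare's real trait is async, and its actual restart semantics may differ.

```rust
/// Simplified, synchronous stand-in for the trait described above
/// (hypothetical; not speare's API).
trait MiniProcess: Sized {
    type Props;
    type Msg;
    type Err;

    /// Builds a fresh instance on spawn and on every restart.
    fn init(props: &Self::Props) -> Result<Self, Self::Err>;
    /// Processes one message; an `Err` triggers a restart.
    fn handle(&mut self, msg: Self::Msg, props: &Self::Props) -> Result<(), Self::Err>;
    /// Optional cleanup hook with a no-op default.
    fn exit(&mut self) {}
}

/// Runs `P` over `msgs` one at a time, restarting it via `init` whenever
/// `handle` fails. Returns the final state and the number of restarts.
fn run_with_restarts<P: MiniProcess>(
    props: P::Props,
    msgs: Vec<P::Msg>,
) -> Result<(P, u32), P::Err> {
    let mut state = P::init(&props)?;
    let mut restarts = 0;
    for msg in msgs {
        if state.handle(msg, &props).is_err() {
            state.exit(); // assumption: the old instance is cleaned up first
            state = P::init(&props)?; // restart without backoff, no retry limit
            restarts += 1;
        }
    }
    Ok((state, restarts))
}

struct Counter {
    count: u32,
}

struct Limits {
    initial: u32,
    max: u32,
}

#[derive(Debug)]
struct MaxExceeded;

impl MiniProcess for Counter {
    type Props = Limits;
    type Msg = u32; // each message is an amount to add
    type Err = MaxExceeded;

    fn init(props: &Limits) -> Result<Self, MaxExceeded> {
        Ok(Counter { count: props.initial })
    }

    fn handle(&mut self, n: u32, props: &Limits) -> Result<(), MaxExceeded> {
        self.count += n;
        if self.count > props.max {
            Err(MaxExceeded)
        } else {
            Ok(())
        }
    }
}
```

With Limits { initial: 1, max: 10 } and the messages 4, 20, 2, the second message pushes the count past the limit, the counter is rebuilt from its Props, and the third message is applied to the fresh instance. The run ends with one restart and a count of 3.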

Example

Below is an example of a process making use of all its associated types and trait functions.

use speare::*;
use async_trait::async_trait;

struct Counter {
    count: u32,
}

struct CounterProps {
    initial_count: u32,
    max_count: u32,
    min_count: u32,
}

enum CounterMsg {
    Add(u32),
    Subtract(u32),
}

#[derive(Debug)]
enum CounterErr {
    MaxCountExceeded,
    MinCountExceeded,
}

#[async_trait]
impl Process for Counter {
    type Props = CounterProps;
    type Msg = CounterMsg;
    type Err = CounterErr;

    async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
        println!("Counter starting up!");

        Ok(Counter {
            count: ctx.props().initial_count,
        })
    }

    async fn exit(&mut self, reason: ExitReason<Self>, ctx: &mut Ctx<Self>) {
        println!("Counter exiting due to: {:?}", reason);
    }

    async fn handle(&mut self, msg: Self::Msg, ctx: &mut Ctx<Self>) -> Result<(), Self::Err> {
        match msg {
            CounterMsg::Add(n) => {
                self.count += n;
                if self.count > ctx.props().max_count {
                    return Err(CounterErr::MaxCountExceeded);
                }
            }

            CounterMsg::Subtract(n) => {
                self.count -= n;
                if self.count < ctx.props().min_count {
                    return Err(CounterErr::MinCountExceeded);
                }
            }
        }

        Ok(())
    }

    fn supervision(props: &Self::Props) -> Supervision {
        Supervision::one_for_one().directive(Directive::Restart)
    }
}

#[tokio::main]
async fn main() {
    let mut node = Node::default();
    let counter = node.spawn::<Counter>(CounterProps {
        initial_count: 5,
        max_count: 20,
        min_count: 5
    });

    counter.send(CounterMsg::Add(5));
    counter.send(CounterMsg::Subtract(2));
}
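One detail of the handler above is worth a closer look: it updates count before checking the bounds. In Rust, a u32 subtraction that would go below zero panics in debug builds and wraps around in release builds, so the minimum check may never get the chance to run. A possible alternative, sketched here with hypothetical free functions add_within and subtract_within rather than anything from speare, is to use checked arithmetic and only commit the new value once it is known to be in range.

```rust
#[derive(Debug, PartialEq)]
enum CounterErr {
    MaxCountExceeded,
    MinCountExceeded,
}

/// Returns `count + n` if it neither overflows nor exceeds `max_count`.
fn add_within(count: u32, n: u32, max_count: u32) -> Result<u32, CounterErr> {
    match count.checked_add(n) {
        Some(next) if next <= max_count => Ok(next),
        _ => Err(CounterErr::MaxCountExceeded),
    }
}

/// Returns `count - n` if it neither underflows nor drops below `min_count`.
fn subtract_within(count: u32, n: u32, min_count: u32) -> Result<u32, CounterErr> {
    match count.checked_sub(n) {
        Some(next) if next >= min_count => Ok(next),
        _ => Err(CounterErr::MinCountExceeded),
    }
}
```

Inside a handler, the result could be assigned with something like self.count = subtract_within(self.count, n, ctx.props().min_count)?; so that an out-of-range message returns the error without first leaving the state at an invalid value.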

TBD 🏗️