I have been looking around and learning a bit. My question is: how do I use process groups? Here is a slightly modified ping-pong example from the README:
use ractor::{cast, Actor, ActorProcessingErr, ActorRef};
/// [PingPong] is a basic actor that will print
/// ping..pong.. repeatedly until some exit
/// condition is met (a counter hits 10). Then
/// it will exit
pub struct PingPong;

/// These are the message types [PingPong] supports
#[derive(Debug, Clone)]
pub enum Message {
    Ping,
    Pong,
}

impl Message {
    // retrieve the next message in the sequence
    fn next(&self) -> Self {
        match self {
            Self::Ping => Self::Pong,
            Self::Pong => Self::Ping,
        }
    }
    // print out this message
    fn print(&self) {
        match self {
            Self::Ping => print!("ping.."),
            Self::Pong => print!("pong.."),
        }
    }
}
// the implementation of our actor's "logic"
#[async_trait::async_trait]
impl Actor for PingPong {
    // An actor has a message type
    type Msg = Message;
    // and (optionally) internal state
    type State = u8;
    // Startup initialization args
    type Arguments = ();

    // Initially we need to create our state, and potentially
    // start some internal processing (by posting a message for
    // example)
    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _: (),
    ) -> Result<Self::State, ActorProcessingErr> {
        // create the initial state
        Ok(0u8)
    }

    // This is our main message handler
    async fn handle(
        &self,
        myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if *state < 10u8 {
            message.print();
            *state += 1;
        } else {
            // don't send another message, rather stop the agent after 10 iterations
            println!();
            myself.stop(None);
        }
        Ok(())
    }
}
#[tokio::main]
async fn main() {
    const NUMBER_OF_ACTORS: usize = 5;
    let mut actors = vec![];
    let mut handles = vec![];
    for _ in 0..NUMBER_OF_ACTORS {
        let (actor, handle) = Actor::spawn(None, PingPong, ())
            .await
            .expect("Failed to start ping-pong actor");
        actors.push(actor.get_cell());
        handles.push(handle);
    }
    // register every spawned actor under one group name
    ractor::pg::join(String::from("PING_PONG_ACTORS"), actors);
    for _ in 0..10 * NUMBER_OF_ACTORS {
        // How do I send ping pongs to this group of actors, do I use
        // pg::get_members to get a vec of them and then pick one?
        // The docs say
        // `Common operations are to (a) upcast the group members to a strong-type’d actor then dispatch a message with crate::call or crate::cast.`
        // How do I do this?
    }
}
My question is in the comments above ☝🏻
So a process group is effectively a group of actors that all perform the same function. If you want to make sure you hit the same actor every time, then process groups probably aren't the right tool. Think more "there are 100 web servers, I don't care which one serves the request, I just need one". The normal flow is to get the members of the group and select one at random. There might be other constraints (e.g. local-only vs. remote actors), or if you wanted to extend it further with properties, you could add things like regional knowledge. But the general idea is to get the list of processes, select one, then up-cast it and send a message. Something like:
// keep the Vec alive so the borrowed cell stays valid
let members = pg::get_members(&"my_group".to_string());
let actor_cell = members.first(); // or a random sampling
if let Some(cell) = actor_cell {
    // up-cast the untyped cell to a typed reference
    let actor_ref: ActorRef<SomeKnownMessageTypeForThisProcessGroup> = cell.clone().into();
    actor_ref.cast(AMessage)?;
}
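To make the "get the list, select one, send" flow concrete without depending on ractor, here is a minimal standard-library sketch. Everything in it is hypothetical and is not ractor's API: `Groups` stands in for the process-group registry, `std::sync::mpsc` channels stand in for actor mailboxes, and `send_to_one` uses round-robin where the answer suggests a random pick.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Messages every member of the group understands (mirrors the question's enum).
#[derive(Debug, Clone, PartialEq)]
pub enum Message {
    Ping,
    Pong,
}

/// Hypothetical stand-in for a process-group registry:
/// group name -> mailboxes of the members in that group.
#[derive(Default)]
pub struct Groups {
    members: HashMap<String, Vec<Sender<Message>>>,
    /// Round-robin cursor, shared by all groups to keep the sketch small.
    next: usize,
}

impl Groups {
    /// Add a new member to `group` and return the receiving end of its mailbox.
    pub fn join(&mut self, group: &str) -> Receiver<Message> {
        let (tx, rx) = channel();
        self.members.entry(group.to_string()).or_default().push(tx);
        rx
    }

    /// Look up the group, pick exactly one member, and send it the message.
    /// Returns the index of the chosen member, or `None` if the group is
    /// unknown, empty, or the chosen member's mailbox is closed.
    pub fn send_to_one(&mut self, group: &str, msg: Message) -> Option<usize> {
        let members = self.members.get(group)?;
        if members.is_empty() {
            return None;
        }
        let idx = self.next % members.len();
        self.next = self.next.wrapping_add(1);
        members[idx].send(msg).ok()?;
        Some(idx)
    }
}
```

With two members joined to `"PING_PONG_ACTORS"`, two consecutive `send_to_one` calls land on member 0 and then member 1. Swapping the cursor for a random index gives the "I don't care which one" behaviour described above.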
You'd need to build that functionality yourself. For example, "sending a message" to an actor could actually be an RPC whose reply says whether the message was accepted or rejected; on rejection, the message goes to the next actor. However, that's a significant amount of overhead. If it's a local group of actors (i.e. not in a cluster), then I'd consider a factory over anything else. That will handle even job distribution for you.
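The accept-or-reject idea can be sketched in plain Rust as follows. This is only an illustration under stated assumptions: `Worker`, `Offer`, and `dispatch` are hypothetical names, a synchronous method call stands in for the RPC round trip, and a fixed `capacity` stands in for whatever rule an actor would use to reject work. It is not how ractor's factory is implemented.

```rust
/// Reply a worker gives when offered a job (models the RPC response).
#[derive(Debug, PartialEq)]
pub enum Offer {
    Accepted,
    Rejected,
}

/// Hypothetical worker that can hold at most `capacity` jobs.
pub struct Worker {
    pub capacity: usize,
    pub jobs: Vec<String>,
}

impl Worker {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, jobs: Vec::new() }
    }

    /// Stand-in for the RPC: take the job if there is room, otherwise reject it.
    pub fn offer(&mut self, job: &str) -> Offer {
        if self.jobs.len() < self.capacity {
            self.jobs.push(job.to_string());
            Offer::Accepted
        } else {
            Offer::Rejected
        }
    }
}

/// Offer the job to each worker in order until one accepts it.
/// Returns the index of the accepting worker, or `None` if every worker rejected.
pub fn dispatch(workers: &mut [Worker], job: &str) -> Option<usize> {
    workers
        .iter_mut()
        .position(|w| w.offer(job) == Offer::Accepted)
}
```

Each rejected offer costs one extra round trip, which is the overhead mentioned above. A central distributor that already knows which workers are free avoids those retries.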