Skip to content

Commit

Permalink
Factory improvements (#237)
Browse files Browse the repository at this point in the history
1. Draining of requests
2. Lifecycle hooks
3. Worker startup custom arguments which can be threaded through the factory's worker builder
4. Discarding reasoning
5. Automatic adjustment of the worker pool size and discard limits as well as support
for front and rear loadshedding.
6. Support for trait-based insertion of routing and queueing protocols, allowing downstream
developers to define their own specific logic per use-case.
7. Movement of options from the factory's definition to the factory's arguments which reduces
the need for cloning and can simply move ownership of the properties defined there.
8. Sync-safe structures aren't cloned as boxes, rather referencing the same `Arc` instances

This additionally bumps the release version to 0.10.0 at the same time so we don't accidentally
publish on the 0.9 release version.
  • Loading branch information
slawlor authored May 17, 2024
1 parent 7a7871b commit a9ff369
Show file tree
Hide file tree
Showing 29 changed files with 4,110 additions and 1,365 deletions.
4 changes: 2 additions & 2 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor"
version = "0.9.8"
version = "0.10.0"
authors = ["Sean Lawlor", "Evan Au", "Dillon George"]
description = "A actor framework for Rust"
documentation = "https://docs.rs/ractor"
Expand All @@ -27,7 +27,7 @@ default = ["tokio_runtime", "async-trait"]
dashmap = "5"
futures = "0.3"
once_cell = "1"
rand = "0.8"
strum = { version = "0.26", features = ["derive"] }

## Configurable dependencies
# Tracing feature requires --cfg=tokio_unstable
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//! cargo run --example counter
//! ```
#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use ractor::{call_t, Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/monte_carlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
//! cargo run --example monte_carlo
//! ```
#![allow(clippy::incompatible_msrv)]

use std::collections::HashMap;

use ractor::{cast, Actor, ActorId, ActorProcessingErr, ActorRef};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/output_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! cargo run --example output_port
//! ```
#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use std::sync::Arc;
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! cargo run --example philosophers
//! ```
#![allow(clippy::incompatible_msrv)]

use std::collections::{HashMap, VecDeque};

use ractor::{cast, Actor, ActorId, ActorName, ActorProcessingErr, ActorRef, RpcReplyPort};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
//! cargo run --example ping_pong
//! ```
#![allow(clippy::incompatible_msrv)]

extern crate ractor;

use ractor::{cast, Actor, ActorProcessingErr, ActorRef};
Expand Down
2 changes: 2 additions & 0 deletions ractor/examples/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
//! cargo run --example supervisor
//! ```
#![allow(clippy::incompatible_msrv)]

use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SupervisionEvent};

use tokio::time::Duration;
Expand Down
15 changes: 3 additions & 12 deletions ractor/src/actor/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,16 @@ use super::ActorCell;
///
/// An [ActorRef] is the primary means of communication typically used
/// when interfacing with [super::Actor]s
///
/// The [ActorRef] is SPECIFICALLY marked [Sync], regardless of the message type
/// because all usages of the message type are to send an owned instance of a message
/// and in no case is that message instance shared across threads. This is guaranteed
/// by the underlying Tokio channel usages. Without this manual marking of [Sync] on
/// [ActorRef], we would need to constrain the message type [Message] to be [Sync] which
/// is overly restrictive.
pub struct ActorRef<TMessage> {
pub(crate) inner: ActorCell,
_tactor: PhantomData<TMessage>,
_tactor: PhantomData<fn() -> TMessage>,
}

unsafe impl<T> Sync for ActorRef<T> {}

impl<TMessage> Clone for ActorRef<TMessage> {
fn clone(&self) -> Self {
ActorRef {
inner: self.inner.clone(),
_tactor: PhantomData::<TMessage>,
_tactor: PhantomData,
}
}
}
Expand All @@ -53,7 +44,7 @@ impl<TMessage> From<ActorCell> for ActorRef<TMessage> {
fn from(value: ActorCell) -> Self {
Self {
inner: value,
_tactor: PhantomData::<TMessage>,
_tactor: PhantomData,
}
}
}
Expand Down
151 changes: 151 additions & 0 deletions ractor/src/factory/discard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Discard handler managing when jobs are discarded
use super::Job;
use super::JobKey;
use crate::Message;

/// The discard mode of a factory
///
/// Selects which jobs are dropped first when the factory is under backpressure.
/// The type is a plain, field-less enum, so it is cheap to copy and compare, and
/// it implements [std::fmt::Debug] so it can be logged and used in assertions.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DiscardMode {
    /// Discard oldest incoming jobs under backpressure
    Oldest,
    /// Discard newest incoming jobs under backpressure
    Newest,
}

/// The per-worker view of the factory's discard configuration.
///
/// An earlier design handed every worker a cloned box of the dynamic calculator,
/// which meant each worker re-computed the discard limit on its own. With many
/// thousands of workers that is wasted effort.
///
/// Now the workers treat the limit as static, while the factory computes the
/// limit on its interval and propagates the result to the workers. The workers
/// "think" the limit is static, but the factory handles the dynamics. This lets
/// the factory keep the [DynamicDiscardController] as a single, uncloned
/// instance, and it reduces NUM_WORKER calculations to 1.
pub(crate) enum WorkerDiscardSettings {
    None,
    Static { limit: usize, mode: DiscardMode },
}

impl WorkerDiscardSettings {
    /// Overwrite the stored limit with `new_limit`.
    ///
    /// Only the `Static` variant carries a limit; for `None` this is a no-op.
    pub(crate) fn update_worker_limit(&mut self, new_limit: usize) {
        match self {
            Self::Static { limit, .. } => *limit = new_limit,
            Self::None => {}
        }
    }

    /// Return the stored limit and [DiscardMode], or `None` when discarding
    /// is not configured.
    pub(crate) fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
        // Both fields are `Copy`, so they can be bound by value straight out of `*self`.
        if let Self::Static { limit, mode } = *self {
            Some((limit, mode))
        } else {
            None
        }
    }
}
/// Job discarding (loadshedding) configuration for a factory. A factory that
/// supports discarding can be configured in one of the following ways:
///
/// 1. No discarding
/// 2. A static queueing limit for discards, with a specific discarding mode
/// 3. A dynamic queueing limit for discards, with a specified discarding mode and
///    initial discard limit.
pub enum DiscardSettings {
    /// Jobs are never discarded
    None,
    /// The limit is static and immutable
    Static {
        /// The current limit. A value of 0 means jobs will never queue and are
        /// discarded immediately once all workers are busy
        limit: usize,
        /// The factory messaging discard mode, denoting whether the oldest or newest
        /// messages should be discarded in back-pressure scenarios
        ///
        /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
        mode: DiscardMode,
    },
    /// The discard limit can change over time, controlled by the `updater`, which
    /// is an implementation of the [DynamicDiscardController]
    Dynamic {
        /// The current limit. A value of 0 means jobs will never queue and are
        /// discarded immediately once all workers are busy
        limit: usize,
        /// The factory messaging discard mode, denoting whether the oldest or newest
        /// messages should be discarded in back-pressure scenarios
        ///
        /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
        mode: DiscardMode,
        /// The [DynamicDiscardController] implementation, which computes new limits
        /// dynamically based on whatever metrics it wants
        updater: Box<dyn DynamicDiscardController>,
    },
}

impl DiscardSettings {
    /// Build the settings handed to a worker.
    ///
    /// Both the static and the dynamic configuration are presented to the worker
    /// as [WorkerDiscardSettings::Static], carrying the current limit and mode.
    pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
        match self.get_limit_and_mode() {
            Some((limit, mode)) => WorkerDiscardSettings::Static { limit, mode },
            None => WorkerDiscardSettings::None,
        }
    }

    /// Retrieve the discarding limit and [DiscardMode], if configured
    pub fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
        // `limit` and `mode` are `Copy`; the `updater` is left untouched via `..`.
        match *self {
            Self::None => None,
            Self::Static { limit, mode } | Self::Dynamic { limit, mode, .. } => {
                Some((limit, mode))
            }
        }
    }
}

/// Controls the dynamic discard limit: given the current discard threshold, an
/// implementation emits the new threshold to apply
///
/// Implementations must be [Send], [Sync] and `'static`.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait DynamicDiscardController: Send + Sync + 'static {
/// Compute the new threshold for discarding
///
/// If you want to utilize metrics exposed in [crate::modular_factory::stats] you can gather them
/// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve an
/// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
/// counters)
///
/// The namespace of stats collected on the base controller factory are
/// `base_controller.factory.{FACTORY_NAME}.{STAT}`
///
/// If no factory name is set, then "all" will be inserted
///
/// * `current_threshold` - The discard threshold at the time of the call
///
/// Returns the new discard threshold
// NOTE(review): `crate::modular_factory::stats` and `stats_facebook` are not defined in
// this file and look like they originate from another codebase — confirm they resolve in
// this crate, otherwise the intra-doc link above is broken.
async fn compute(&mut self, current_threshold: usize) -> usize;
}

/// Reason for discarding a job
///
/// Passed by value to [DiscardHandler::discard]. It is a plain, field-less enum,
/// so it derives the same cheap value semantics as [DiscardMode] plus
/// [std::fmt::Debug], allowing handlers to log and compare the reason directly.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DiscardReason {
    /// The job's TTL (time-to-live) expired
    TtlExpired,
    /// The job was rejected or dropped due to loadshedding
    Loadshed,
    /// The job was dropped due to factory shutting down
    Shutdown,
}

/// Trait defining the discard handler for a factory.
///
/// `TKey` is the job's key type and `TMsg` the job's message type; implementations
/// must be [Send], [Sync] and `'static`.
pub trait DiscardHandler<TKey, TMsg>: Send + Sync + 'static
where
TKey: JobKey,
TMsg: Message,
{
/// Called on a job prior to being dropped from the factory.
///
/// Useful scenarios are (a) collecting metrics, (b) logging, (c) tearing
/// down resources in the job, etc.
///
/// * `reason` - The [DiscardReason] describing why the job is being discarded
/// * `job` - Mutable access to the job that is about to be dropped
fn discard(&self, reason: DiscardReason, job: &mut Job<TKey, TMsg>);
}
Loading

0 comments on commit a9ff369

Please sign in to comment.