Add env variable to set actor's heartbeat. Distinguish heartbeat and other interval usages. (quickwit-oss#3434)
fmassot authored May 31, 2023
1 parent 0ff620d commit 2d340c9
Showing 20 changed files with 154 additions and 92 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-actors/Cargo.toml
@@ -15,6 +15,7 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
once_cell = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
46 changes: 36 additions & 10 deletions quickwit/quickwit-actors/src/lib.rs
@@ -28,7 +28,9 @@
//! - detect when some task is stuck and does not progress anymore
use std::fmt;
use std::num::NonZeroU64;

use once_cell::sync::Lazy;
use quickwit_proto::{ServiceError, ServiceErrorCode};
use tokio::time::Duration;
mod actor;
@@ -59,6 +61,8 @@ pub use observation::{Observation, ObservationType};
use quickwit_common::KillSwitch;
pub use spawn_builder::SpawnContext;
use thiserror::Error;
use tracing::info;
use tracing::log::warn;
pub use universe::Universe;

pub use self::actor_context::ActorContext;
@@ -73,16 +77,38 @@ pub use self::supervisor::{Supervisor, SupervisorState};
/// If an actor does not advertise progress within an interval of duration `HEARTBEAT`,
/// its supervisor will consider it blocked and will proceed to kill it, as well
/// as all of the actors that share the killswitch.
pub const HEARTBEAT: Duration = if cfg!(any(test, feature = "testsuite")) {
// Right now some unit tests end when we detect that a
// pipeline has terminated, which can require waiting
// for a heartbeat.
//
// We use a shorter heartbeat to reduce the time running unit tests.
Duration::from_millis(500)
} else {
Duration::from_secs(3)
};
pub static HEARTBEAT: Lazy<Duration> = Lazy::new(heartbeat_from_env_or_default);

/// Returns the actor's heartbeat duration:
/// - Derived from `QW_ACTOR_HEARTBEAT_SECS` if set and valid.
/// - Defaults to 30 seconds, or 500ms when running tests (`testsuite` feature).
fn heartbeat_from_env_or_default() -> Duration {
if cfg!(any(test, feature = "testsuite")) {
// Right now some unit tests end when we detect that a
// pipeline has terminated, which can require waiting
// for a heartbeat.
//
// We use a shorter heartbeat to reduce the time running unit tests.
return Duration::from_millis(500);
}
if let Ok(actor_heartbeat_secs_str) = std::env::var("QW_ACTOR_HEARTBEAT_SECS") {
if let Ok(actor_heartbeat_secs) = actor_heartbeat_secs_str.parse::<NonZeroU64>() {
info!("Set the actor heartbeat to {actor_heartbeat_secs} seconds.");
return Duration::from_secs(actor_heartbeat_secs.get());
} else {
warn!(
"Failed to parse `QW_ACTOR_HEARTBEAT_SECS={actor_heartbeat_secs_str}` as a number of \
seconds > 0, using the default heartbeat (30 seconds)."
);
};
} else {
warn!(
"`QW_ACTOR_HEARTBEAT_SECS` is not set or is not valid unicode, using the default \
heartbeat (30 seconds)."
);
}
Duration::from_secs(30)
}

/// Time we are willing to wait for a new observation.
///
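As written, `heartbeat_from_env_or_default` takes the `else` branch and logs a warning whenever `std::env::var` fails, which includes the common case where `QW_ACTOR_HEARTBEAT_SECS` is simply not set. A minimal sketch, not part of this diff, of a variant that stays silent when the variable is absent and only warns when it is present but unusable (logging swapped for `eprintln!` to keep the example dependency-free):

use std::num::NonZeroU64;
use std::time::Duration;

fn heartbeat_from_env(default: Duration) -> Duration {
    match std::env::var("QW_ACTOR_HEARTBEAT_SECS") {
        // Present: accept only a strictly positive number of seconds.
        Ok(value) => match value.parse::<NonZeroU64>() {
            Ok(secs) => Duration::from_secs(secs.get()),
            Err(_) => {
                eprintln!("invalid QW_ACTOR_HEARTBEAT_SECS={value}, using default");
                default
            }
        },
        // Present but not valid unicode: warn and fall back.
        Err(std::env::VarError::NotUnicode(_)) => {
            eprintln!("QW_ACTOR_HEARTBEAT_SECS is not valid unicode, using default");
            default
        }
        // Not set at all: silently use the default.
        Err(std::env::VarError::NotPresent) => default,
    }
}

Either way, the value is read once, when the `HEARTBEAT` `Lazy` is first dereferenced, so the variable has to be set in the environment of the Quickwit process (e.g. `QW_ACTOR_HEARTBEAT_SECS=10`).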
6 changes: 4 additions & 2 deletions quickwit/quickwit-actors/src/supervisor.rs
@@ -61,7 +61,8 @@ impl<A: Actor> Actor for Supervisor<A> {
}

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
ctx.schedule_self_msg(crate::HEARTBEAT, SuperviseLoop).await;
ctx.schedule_self_msg(*crate::HEARTBEAT, SuperviseLoop)
.await;
Ok(())
}

@@ -179,7 +180,8 @@ impl<A: Actor> Handler<SuperviseLoop> for Supervisor<A> {
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
self.supervise(ctx).await?;
ctx.schedule_self_msg(crate::HEARTBEAT, SuperviseLoop).await;
ctx.schedule_self_msg(*crate::HEARTBEAT, SuperviseLoop)
.await;
Ok(())
}
}
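The two supervisor changes above are mechanical (dereference the `Lazy`), but they re-arm the pattern used throughout this commit: each pass schedules the next `SuperviseLoop` message to itself one interval later. A rough, framework-agnostic equivalent in plain tokio, for illustration only (not how quickwit-actors is implemented internally):

use tokio::time::{sleep, Duration};

async fn supervise_loop(interval: Duration, mut is_healthy: impl FnMut() -> bool) {
    loop {
        // Equivalent of `ctx.schedule_self_msg(*HEARTBEAT, SuperviseLoop)`:
        // wait one interval, then run the supervision pass again.
        sleep(interval).await;
        if !is_healthy() {
            // The real supervisor would kill the failed actor (and its kill-switch peers)
            // and respawn it here.
            break;
        }
    }
}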
3 changes: 2 additions & 1 deletion quickwit/quickwit-actors/src/tests.rs
@@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::ops::Mul;
use std::time::Duration;

use async_trait::async_trait;
@@ -281,7 +282,7 @@ async fn test_timeouting_actor() {
ObservationType::Timeout
);
assert_eq!(buggy_handle.harvest_health(), Health::Healthy);
universe.sleep(crate::HEARTBEAT * 2).await;
universe.sleep(crate::HEARTBEAT.mul(2)).await;
assert_eq!(buggy_handle.harvest_health(), Health::FailureOrUnhealthy);
buggy_handle.kill().await;
}
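The switch from `crate::HEARTBEAT * 2` to `crate::HEARTBEAT.mul(2)` (and the new `use std::ops::Mul`) follows from `HEARTBEAT` becoming a `Lazy<Duration>`: operator syntax does not auto-deref its left-hand side, while method-call syntax does. A small self-contained illustration, with names local to the example:

use std::ops::Mul;
use std::time::Duration;

use once_cell::sync::Lazy;

static HEARTBEAT: Lazy<Duration> = Lazy::new(|| Duration::from_secs(3));

fn main() {
    // let d = HEARTBEAT * 2;   // does not compile: `Mul` is not implemented for `Lazy<Duration>`.
    let a = *HEARTBEAT * 2;     // explicit deref, then `Duration * u32`.
    let b = HEARTBEAT.mul(2);   // method-call syntax auto-derefs through the `Lazy`.
    assert_eq!(a, b);
}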
1 change: 1 addition & 0 deletions quickwit/quickwit-control-plane/Cargo.toml
@@ -45,6 +45,7 @@ proptest = { workspace = true }
rand = { workspace = true }

quickwit-actors = { workspace = true, features = ["testsuite"] }
quickwit-cluster = { workspace = true, features = ["testsuite"] }
quickwit-common = { workspace = true, features = ["testsuite"] }
quickwit-config = { workspace = true, features = ["testsuite"] }
quickwit-metastore = { workspace = true, features = ["testsuite"] }
54 changes: 34 additions & 20 deletions quickwit/quickwit-control-plane/src/scheduler.rs
@@ -26,7 +26,7 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, HEARTBEAT};
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
use quickwit_cluster::{Cluster, ClusterMember};
use quickwit_config::service::QuickwitService;
use quickwit_config::SourceConfig;
@@ -42,10 +42,21 @@ use crate::indexing_plan::{
};
use crate::{NotifyIndexChangeRequest, NotifyIndexChangeResponse};

/// Interval between two checks of the desired plan vs. the running plan.
const CONTROL_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_millis(500)
} else {
Duration::from_secs(3)
};

/// Interval between two schedulings of indexing plans. It does not need to be shorter than
/// the control plan loop interval.
// Note: it is currently not possible to define a const `Duration` as
// `CONTROL_PLAN_LOOP_INTERVAL * number`.
const REFRESH_PLAN_LOOP_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
Duration::from_secs(3)
} else {
Duration::from_secs(60 * 5)
Duration::from_secs(60)
};
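// Illustration (not part of this diff) of the note above: `Mul` for `Duration` is not a
// `const fn`, so `const SOME_INTERVAL: Duration = CONTROL_PLAN_LOOP_INTERVAL * 20;` would not
// compile in a const context. Multiplying plain integers inside `Duration::from_secs(60 * 5)`
// is the usual workaround.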

const MIN_DURATION_BETWEEN_SCHEDULING: Duration = if cfg!(any(test, feature = "testsuite")) {
@@ -87,15 +98,15 @@ pub struct IndexingSchedulerState {
/// TODO(fmassot): to avoid a scheduling on each [`RefreshPlanLoop`], we can store in the
/// scheduler state a metastore version number that will be compared to the number stored in the
/// metastore itself.
/// - [`ControlPlanLoop`]: this event is scheduled every [`HEARTBEAT`] and control if the `desired
/// plan`, that is the last applied [`PhysicalIndexingPlan`] by the scheduler, and the `running
/// plan`, that is the indexing tasks running on all indexers and retrieved from the chitchat
/// state, are the same:
/// - [`ControlPlanLoop`]: this event is scheduled every [`CONTROL_PLAN_LOOP_INTERVAL`] and checks
/// whether the `desired plan`, that is, the last [`PhysicalIndexingPlan`] applied by the scheduler,
/// and the `running plan`, that is, the indexing tasks running on all indexers and retrieved from
/// the chitchat state, are the same:
/// - if node IDs are different, the scheduler will trigger a scheduling.
/// - if indexing tasks are different, the scheduler will re-apply the last applied plan.
///
/// Finally, in order to give each indexer time to run its indexing tasks, the control
/// phase will wait at least [`MIN_DURATION_BETWEEN_SCHEDULING`] before comparing the desired
/// plan with the running plan.
pub struct IndexingScheduler {
cluster: Cluster,
@@ -132,7 +143,8 @@ impl Actor for IndexingScheduler {

async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
self.handle(RefreshPlanLoop, ctx).await?;
ctx.schedule_self_msg(HEARTBEAT, ControlPlanLoop).await;
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}
@@ -327,7 +339,8 @@ impl Handler<ControlPlanLoop> for IndexingScheduler {
if let Err(error) = self.control_running_plan().await {
error!("Error when controlling the running plan: `{}`.", error);
}
ctx.schedule_self_msg(HEARTBEAT, ControlPlanLoop).await;
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop)
.await;
Ok(())
}
}
@@ -520,7 +533,7 @@ mod tests {
use std::time::Duration;

use chitchat::transport::ChannelTransport;
use quickwit_actors::{ActorHandle, Inbox, Universe, HEARTBEAT};
use quickwit_actors::{ActorHandle, Inbox, Universe};
use quickwit_cluster::{create_cluster_for_test, grpc_addr_from_listen_addr_for_test, Cluster};
use quickwit_common::test_utils::wait_until_predicate;
use quickwit_config::service::QuickwitService;
@@ -532,7 +545,7 @@
use quickwit_proto::indexing_api::{ApplyIndexingPlanRequest, IndexingTask};
use serde_json::json;

use super::IndexingScheduler;
use super::{IndexingScheduler, CONTROL_PLAN_LOOP_INTERVAL};
use crate::scheduler::{
get_indexing_plans_diff, MIN_DURATION_BETWEEN_SCHEDULING, REFRESH_PLAN_LOOP_INTERVAL,
};
@@ -625,10 +638,10 @@ mod tests {
assert!(scheduler_state.last_applied_physical_plan.is_some());
assert_eq!(indexing_service_inbox_messages.len(), 1);

// After a HEARTBEAT, the control loop will check if the desired plan is running on the
// indexer. As chitchat state of the indexer is not updated (we did not
// instantiate a indexing service for that), the control loop will apply again the
// same plan.
// After a CONTROL_PLAN_LOOP_INTERVAL, the control loop will check if the desired plan is
// running on the indexer. As the chitchat state of the indexer is not updated (we did
// not instantiate an indexing service for that), the control loop will apply the
// same plan again.
// Check first the plan is not updated before `MIN_DURATION_BETWEEN_SCHEDULING`.
tokio::time::sleep(MIN_DURATION_BETWEEN_SCHEDULING.mul_f32(0.5)).await;
let scheduler_state = scheduler_handler.process_pending_and_observe().await;
@@ -689,7 +702,7 @@ mod tests {
let indexing_service_inbox = indexing_service_inboxes[0].clone();

// No indexer.
universe.sleep(HEARTBEAT).await;
universe.sleep(CONTROL_PLAN_LOOP_INTERVAL).await;
let scheduler_state = scheduler_handler.process_pending_and_observe().await;
let indexing_service_inbox_messages =
indexing_service_inbox.drain_for_test_typed::<ApplyIndexingPlanRequest>();
@@ -777,7 +790,7 @@ mod tests {
scheduler_state.num_schedule_indexing_plan == 1
}
},
HEARTBEAT * 4,
CONTROL_PLAN_LOOP_INTERVAL * 4,
Duration::from_millis(100),
)
.await
@@ -799,8 +812,9 @@
.await
.unwrap();

// Wait 2 heartbeats again and check the scheduler will not apply the plan several times.
universe.sleep(HEARTBEAT * 2).await;
// Wait another 2 * CONTROL_PLAN_LOOP_INTERVAL and check that the scheduler does not apply
// the plan several times.
universe.sleep(CONTROL_PLAN_LOOP_INTERVAL * 2).await;
let scheduler_state = scheduler_handler_arc.process_pending_and_observe().await;
assert_eq!(scheduler_state.num_schedule_indexing_plan, 1);

@@ -833,7 +847,7 @@ mod tests {
scheduler_state.num_schedule_indexing_plan == 2
}
},
HEARTBEAT * 10,
CONTROL_PLAN_LOOP_INTERVAL * 10,
Duration::from_millis(100),
)
.await
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -471,15 +471,15 @@ impl Handler<Supervise> for IndexingPipeline {
Health::Healthy => {}
Health::FailureOrUnhealthy => {
self.terminate().await;
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 })
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 })
.await;
}
Health::Success => {
return Err(ActorExitStatus::Success);
}
}
}
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Supervise)
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Supervise)
.await;
Ok(())
}
@@ -823,7 +823,7 @@ mod tests {
let indexer = universe.get::<Indexer>().into_iter().next().unwrap();
let _ = indexer.ask(Command::Quit).await;
for _ in 0..10 {
universe.sleep(quickwit_actors::HEARTBEAT).await;
universe.sleep(*quickwit_actors::HEARTBEAT).await;
// Check indexing pipeline has restarted.
let obs = indexing_pipeline_handler
.process_pending_and_observe()
10 changes: 5 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -710,7 +710,7 @@ impl Handler<SuperviseLoop> for IndexingService {
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.handle_supervise().await?;
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, SuperviseLoop)
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, SuperviseLoop)
.await;
Ok(())
}
@@ -1267,12 +1267,12 @@ mod tests {
pipeline.quit().await;

// Let the service cleanup the merge pipelines.
universe.sleep(HEARTBEAT).await;
universe.sleep(*HEARTBEAT).await;

let observation = indexing_server_handle.process_pending_and_observe().await;
assert_eq!(observation.num_running_pipelines, 0);
assert_eq!(observation.num_running_merge_pipelines, 0);
universe.sleep(HEARTBEAT).await;
universe.sleep(*HEARTBEAT).await;
// Check that the merge pipeline is also shut down as there are no more indexing pipelines on
// the index.
assert!(universe.get_one::<MergePipeline>().is_none());
@@ -1290,7 +1290,7 @@
_: FreezePipeline,
_ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
tokio::time::sleep(HEARTBEAT * 5).await;
tokio::time::sleep(*HEARTBEAT * 5).await;
Ok(())
}
}
@@ -1368,7 +1368,7 @@ mod tests {
.send_message(FreezePipeline)
.await
.unwrap();
universe.sleep(HEARTBEAT * 5).await;
universe.sleep(*HEARTBEAT * 5).await;
// Check that indexing and merge pipelines are still running.
let observation = indexing_service_handle.observe().await;
assert_eq!(observation.num_running_pipelines, 1);
4 changes: 2 additions & 2 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -376,15 +376,15 @@ impl Handler<Supervise> for MergePipeline {
Health::Healthy => {}
Health::FailureOrUnhealthy => {
self.terminate().await;
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 })
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 })
.await;
}
Health::Success => {
return Err(ActorExitStatus::Success);
}
}
}
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Supervise)
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, Supervise)
.await;
Ok(())
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/merge_planner.rs
@@ -283,7 +283,7 @@ impl Handler<RefreshMetric> for MergePlanner {
self.pipeline_id.source_id.as_str(),
])
.set(self.ongoing_merge_operations_inventory.list().len() as i64);
ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, RefreshMetric)
ctx.schedule_self_msg(*quickwit_actors::HEARTBEAT, RefreshMetric)
.await;
Ok(())
}
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/source/kafka_source.rs
@@ -494,7 +494,7 @@ impl Source for KafkaSource {
) -> Result<Duration, ActorExitStatus> {
let now = Instant::now();
let mut batch = BatchBuilder::default();
let deadline = time::sleep(quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
tokio::pin!(deadline);

loop {
@@ -215,7 +215,7 @@ impl Source for KinesisSource {
let mut docs = Vec::new();
let mut checkpoint_delta = SourceCheckpointDelta::default();

let deadline = time::sleep(quickwit_actors::HEARTBEAT / 2);
let deadline = time::sleep(*quickwit_actors::HEARTBEAT / 2);
tokio::pin!(deadline);

loop {
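Both sources cap the time spent building a batch at half a heartbeat, so the actor yields back and can report progress well before its supervisor would declare it stuck. A stripped-down sketch of that pattern using a plain tokio channel instead of Quickwit's source internals (names are illustrative):

use tokio::sync::mpsc::Receiver;
use tokio::time::{sleep, Duration};

async fn collect_batch(rx: &mut Receiver<String>, heartbeat: Duration) -> Vec<String> {
    let mut batch = Vec::new();
    // Never spend more than half a heartbeat in here, mirroring `time::sleep(*HEARTBEAT / 2)`.
    let deadline = sleep(heartbeat / 2);
    tokio::pin!(deadline);
    loop {
        tokio::select! {
            maybe_doc = rx.recv() => {
                match maybe_doc {
                    Some(doc) => batch.push(doc),
                    None => break, // channel closed: emit whatever was collected
                }
            }
            _ = &mut deadline => break, // deadline reached: emit the partial batch
        }
    }
    batch
}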