Skip to content

Commit

Permalink
raftstore: fix store_heartbeat incorrectly reporting store.is_busy ==…
Browse files Browse the repository at this point in the history
… true. (tikv#16494)

close tikv#16491

In this PR, the type of `completed_apply_peers_count` is changed from `u64` to `Option<u64>`.

When initializing, `completed_apply_peers_count` is set to `Some(0)`. If the check finds
that the store has already finished its `busy_on_apply` check with result `false`, or the store is starting from empty regions,
this value is reset to `None` to mark the check as finished, so it does not need to be performed again on subsequent heartbeats.

Signed-off-by: lucasliang <[email protected]>
  • Loading branch information
LykxSassinator authored Feb 5, 2024
1 parent 04370e9 commit 4a14cec
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 19 deletions.
10 changes: 9 additions & 1 deletion components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6579,6 +6579,12 @@ where
// If the peer is newly added or created, no need to check the apply status.
if last_idx <= RAFT_INIT_LOG_INDEX {
self.fsm.peer.busy_on_apply = None;
// And it should be recorded in the `completed_apply_peers_count`.
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.busy_apply_peers.remove(&peer_id);
if let Some(count) = meta.completed_apply_peers_count.as_mut() {
*count += 1;
}
return;
}
assert!(self.fsm.peer.busy_on_apply.is_some());
Expand All @@ -6602,7 +6608,9 @@ where
{
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.busy_apply_peers.remove(&peer_id);
meta.completed_apply_peers_count += 1;
if let Some(count) = meta.completed_apply_peers_count.as_mut() {
*count += 1;
}
}
debug!(
"peer completes applying logs";
Expand Down
38 changes: 26 additions & 12 deletions components/raftstore/src/store/fsm/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,9 @@ pub struct StoreMeta {
/// Record the number of peers done for applying logs.
/// Without `completed_apply_peers_count`, it's hard to know whether all
/// peers are ready for applying logs.
pub completed_apply_peers_count: u64,
/// If None, it means the store is start from empty, no need to check and
/// update it anymore.
pub completed_apply_peers_count: Option<u64>,
}

impl StoreRegionMeta for StoreMeta {
Expand Down Expand Up @@ -249,7 +251,7 @@ impl StoreMeta {
region_read_progress: RegionReadProgressRegistry::new(),
damaged_ranges: HashMap::default(),
busy_apply_peers: HashSet::default(),
completed_apply_peers_count: 0,
completed_apply_peers_count: Some(0),
}
}

Expand Down Expand Up @@ -2725,8 +2727,14 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
start_ts_sec: u32,
region_count: u64,
busy_apply_peers_count: u64,
completed_apply_peers_count: u64,
completed_apply_peers_count: Option<u64>,
) -> bool {
// No need to check busy status if there are no regions.
if completed_apply_peers_count.is_none() || region_count == 0 {
return false;
}

let completed_apply_peers_count = completed_apply_peers_count.unwrap();
let during_starting_stage = {
(time::get_time().sec as u32).saturating_sub(start_ts_sec)
<= STORE_CHECK_PENDING_APPLY_DURATION.as_secs() as u32
Expand All @@ -2737,7 +2745,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
// regarded as the candidate for balancing leaders.
if during_starting_stage {
let completed_target_count = (|| {
fail_point!("on_mock_store_completed_target_count", |_| 0);
fail_point!("on_mock_store_completed_target_count", |_| 100);
std::cmp::max(
1,
STORE_CHECK_COMPLETE_APPLY_REGIONS_PERCENT * region_count / 100,
Expand All @@ -2752,7 +2760,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
self.ctx.cfg.min_pending_apply_region_count,
region_count.saturating_sub(completed_target_count),
);
busy_apply_peers_count >= pending_target_count
pending_target_count > 0 && busy_apply_peers_count >= pending_target_count
}
} else {
// Already started for a fairy long time.
Expand All @@ -2765,7 +2773,7 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER

stats.set_store_id(self.ctx.store_id());

let completed_apply_peers_count: u64;
let completed_apply_peers_count: Option<u64>;
let busy_apply_peers_count: u64;
{
let meta = self.ctx.store_meta.lock().unwrap();
Expand Down Expand Up @@ -2810,18 +2818,24 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
.swap(0, Ordering::Relaxed),
);

let store_is_busy = self
.ctx
.global_stat
.stat
.is_busy
.swap(false, Ordering::Relaxed);
let busy_on_apply = self.check_store_is_busy_on_apply(
start_time,
stats.get_region_count() as u64,
busy_apply_peers_count,
completed_apply_peers_count,
);
// If the store already pass the check, it should clear the
// `completed_apply_peers_count` to skip the check next time.
if !busy_on_apply {
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.completed_apply_peers_count = None;
}
let store_is_busy = self
.ctx
.global_stat
.stat
.is_busy
.swap(false, Ordering::Relaxed);
stats.set_is_busy(store_is_busy || busy_on_apply);

let mut query_stats = QueryStats::default();
Expand Down
22 changes: 16 additions & 6 deletions tests/failpoints/cases/test_pending_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ fn test_on_check_busy_on_apply_peers() {
must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
must_get_equal(&cluster.get_engine(3), b"k1", b"v1");

// Check the start status for peer 1003.
cluster.must_send_store_heartbeat(3);
sleep_ms(100);
let stats = cluster.pd_client.get_store_stats(3).unwrap();
assert!(!stats.is_busy);

// Pause peer 1003 on applying logs to make it pending.
let before_apply_stat = cluster.apply_state(r1, 3);
cluster.stop_node(3);
Expand All @@ -149,23 +155,27 @@ fn test_on_check_busy_on_apply_peers() {
cluster.run_node(3).unwrap();
let after_apply_stat = cluster.apply_state(r1, 3);
assert!(after_apply_stat.applied_index == before_apply_stat.applied_index);
// Case 1: no completed regions.

// Case 1: completed regions < target count.
fail::cfg("on_mock_store_completed_target_count", "return").unwrap();
sleep_ms(100);
cluster.must_send_store_heartbeat(3);
sleep_ms(100);
let stats = cluster.pd_client.get_store_stats(3).unwrap();
assert!(stats.is_busy);
// Case 2: completed_apply_peers_count > completed_target_count but
// there exists busy peers.
fail::cfg("on_mock_store_completed_target_count", "return").unwrap();
fail::remove("on_mock_store_completed_target_count");
sleep_ms(100);

// Case 2: completed_apply_peers_count > completed_target_count but
// there exists no busy peers.
cluster.must_send_store_heartbeat(3);
sleep_ms(100);
let stats = cluster.pd_client.get_store_stats(3).unwrap();
assert!(!stats.is_busy);
fail::remove("on_mock_store_completed_target_count");

// After peer 1003 is recovered, store also should not be marked with busy.
fail::remove("on_handle_apply_1003");
sleep_ms(100);
// After peer 1003 is recovered, store should not be marked with busy.
let stats = cluster.pd_client.get_store_stats(3).unwrap();
assert!(!stats.is_busy);
}

0 comments on commit 4a14cec

Please sign in to comment.