
Commit 8cdf87b

raftstore: make manual compaction in cleanup worker be able to be ignored dynamically (tikv#16547)

close tikv#15282

make manual compaction in cleanup worker be able to be ignored dynamically

Signed-off-by: SpadeA-Tang <[email protected]>
SpadeA-Tang authored Feb 21, 2024
1 parent ec64762 commit 8cdf87b
Showing 7 changed files with 120 additions and 41 deletions.
6 changes: 6 additions & 0 deletions components/raftstore/src/store/config.rs
@@ -414,6 +414,11 @@ pub struct Config {
#[doc(hidden)]
#[online_config(hidden)]
pub min_pending_apply_region_count: u64,

    /// Whether to skip manual compaction in the cleanup worker for the `write`
    /// and `default` column families.
#[doc(hidden)]
pub skip_manual_compaction_in_clean_up_worker: bool,
}

impl Default for Config {
@@ -552,6 +557,7 @@ impl Default for Config {
enable_v2_compatible_learner: false,
unsafe_disable_check_quorum: false,
min_pending_apply_region_count: 10,
skip_manual_compaction_in_clean_up_worker: false,
}
}
}
7 changes: 6 additions & 1 deletion components/raftstore/src/store/fsm/store.rs
@@ -1704,7 +1704,12 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
ReadRunner::new(self.router.clone(), engines.raft.clone()),
);

let compact_runner = CompactRunner::new(engines.kv.clone(), bgworker_remote);
let compact_runner = CompactRunner::new(
engines.kv.clone(),
bgworker_remote,
cfg.clone().tracker(String::from("compact-runner")),
cfg.value().skip_manual_compaction_in_clean_up_worker,
);
let cleanup_sst_runner = CleanupSstRunner::new(Arc::clone(&importer));
let gc_snapshot_runner = GcSnapshotRunner::new(
meta.get_id(),
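The new arguments hand the compaction worker both a point-in-time snapshot of the flag (cfg.value().skip_manual_compaction_in_clean_up_worker) and a Tracker<Config> registered against the shared, version-tracked config (cfg.clone().tracker(..)), so later online updates can be observed without restarting the worker. Below is a minimal, self-contained sketch of that handoff; SharedConfig and ConfigTracker are hypothetical stand-ins for tikv_util::config::{VersionTrack, Tracker}, and only the shape of the interaction is taken from this diff.

use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct WorkerConfig {
    skip_manual_compaction: bool,
}

#[derive(Default)]
struct SharedConfig {
    // (current value, version counter bumped on every update)
    inner: Mutex<(WorkerConfig, u64)>,
}

impl SharedConfig {
    fn value(&self) -> WorkerConfig {
        self.inner.lock().unwrap().0.clone()
    }

    fn update(&self, cfg: WorkerConfig) {
        let mut guard = self.inner.lock().unwrap();
        guard.0 = cfg;
        guard.1 += 1;
    }

    fn tracker(self: &Arc<Self>) -> ConfigTracker {
        let seen = self.inner.lock().unwrap().1;
        ConfigTracker { shared: Arc::clone(self), seen }
    }
}

struct ConfigTracker {
    shared: Arc<SharedConfig>,
    seen: u64,
}

impl ConfigTracker {
    // Returns the latest value only if it changed since the last poll.
    fn any_new(&mut self) -> Option<WorkerConfig> {
        let guard = self.shared.inner.lock().unwrap();
        if guard.1 == self.seen {
            return None;
        }
        self.seen = guard.1;
        Some(guard.0.clone())
    }
}

struct CompactWorker {
    cfg_tracker: ConfigTracker,
    skip_compact: bool,
}

fn main() {
    let cfg = Arc::new(SharedConfig::default());

    // Mirrors the construction in store.rs: take a snapshot of the flag now,
    // and keep a tracker so later online updates can be observed.
    let mut worker = CompactWorker {
        cfg_tracker: cfg.tracker(),
        skip_compact: cfg.value().skip_manual_compaction,
    };
    assert!(!worker.skip_compact);

    // A dynamic config change becomes visible on the worker's next poll.
    cfg.update(WorkerConfig { skip_manual_compaction: true });
    if let Some(incoming) = worker.cfg_tracker.any_new() {
        worker.skip_compact = incoming.skip_manual_compaction;
    }
    assert!(worker.skip_compact);
}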
43 changes: 38 additions & 5 deletions components/raftstore/src/store/worker/compact.rs
@@ -8,18 +8,20 @@ use std::{
time::Duration,
};

use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_WRITE};
use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_LOCK, CF_WRITE};
use fail::fail_point;
use futures_util::compat::Future01CompatExt;
use thiserror::Error;
use tikv_util::{
box_try, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn, worker::Runnable,
box_try, config::Tracker, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn,
worker::Runnable,
};
use yatp::Remote;

use super::metrics::{
COMPACT_RANGE_CF, FULL_COMPACT, FULL_COMPACT_INCREMENTAL, FULL_COMPACT_PAUSE,
};
use crate::store::Config;

type Key = Vec<u8>;

@@ -214,14 +216,27 @@ pub enum Error {
pub struct Runner<E> {
engine: E,
remote: Remote<yatp::task::future::TaskCell>,
cfg_tracker: Tracker<Config>,
    // Whether to skip the manual compaction of the write and default column families.
skip_compact: bool,
}

impl<E> Runner<E>
where
E: KvEngine,
{
pub fn new(engine: E, remote: Remote<yatp::task::future::TaskCell>) -> Runner<E> {
Runner { engine, remote }
pub fn new(
engine: E,
remote: Remote<yatp::task::future::TaskCell>,
cfg_tracker: Tracker<Config>,
skip_compact: bool,
) -> Runner<E> {
Runner {
engine,
remote,
cfg_tracker,
skip_compact,
}
}

/// Periodic full compaction.
@@ -369,6 +384,21 @@ where
bottommost_level_force,
} => {
let cf = &cf_name;
if cf != CF_LOCK {
                    // Check whether the config for ignoring manual compaction has changed.
if let Some(incoming) = self.cfg_tracker.any_new() {
self.skip_compact = incoming.skip_manual_compaction_in_clean_up_worker;
}
if self.skip_compact {
info!(
"skip compact range";
"range_start" => start_key.as_ref().map(|k| log_wrappers::Value::key(k)),
"range_end" => end_key.as_ref().map(|k|log_wrappers::Value::key(k)),
"cf" => cf_name,
);
return;
}
}
if let Err(e) = self.compact_range_cf(
cf,
start_key.as_deref(),
@@ -498,7 +528,10 @@ mod tests {
E: KvEngine,
{
let pool = YatpPoolBuilder::new(DefaultTicker::default()).build_future_pool();
(pool.clone(), Runner::new(engine, pool.remote().clone()))
(
pool.clone(),
Runner::new(engine, pool.remote().clone(), Tracker::default(), false),
)
}

#[test]
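The per-task rule added in compact.rs boils down to: refresh the cached flag from the config tracker, then skip only when the target column family is not the lock CF. A small self-contained sketch of that rule follows; the helper is hypothetical, the tracker poll is abstracted as an Option<bool> carrying any freshly observed value, and the string "lock" stands in for engine_traits::CF_LOCK.

const CF_LOCK: &str = "lock"; // stand-in for engine_traits::CF_LOCK

// Hypothetical helper capturing the skip rule: lock CF compactions always
// run, while write/default compactions honor the dynamically updated flag.
fn should_skip_manual_compaction(
    cf: &str,
    freshly_observed_flag: Option<bool>, // result of polling the config tracker
    cached_flag: &mut bool,
) -> bool {
    if cf == CF_LOCK {
        return false;
    }
    if let Some(flag) = freshly_observed_flag {
        *cached_flag = flag;
    }
    *cached_flag
}

fn main() {
    let mut cached = false;
    assert!(!should_skip_manual_compaction("write", None, &mut cached));
    // An online update flips the flag: write/default compactions are skipped...
    assert!(should_skip_manual_compaction("default", Some(true), &mut cached));
    // ...but the lock CF is still compacted.
    assert!(!should_skip_manual_compaction(CF_LOCK, None, &mut cached));
}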
21 changes: 3 additions & 18 deletions components/test_raftstore-v2/src/node.rs
@@ -311,6 +311,7 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
let (sender, _) = mpsc::unbounded();
let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
let state: Arc<Mutex<GlobalReplicationState>> = Arc::default();
let store_config = Arc::new(VersionTrack::new(raft_store));
node.start(
raft_engine.clone(),
tablet_registry,
@@ -324,7 +325,7 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
CollectorRegHandle::new_for_test(),
bg_worker,
pd_worker,
Arc::new(VersionTrack::new(raft_store)),
store_config.clone(),
&state,
importer,
key_manager,
@@ -338,27 +339,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
);
assert!(node_id == 0 || node_id == node.id());
let node_id = node.id();

let region_split_size = cfg.coprocessor.region_split_size();
let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
let region_bucket_size = cfg.coprocessor.region_bucket_size;
let mut raftstore_cfg = cfg.tikv.raft_store;
raftstore_cfg.optimize_for(true);
raftstore_cfg
.validate(
region_split_size,
enable_region_bucket,
region_bucket_size,
true,
)
.unwrap();

let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
cfg_controller.register(
Module::Raftstore,
Box::new(RaftstoreConfigManager::new(
node.refresh_config_scheduler(),
raft_store,
store_config,
)),
);

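Both test harnesses are reworked the same way (the mirror change in components/test_raftstore/src/node.rs appears below): instead of the node and the RaftstoreConfigManager each wrapping their own copy of the raftstore config in a fresh Arc<VersionTrack<_>>, they now share the single store_config handle, so an update applied through the config controller touches the same object the running store tracks. A tiny sketch of why sharing the handle matters, with a plain Arc<RwLock<bool>> standing in for Arc<VersionTrack<Config>> and illustrative type names:

use std::sync::{Arc, RwLock};

struct Node {
    cfg: Arc<RwLock<bool>>, // what the running store reads
}

struct ConfigManager {
    cfg: Arc<RwLock<bool>>, // where online updates are written
}

fn main() {
    // One allocation, two owners, as with `store_config` in the test harness.
    let store_config = Arc::new(RwLock::new(false));
    let node = Node { cfg: store_config.clone() };
    let manager = ConfigManager { cfg: store_config };

    // An update made through the manager is immediately visible to the node.
    *manager.cfg.write().unwrap() = true;
    assert!(*node.cfg.read().unwrap());

    // Had each side wrapped its own copy in a fresh Arc::new(RwLock::new(..)),
    // the write above would have landed in an object the node never reads.
}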
1 change: 1 addition & 0 deletions components/test_raftstore/src/common-test.toml
@@ -73,6 +73,7 @@ store-io-pool-size = 0
apply-pool-size = 1
store-pool-size = 1
snap-generator-pool-size = 2
skip-manual-compaction-in-clean_up-worker = false
[coprocessor]

[rocksdb]
19 changes: 3 additions & 16 deletions components/test_raftstore/src/node.rs
@@ -246,10 +246,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
)
.unwrap();
let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
let store_config = Arc::new(VersionTrack::new(raft_store));
let mut node = Node::new(
system,
&cfg.server,
Arc::new(VersionTrack::new(raft_store)),
store_config.clone(),
cfg.storage.api_version(),
Arc::clone(&self.pd_client),
Arc::default(),
@@ -353,25 +354,11 @@ impl<EK: KvEngine> Simulator<EK> for NodeCluster<EK> {
.map(|p| p.path().to_str().unwrap().to_owned())
);

let region_split_size = cfg.coprocessor.region_split_size();
let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
let region_bucket_size = cfg.coprocessor.region_bucket_size;
let mut raftstore_cfg = cfg.tikv.raft_store;
raftstore_cfg.optimize_for(false);
raftstore_cfg
.validate(
region_split_size,
enable_region_bucket,
region_bucket_size,
false,
)
.unwrap();
let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
cfg_controller.register(
Module::Raftstore,
Box::new(RaftstoreConfigManager::new(
node.refresh_config_scheduler(),
raft_store,
store_config,
)),
);

64 changes: 63 additions & 1 deletion tests/failpoints/cases/test_split_region.rs
@@ -17,7 +17,7 @@ use kvproto::{
Mutation, Op, PessimisticLockRequest, PrewriteRequest, PrewriteRequestPessimisticAction::*,
},
metapb::Region,
pdpb::CheckPolicy,
pdpb::{self, CheckPolicy},
raft_serverpb::{PeerState, RaftMessage},
tikvpb::TikvClient,
};
@@ -1610,3 +1610,65 @@ fn test_split_by_split_check_on_keys() {
// waiting the split,
cluster.wait_region_split(&region);
}

fn change(name: &str, value: &str) -> std::collections::HashMap<String, String> {
let mut m = std::collections::HashMap::new();
m.insert(name.to_owned(), value.to_owned());
m
}

#[test]
fn test_turn_off_manual_compaction_caused_by_no_valid_split_key() {
let mut cluster = new_node_cluster(0, 1);
cluster.run();
let r = cluster.get_region(b"");
cluster.must_split(&r, b"k1");
let r = cluster.get_region(b"k1");
cluster.must_split(&r, b"k2");
cluster.must_put(b"k1", b"val");

let (tx, rx) = sync_channel(5);
fail::cfg_callback("on_compact_range_cf", move || {
tx.send(true).unwrap();
})
.unwrap();

let safe_point_inject = "safe_point_inject";
fail::cfg(safe_point_inject, "return(100)").unwrap();

{
let sim = cluster.sim.rl();
let cfg_controller = sim.get_cfg_controller(1).unwrap();
cfg_controller
.update(change(
"raftstore.skip-manual-compaction-in-clean_up-worker",
"true",
))
.unwrap();
}

let r = cluster.get_region(b"k1");
cluster
.pd_client
.split_region(r.clone(), pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
rx.recv_timeout(Duration::from_secs(1)).unwrap_err();

{
let sim = cluster.sim.rl();
let cfg_controller = sim.get_cfg_controller(1).unwrap();
cfg_controller
.update(change(
"raftstore.skip-manual-compaction-in-clean_up-worker",
"false",
))
.unwrap();
}

cluster
.pd_client
.split_region(r, pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
fail::cfg(safe_point_inject, "return(200)").unwrap();
rx.recv_timeout(Duration::from_secs(1)).unwrap();
rx.recv_timeout(Duration::from_secs(1)).unwrap();
rx.try_recv().unwrap_err();
}
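The test observes background compactions through the on_compact_range_cf fail point: a callback sends on a channel every time the compaction path is hit, so a recv_timeout error proves the manual compaction was skipped and a successful recv proves it ran. Below is a minimal, self-contained sketch of that observation pattern; it assumes the fail crate with its "failpoints" feature enabled, and the fail point name and instrumented function are illustrative rather than taken from TiKV.

use std::{sync::mpsc::sync_channel, time::Duration};

use fail::fail_point;

// Hypothetical stand-in for the instrumented code path; the real fail point
// in this commit is `on_compact_range_cf` inside the compaction worker.
fn compact_once() {
    fail_point!("on_compact_once");
    // ... actual compaction work would go here ...
}

fn main() {
    let (tx, rx) = sync_channel(5);
    fail::cfg_callback("on_compact_once", move || {
        // Every hit of the fail point is reported to the observing thread.
        tx.send(()).unwrap();
    })
    .unwrap();

    compact_once();
    // One hit expected; a timeout here would mean the path was skipped.
    rx.recv_timeout(Duration::from_secs(1)).unwrap();
    // And no spurious extra hits.
    rx.try_recv().unwrap_err();

    fail::remove("on_compact_once");
}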
