diff --git a/components/raftstore/src/store/config.rs b/components/raftstore/src/store/config.rs
index 2427c438bf8..8022b67ff0c 100644
--- a/components/raftstore/src/store/config.rs
+++ b/components/raftstore/src/store/config.rs
@@ -414,6 +414,11 @@ pub struct Config {
     #[doc(hidden)]
     #[online_config(hidden)]
     pub min_pending_apply_region_count: u64,
+
+    /// Whether to skip manual compaction in the clean up worker for the
+    /// `write` and `default` column families.
+    #[doc(hidden)]
+    pub skip_manual_compaction_in_clean_up_worker: bool,
 }
 
 impl Default for Config {
@@ -552,6 +557,7 @@ impl Default for Config {
             enable_v2_compatible_learner: false,
             unsafe_disable_check_quorum: false,
             min_pending_apply_region_count: 10,
+            skip_manual_compaction_in_clean_up_worker: false,
         }
     }
 }
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index 92618efbd47..56d0362ebf8 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -1704,7 +1704,12 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
             ReadRunner::new(self.router.clone(), engines.raft.clone()),
         );
 
-        let compact_runner = CompactRunner::new(engines.kv.clone(), bgworker_remote);
+        let compact_runner = CompactRunner::new(
+            engines.kv.clone(),
+            bgworker_remote,
+            cfg.clone().tracker(String::from("compact-runner")),
+            cfg.value().skip_manual_compaction_in_clean_up_worker,
+        );
         let cleanup_sst_runner = CleanupSstRunner::new(Arc::clone(&importer));
         let gc_snapshot_runner = GcSnapshotRunner::new(
             meta.get_id(),
diff --git a/components/raftstore/src/store/worker/compact.rs b/components/raftstore/src/store/worker/compact.rs
index 06cc8dd5828..069e1f7ec2b 100644
--- a/components/raftstore/src/store/worker/compact.rs
+++ b/components/raftstore/src/store/worker/compact.rs
@@ -8,18 +8,20 @@ use std::{
     time::Duration,
 };
 
-use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_WRITE};
+use engine_traits::{KvEngine, ManualCompactionOptions, RangeStats, CF_LOCK, CF_WRITE};
 use fail::fail_point;
 use futures_util::compat::Future01CompatExt;
 use thiserror::Error;
 use tikv_util::{
-    box_try, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn, worker::Runnable,
+    box_try, config::Tracker, debug, error, info, time::Instant, timer::GLOBAL_TIMER_HANDLE, warn,
+    worker::Runnable,
 };
 use yatp::Remote;
 
 use super::metrics::{
     COMPACT_RANGE_CF, FULL_COMPACT, FULL_COMPACT_INCREMENTAL, FULL_COMPACT_PAUSE,
 };
+use crate::store::Config;
 
 type Key = Vec<u8>;
@@ -214,14 +216,27 @@ pub enum Error {
 pub struct Runner<E> {
     engine: E,
     remote: Remote<TaskCell>,
+    cfg_tracker: Tracker<Config>,
+    // Whether to skip the manual compaction of the write and default column families.
+    skip_compact: bool,
 }
 
 impl<E> Runner<E>
 where
     E: KvEngine,
 {
-    pub fn new(engine: E, remote: Remote<TaskCell>) -> Runner<E> {
-        Runner { engine, remote }
+    pub fn new(
+        engine: E,
+        remote: Remote<TaskCell>,
+        cfg_tracker: Tracker<Config>,
+        skip_compact: bool,
+    ) -> Runner<E> {
+        Runner {
+            engine,
+            remote,
+            cfg_tracker,
+            skip_compact,
+        }
     }
 
     /// Periodic full compaction.
@@ -369,6 +384,21 @@ where
                 bottommost_level_force,
             } => {
                 let cf = &cf_name;
+                if cf != CF_LOCK {
+                    // Check whether the config has changed to skip the manual compaction.
+                    if let Some(incoming) = self.cfg_tracker.any_new() {
+                        self.skip_compact = incoming.skip_manual_compaction_in_clean_up_worker;
+                    }
+                    if self.skip_compact {
+                        info!(
+                            "skip compact range";
+                            "range_start" => start_key.as_ref().map(|k| log_wrappers::Value::key(k)),
+                            "range_end" => end_key.as_ref().map(|k| log_wrappers::Value::key(k)),
+                            "cf" => cf_name,
+                        );
+                        return;
+                    }
+                }
                 if let Err(e) = self.compact_range_cf(
                     cf,
                     start_key.as_deref(),
@@ -498,7 +528,10 @@ mod tests {
         E: KvEngine,
     {
         let pool = YatpPoolBuilder::new(DefaultTicker::default()).build_future_pool();
-        (pool.clone(), Runner::new(engine, pool.remote().clone()))
+        (
+            pool.clone(),
+            Runner::new(engine, pool.remote().clone(), Tracker::default(), false),
+        )
     }
 
     #[test]
diff --git a/components/test_raftstore-v2/src/node.rs b/components/test_raftstore-v2/src/node.rs
index 70b6ccb1407..9e2d30aba02 100644
--- a/components/test_raftstore-v2/src/node.rs
+++ b/components/test_raftstore-v2/src/node.rs
@@ -311,6 +311,7 @@ impl Simulator for NodeCluster {
         let (sender, _) = mpsc::unbounded();
         let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
         let state: Arc<Mutex<GlobalReplicationState>> = Arc::default();
+        let store_config = Arc::new(VersionTrack::new(raft_store));
         node.start(
             raft_engine.clone(),
             tablet_registry,
@@ -324,7 +325,7 @@ impl Simulator for NodeCluster {
             CollectorRegHandle::new_for_test(),
             bg_worker,
             pd_worker,
-            Arc::new(VersionTrack::new(raft_store)),
+            store_config.clone(),
             &state,
             importer,
             key_manager,
@@ -338,27 +339,11 @@ impl Simulator for NodeCluster {
         );
         assert!(node_id == 0 || node_id == node.id());
         let node_id = node.id();
-
-        let region_split_size = cfg.coprocessor.region_split_size();
-        let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
-        let region_bucket_size = cfg.coprocessor.region_bucket_size;
-        let mut raftstore_cfg = cfg.tikv.raft_store;
-        raftstore_cfg.optimize_for(true);
-        raftstore_cfg
-            .validate(
-                region_split_size,
-                enable_region_bucket,
-                region_bucket_size,
-                true,
-            )
-            .unwrap();
-
-        let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
         cfg_controller.register(
             Module::Raftstore,
             Box::new(RaftstoreConfigManager::new(
                 node.refresh_config_scheduler(),
-                raft_store,
+                store_config,
             )),
         );
diff --git a/components/test_raftstore/src/common-test.toml b/components/test_raftstore/src/common-test.toml
index 8e4bed8b62b..3eba26403ad 100644
--- a/components/test_raftstore/src/common-test.toml
+++ b/components/test_raftstore/src/common-test.toml
@@ -73,6 +73,7 @@ store-io-pool-size = 0
 apply-pool-size = 1
 store-pool-size = 1
 snap-generator-pool-size = 2
+skip-manual-compaction-in-clean-up-worker = false
 
 [coprocessor]
 [rocksdb]
diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs
index 5a5b86150c2..98c2af5632c 100644
--- a/components/test_raftstore/src/node.rs
+++ b/components/test_raftstore/src/node.rs
@@ -246,10 +246,11 @@ impl Simulator for NodeCluster {
         )
         .unwrap();
         let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
+        let store_config = Arc::new(VersionTrack::new(raft_store));
         let mut node = Node::new(
             system,
             &cfg.server,
-            Arc::new(VersionTrack::new(raft_store)),
+            store_config.clone(),
             cfg.storage.api_version(),
             Arc::clone(&self.pd_client),
             Arc::default(),
@@ -353,25 +354,11 @@ impl Simulator for NodeCluster {
                 .map(|p| p.path().to_str().unwrap().to_owned())
         );
-        let region_split_size = cfg.coprocessor.region_split_size();
-        let enable_region_bucket = cfg.coprocessor.enable_region_bucket();
-        let region_bucket_size = cfg.coprocessor.region_bucket_size;
-        let mut raftstore_cfg = cfg.tikv.raft_store;
-        raftstore_cfg.optimize_for(false);
-        raftstore_cfg
-            .validate(
-                region_split_size,
-                enable_region_bucket,
-                region_bucket_size,
-                false,
-            )
-            .unwrap();
-        let raft_store = Arc::new(VersionTrack::new(raftstore_cfg));
         cfg_controller.register(
             Module::Raftstore,
             Box::new(RaftstoreConfigManager::new(
                 node.refresh_config_scheduler(),
-                raft_store,
+                store_config,
             )),
         );
diff --git a/tests/failpoints/cases/test_split_region.rs b/tests/failpoints/cases/test_split_region.rs
index 28ceba892d0..6a1b135ecb6 100644
--- a/tests/failpoints/cases/test_split_region.rs
+++ b/tests/failpoints/cases/test_split_region.rs
@@ -17,7 +17,7 @@ use kvproto::{
         Mutation, Op, PessimisticLockRequest, PrewriteRequest, PrewriteRequestPessimisticAction::*,
     },
     metapb::Region,
-    pdpb::CheckPolicy,
+    pdpb::{self, CheckPolicy},
     raft_serverpb::{PeerState, RaftMessage},
     tikvpb::TikvClient,
 };
@@ -1610,3 +1610,65 @@ fn test_split_by_split_check_on_keys() {
     // waiting the split,
     cluster.wait_region_split(&region);
 }
+
+fn change(name: &str, value: &str) -> std::collections::HashMap<String, String> {
+    let mut m = std::collections::HashMap::new();
+    m.insert(name.to_owned(), value.to_owned());
+    m
+}
+
+#[test]
+fn test_turn_off_manual_compaction_caused_by_no_valid_split_key() {
+    let mut cluster = new_node_cluster(0, 1);
+    cluster.run();
+    let r = cluster.get_region(b"");
+    cluster.must_split(&r, b"k1");
+    let r = cluster.get_region(b"k1");
+    cluster.must_split(&r, b"k2");
+    cluster.must_put(b"k1", b"val");
+
+    let (tx, rx) = sync_channel(5);
+    fail::cfg_callback("on_compact_range_cf", move || {
+        tx.send(true).unwrap();
+    })
+    .unwrap();
+
+    let safe_point_inject = "safe_point_inject";
+    fail::cfg(safe_point_inject, "return(100)").unwrap();
+
+    {
+        let sim = cluster.sim.rl();
+        let cfg_controller = sim.get_cfg_controller(1).unwrap();
+        cfg_controller
+            .update(change(
+                "raftstore.skip-manual-compaction-in-clean-up-worker",
+                "true",
+            ))
+            .unwrap();
+    }
+
+    let r = cluster.get_region(b"k1");
+    cluster
+        .pd_client
+        .split_region(r.clone(), pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
+    rx.recv_timeout(Duration::from_secs(1)).unwrap_err();
+
+    {
+        let sim = cluster.sim.rl();
+        let cfg_controller = sim.get_cfg_controller(1).unwrap();
+        cfg_controller
+            .update(change(
+                "raftstore.skip-manual-compaction-in-clean-up-worker",
+                "false",
+            ))
+            .unwrap();
+    }
+
+    cluster
+        .pd_client
+        .split_region(r, pdpb::CheckPolicy::Usekey, vec![b"k1".to_vec()]);
+    fail::cfg(safe_point_inject, "return(200)").unwrap();
+    rx.recv_timeout(Duration::from_secs(1)).unwrap();
+    rx.recv_timeout(Duration::from_secs(1)).unwrap();
+    rx.try_recv().unwrap_err();
+}
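
Note on the hot-reload plumbing this patch relies on: `RaftstoreConfigManager` applies `cfg_controller.update(...)` changes to the shared `VersionTrack<Config>`, and the compact runner picks them up lazily through the `Tracker` handed to it in `CompactRunner::new`, polling `any_new()` right before each manual compaction. Below is a minimal, self-contained sketch of that pattern. It is a simplification, not TiKV's implementation: `VersionTrack`, `Tracker`, and `Cfg` here are hypothetical stand-ins for the types in `tikv_util::config` (for instance, the real `any_new` returns a borrow rather than a clone, and real updates are fallible), but the update-then-poll flow matches what the patch wires up.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc, Mutex,
};

// Simplified stand-in for tikv_util::config::VersionTrack: a shared value
// plus a version counter that is bumped on every update.
struct VersionTrack<T> {
    value: Mutex<T>,
    version: AtomicU64,
}

impl<T: Clone> VersionTrack<T> {
    fn new(value: T) -> Self {
        VersionTrack {
            value: Mutex::new(value),
            version: AtomicU64::new(0),
        }
    }

    // Apply an update and bump the version so that trackers notice it.
    fn update(&self, f: impl FnOnce(&mut T)) {
        f(&mut *self.value.lock().unwrap());
        self.version.fetch_add(1, Ordering::Release);
    }

    // Mirrors `cfg.clone().tracker(String::from("compact-runner"))` above.
    fn tracker(self: Arc<Self>, tag: String) -> Tracker<T> {
        Tracker {
            tag,
            seen: 0,
            track: self,
        }
    }
}

// Simplified stand-in for tikv_util::config::Tracker: remembers the last
// version it observed and yields a fresh snapshot only when a newer one exists.
struct Tracker<T> {
    #[allow(dead_code)]
    tag: String,
    track: Arc<VersionTrack<T>>,
    seen: u64,
}

impl<T: Clone> Tracker<T> {
    fn any_new(&mut self) -> Option<T> {
        let current = self.track.version.load(Ordering::Acquire);
        if current > self.seen {
            self.seen = current;
            Some(self.track.value.lock().unwrap().clone())
        } else {
            None
        }
    }
}

// Hypothetical stand-in for the raftstore `Config` struct.
#[derive(Clone)]
struct Cfg {
    skip_manual_compaction_in_clean_up_worker: bool,
}

fn main() {
    let cfg = Arc::new(VersionTrack::new(Cfg {
        skip_manual_compaction_in_clean_up_worker: false,
    }));
    let mut tracker = cfg.clone().tracker(String::from("compact-runner"));
    let mut skip_compact = false;

    // Config-manager side: flip the switch at runtime, as
    // `cfg_controller.update(...)` does in the test above.
    cfg.update(|c| c.skip_manual_compaction_in_clean_up_worker = true);

    // Worker side, mirroring Runner::run: poll lazily right before deciding
    // whether to run the manual compaction.
    if let Some(incoming) = tracker.any_new() {
        skip_compact = incoming.skip_manual_compaction_in_clean_up_worker;
    }
    assert!(skip_compact);
}

The sketch keeps the polling on the worker side, which is the same design choice the patch makes: the cleanup worker never blocks on config changes; it simply observes the latest value the next time a compaction task arrives.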