Skip to content

Commit

Permalink
The minimal region cache (tikv#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium authored Jul 21, 2021
1 parent 4404c7e commit c14f23a
Show file tree
Hide file tree
Showing 29 changed files with 1,328 additions and 478 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: check
args: --all-targets --all-features

fmt:
name: rustfmt
Expand Down Expand Up @@ -49,7 +50,7 @@ jobs:
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features
args: --all-targets --all-features -- -D clippy::all
name: Clippy Output
unit-test:
name: unit test
Expand Down Expand Up @@ -98,7 +99,6 @@ jobs:
path: |
~/.cargo/.crates.toml
~/.cargo/.crates2.json
~/.cargo/bin
~/.cargo/registry/index
~/.cargo/registry/cache
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('Cargo.lock') }}
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ serde_derive = "1.0"
slog = { version = "2.3", features = ["max_level_trace", "release_max_level_debug"] }
slog-term = { version = "2.4" }
thiserror = "1"
tokio = { version = "1.0", features = [ "sync", "time" ] }
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
async-recursion = "0.3"

tikv-client-common = { version = "0.1.0", path = "tikv-client-common" }
tikv-client-pd = { version = "0.1.0", path = "tikv-client-pd" }
Expand All @@ -50,7 +51,7 @@ proptest = "1"
proptest-derive = "0.3"
serial_test = "0.5.0"
simple_logger = "1"
tokio = { version = "1.0", features = [ "sync", "rt-multi-thread", "macros" ] }
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
reqwest = {version = "0.11", default-features = false, features = ["native-tls-vendored"]}
serde_json = "1"

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
default: check

check:
cargo check --all
cargo check --all --all-targets --all-features
cargo fmt -- --check
cargo clippy -- -D clippy::all
cargo clippy --all-targets --all-features -- -D clippy::all

unit-test:
cargo test --all
Expand Down
5 changes: 5 additions & 0 deletions src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ impl Backoff {
self.kind == BackoffKind::None
}

/// Returns the number of attempts
pub fn current_attempts(&self) -> u32 {
self.current_attempts
}

/// Don't wait. Usually indicates that we should not retry a request.
pub const fn no_backoff() -> Backoff {
Backoff {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ mod pd;
#[doc(hidden)]
pub mod raw;
mod region;
mod region_cache;
mod stats;
mod store;
mod timestamp;
Expand Down
63 changes: 44 additions & 19 deletions src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@

use crate::{
pd::{PdClient, PdRpcClient, RetryClient},
region::{Region, RegionId},
store::Store,
region::{RegionId, RegionWithLeader},
store::RegionStore,
Config, Error, Key, Result, Timestamp,
};
use async_trait::async_trait;
use derive_new::new;
use slog::{Drain, Logger};
use std::{any::Any, sync::Arc};
use tikv_client_proto::metapb;
use tikv_client_store::{KvClient, KvConnect, Request};
Expand All @@ -21,8 +22,16 @@ use tikv_client_store::{KvClient, KvConnect, Request};
/// client can be tested without doing any RPC calls.
pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
let config = Config::default();
let plain = slog_term::PlainSyncDecorator::new(std::io::stdout());
let logger = Logger::root(
slog_term::FullFormat::new(plain)
.build()
.filter_level(slog::Level::Info)
.fuse(),
o!(),
);
PdRpcClient::new(
&config,
config.clone(),
|_, _| MockKvConnect,
|e, sm| {
futures::future::ok(RetryClient::new_with_cluster(
Expand All @@ -33,11 +42,13 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
))
},
false,
logger,
)
.await
.unwrap()
}

#[allow(clippy::type_complexity)]
#[derive(new, Default, Clone)]
pub struct MockKvClient {
pub addr: String,
Expand Down Expand Up @@ -93,27 +104,31 @@ impl MockPdClient {
}
}

pub fn region1() -> Region {
let mut region = Region::default();
pub fn region1() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 1;
region.region.set_start_key(vec![0]);
region.region.set_end_key(vec![10]);

let mut leader = metapb::Peer::default();
leader.store_id = 41;
let leader = metapb::Peer {
store_id: 41,
..Default::default()
};
region.leader = Some(leader);

region
}

pub fn region2() -> Region {
let mut region = Region::default();
pub fn region2() -> RegionWithLeader {
let mut region = RegionWithLeader::default();
region.region.id = 2;
region.region.set_start_key(vec![10]);
region.region.set_end_key(vec![250, 250]);

let mut leader = metapb::Peer::default();
leader.store_id = 42;
let leader = metapb::Peer {
store_id: 42,
..Default::default()
};
region.leader = Some(leader);

region
Expand All @@ -124,11 +139,11 @@ impl MockPdClient {
impl PdClient for MockPdClient {
type KvClient = MockKvClient;

async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
Ok(Store::new(region, Arc::new(self.client.clone())))
async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
Ok(RegionStore::new(region, Arc::new(self.client.clone())))
}

async fn region_for_key(&self, key: &Key) -> Result<Region> {
async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
let bytes: &[_] = key.into();
let region = if bytes.is_empty() || bytes[0] < 10 {
Self::region1()
Expand All @@ -139,11 +154,11 @@ impl PdClient for MockPdClient {
Ok(region)
}

async fn region_for_id(&self, id: RegionId) -> Result<Region> {
async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
match id {
1 => Ok(Self::region1()),
2 => Ok(Self::region2()),
_ => Err(Error::RegionNotFound { region_id: id }),
_ => Err(Error::RegionNotFoundInResponse { region_id: id }),
}
}

Expand All @@ -154,11 +169,21 @@ impl PdClient for MockPdClient {
async fn update_safepoint(self: Arc<Self>, _safepoint: u64) -> Result<bool> {
unimplemented!()
}

async fn update_leader(
&self,
_ver_id: crate::region::RegionVerId,
_leader: metapb::Peer,
) -> Result<()> {
todo!()
}

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
}

pub fn mock_store() -> Store {
Store {
region: Region::default(),
pub fn mock_store() -> RegionStore {
RegionStore {
region_with_leader: RegionWithLeader::default(),
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
}
}
Loading

0 comments on commit c14f23a

Please sign in to comment.