Skip to content

Commit

Permalink
Merge branch 'master' into multi-reigon-test
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium authored Apr 21, 2021
2 parents 0157f29 + 20318ab commit ec97ea2
Show file tree
Hide file tree
Showing 22 changed files with 394 additions and 27 deletions.
8 changes: 4 additions & 4 deletions mock-tikv/src/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ impl Pd for MockPd {
todo!()
}

fn get_dc_locations(
fn get_dc_location_info(
&mut self,
_: ::grpcio::RpcContext<'_>,
_: GetDcLocationsRequest,
_: ::grpcio::UnarySink<GetDcLocationsResponse>,
_: grpcio::RpcContext,
_: GetDcLocationInfoRequest,
_: grpcio::UnarySink<GetDcLocationInfoResponse>,
) {
todo!()
}
Expand Down
33 changes: 30 additions & 3 deletions mock-tikv/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use derive_new::new;
use futures::{FutureExt, TryFutureExt};
use grpcio::{Environment, Server, ServerBuilder};
use std::sync::Arc;
use tikv_client_proto::{kvrpcpb::*, tikvpb::*};
use tikv_client_proto::{coprocessor_v2::*, kvrpcpb::*, tikvpb::*};

pub const MOCK_TIKV_PORT: u16 = 50019;

Expand Down Expand Up @@ -514,11 +514,38 @@ impl Tikv for MockTikv {
todo!()
}

fn raw_get_key_ttl(
&mut self,
_: grpcio::RpcContext,
_: RawGetKeyTtlRequest,
_: grpcio::UnarySink<RawGetKeyTtlResponse>,
) {
todo!()
}

fn raw_compare_and_swap(
&mut self,
_: grpcio::RpcContext,
_: RawCasRequest,
_: grpcio::UnarySink<RawCasResponse>,
) {
todo!()
}

fn coprocessor_v2(
&mut self,
_: grpcio::RpcContext,
_: tikv_client_proto::coprocessor_v2::RawCoprocessorRequest,
_: grpcio::UnarySink<tikv_client_proto::coprocessor_v2::RawCoprocessorResponse>,
_: RawCoprocessorRequest,
_: grpcio::UnarySink<RawCoprocessorResponse>,
) {
todo!()
}

fn get_store_safe_ts(
&mut self,
_: grpcio::RpcContext,
_: StoreSafeTsRequest,
_: grpcio::UnarySink<StoreSafeTsResponse>,
) {
todo!()
}
Expand Down
78 changes: 74 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
pub struct Client {
rpc: Arc<PdRpcClient>,
cf: Option<ColumnFamily>,
/// Whether to use the [`atomic mode`](Client::with_atomic_for_cas).
atomic: bool,
}

impl Client {
Expand Down Expand Up @@ -63,7 +65,11 @@ impl Client {
) -> Result<Client> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
Ok(Client { rpc, cf: None })
Ok(Client {
rpc,
cf: None,
atomic: false,
})
}

/// Set the column family of requests.
Expand All @@ -89,6 +95,22 @@ impl Client {
Client {
rpc: self.rpc.clone(),
cf: Some(cf),
atomic: self.atomic,
}
}

/// Set to use the atomic mode.
///
/// The only reason of using atomic mode is the
/// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
/// the atomicity of CAS, write operations like [`put`](Client::put) or
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
/// operations are not supported in the mode.
pub fn with_atomic_for_cas(&self) -> Client {
Client {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
atomic: true,
}
}

Expand Down Expand Up @@ -171,7 +193,7 @@ impl Client {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone());
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
Expand Down Expand Up @@ -203,7 +225,11 @@ impl Client {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
self.cf.clone(),
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
Expand Down Expand Up @@ -231,7 +257,7 @@ impl Client {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
let request = new_raw_delete_request(key.into(), self.cf.clone());
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
Expand Down Expand Up @@ -260,6 +286,7 @@ impl Client {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
Expand Down Expand Up @@ -287,6 +314,7 @@ impl Client {
/// # });
/// ```
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
Expand Down Expand Up @@ -415,6 +443,40 @@ impl Client {
.collect())
}

/// Create a new *atomic* 'compare and set' request.
///
/// Once resolved this request will result in an atomic `compare and set'
/// operation for the given key.
///
/// If the value retrived is equal to `current_value`, `new_value` is
/// written.
///
/// # Return Value
///
/// A tuple is returned if successful: the previous value and whether the
/// value is swapped
pub async fn compare_and_swap(
&self,
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
) -> Result<(Option<Value>, bool)> {
self.assert_atomic()?;
let req = new_cas_request(
key.into(),
new_value.into(),
previous_value.into(),
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.post_process_default()
.plan();
plan.execute().await
}

async fn scan_inner(
&self,
range: impl Into<BoundRange>,
Expand Down Expand Up @@ -467,4 +529,12 @@ impl Client {
.plan();
plan.execute().await
}

fn assert_non_atomic(&self) -> Result<()> {
(!self.atomic).then(|| ()).ok_or(Error::UnsupportedMode)
}

fn assert_atomic(&self) -> Result<()> {
self.atomic.then(|| ()).ok_or(Error::UnsupportedMode)
}
}
23 changes: 19 additions & 4 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,25 @@ pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf)
requests::new_raw_put_request(key.into(), value, cf, atomic)
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
}

pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
requests::new_raw_delete_request(key.into(), cf)
pub fn new_raw_delete_request(
key: Key,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawDeleteRequest {
requests::new_raw_delete_request(key.into(), cf, atomic)
}

pub fn new_raw_batch_delete_request(
Expand Down Expand Up @@ -76,3 +82,12 @@ pub fn new_raw_batch_scan_request(
) -> kvrpcpb::RawBatchScanRequest {
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
}

pub fn new_cas_request(
key: Key,
value: Value,
previous_value: Option<Value>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawCasRequest {
requests::new_cas_request(key.into(), value, previous_value, cf)
}
53 changes: 52 additions & 1 deletion src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);

req
}
Expand All @@ -98,10 +100,12 @@ impl SingleKey for kvrpcpb::RawPutRequest {
pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);

req
}
Expand Down Expand Up @@ -132,10 +136,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
}
}

pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
pub fn new_raw_delete_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawDeleteRequest {
let mut req = kvrpcpb::RawDeleteRequest::default();
req.set_key(key);
req.maybe_set_cf(cf);
req.set_for_cas(atomic);

req
}
Expand Down Expand Up @@ -267,6 +276,46 @@ impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
}
}

pub fn new_cas_request(
key: Vec<u8>,
value: Vec<u8>,
previous_value: Option<Vec<u8>>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawCasRequest {
let mut req = kvrpcpb::RawCasRequest::default();
req.set_key(key);
req.set_value(value);
match previous_value {
Some(v) => req.set_previous_value(v),
None => req.set_previous_not_exist(true),
}
req.maybe_set_cf(cf);
req
}

impl KvRequest for kvrpcpb::RawCasRequest {
type Response = kvrpcpb::RawCasResponse;
}

impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}

impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
type Out = (Option<Value>, bool); // (previous_value, swapped)

fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
let input = input?;
if input.previous_not_exist {
Ok((None, input.succeed))
} else {
Ok((Some(input.previous_value), input.succeed))
}
}
}

macro_rules! impl_raw_rpc_request {
($name: ident) => {
impl RawRpcRequest for kvrpcpb::$name {
Expand All @@ -286,6 +335,7 @@ impl_raw_rpc_request!(RawBatchDeleteRequest);
impl_raw_rpc_request!(RawScanRequest);
impl_raw_rpc_request!(RawBatchScanRequest);
impl_raw_rpc_request!(RawDeleteRangeRequest);
impl_raw_rpc_request!(RawCasRequest);

impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
Expand All @@ -296,6 +346,7 @@ impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
impl HasLocks for kvrpcpb::RawScanResponse {}
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
impl HasLocks for kvrpcpb::RawCasResponse {}

#[cfg(test)]
mod test {
Expand Down
7 changes: 5 additions & 2 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,11 @@ impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
}
}

/// When executed, the plan extracts errors from its inner plan, and
/// returns an `Err` wrapping the error.
/// When executed, the plan extracts errors from its inner plan, and returns an
/// `Err` wrapping the error.
///
/// We usually need to apply this plan if (and only if) the output of the inner
/// plan is of a response type.
///
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
/// where `response` contains unresolved errors (`error` and `region_error`).
Expand Down
2 changes: 2 additions & 0 deletions src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ impl TimestampExt for Timestamp {
Self {
physical: version >> PHYSICAL_SHIFT_BITS,
logical: version & LOGICAL_MASK,
// Now we only support global transactions
suffix_bits: 0,
}
}

Expand Down
Loading

0 comments on commit ec97ea2

Please sign in to comment.