Skip to content

Commit

Permalink
refactor: introduce an atomic mode for raw client
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <[email protected]>
  • Loading branch information
ekexium committed Apr 20, 2021
1 parent 6e02f61 commit 570b9f9
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 97 deletions.
163 changes: 72 additions & 91 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
pub struct Client {
rpc: Arc<PdRpcClient>,
cf: Option<ColumnFamily>,
/// Whether to use the [`atomic mode`](Client::with_atomic).
atomic: bool,
}

impl Client {
Expand Down Expand Up @@ -63,7 +65,11 @@ impl Client {
) -> Result<Client> {
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
Ok(Client { rpc, cf: None })
Ok(Client {
rpc,
cf: None,
atomic: false,
})
}

/// Set the column family of requests.
Expand All @@ -89,6 +95,22 @@ impl Client {
Client {
rpc: self.rpc.clone(),
cf: Some(cf),
atomic: self.atomic,
}
}

/// Set to use the atomic mode.
///
/// The only reason of using atomic mode is the
/// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
/// the atomicity of CAS, write operations like [`put`](Client::put) or
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
/// operations are not supported in the mode.
pub fn with_atomic(&self) -> Client {
Client {
rpc: self.rpc.clone(),
cf: self.cf.clone(),
atomic: true,
}
}

Expand Down Expand Up @@ -171,15 +193,15 @@ impl Client {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_inner(key, value, false).await
}

/// Create a new *atomic* 'put' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
pub async fn atomic_put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_inner(key, value, true).await
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}

/// Create a new 'batch put' request.
Expand All @@ -203,19 +225,18 @@ impl Client {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_inner(pairs, false).await
}

/// Create a new *atomic* 'batch put' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the setting of the values
/// associated with the given keys.
pub async fn atomic_batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_inner(pairs, true).await
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
self.cf.clone(),
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}

/// Create a new 'delete' request.
Expand All @@ -236,29 +257,15 @@ impl Client {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
self.delete_inner(key, false).await
}

/// Create a new *atomic* 'delete' request.
/// Atomic operations can block each other on the same key.
///
/// Once resolved this request will result in the deletion of the given key.
///
/// It does not return an error if the key does not exist in TiKV.
///
/// # Examples
/// ```rust,no_run
/// # use tikv_client::{Key, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = client.delete(key);
/// let result: () = req.await.unwrap();
/// # });
/// ```
pub async fn atomic_delete(&self, key: impl Into<Key>) -> Result<()> {
self.delete_inner(key, true).await
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}

/// Create a new 'batch delete' request.
Expand All @@ -279,6 +286,7 @@ impl Client {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
Expand Down Expand Up @@ -306,6 +314,7 @@ impl Client {
/// # });
/// ```
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
self.assert_non_atomic()?;
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
Expand Down Expand Up @@ -436,18 +445,23 @@ impl Client {

/// Create a new *atomic* 'compare and set' request.
///
/// Once resolved this request will result in an atomic `compare and set' operation for the given key.
/// Once resolved this request will result in an atomic `compare and set'
/// operation for the given key.
///
/// If the value retrived is equal to `current_value`, `new_value` is written.
/// If the value retrived is equal to `current_value`, `new_value` is
/// written.
///
/// # Return Value
/// A tuple is returned if successful: the previous value and whether the value is swapped
pub async fn atomic_compare_and_swap(
///
/// A tuple is returned if successful: the previous value and whether the
/// value is swapped
pub async fn compare_and_swap(
&self,
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
) -> Result<(Option<Value>, bool)> {
self.assert_atomic()?;
let req = new_cas_request(
key.into(),
new_value.into(),
Expand Down Expand Up @@ -516,48 +530,15 @@ impl Client {
plan.execute().await
}

async fn put_inner(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
atomic: bool,
) -> Result<()> {
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
}

async fn batch_put_inner(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
atomic: bool,
) -> Result<()> {
let request =
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.multi_region()
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
fn assert_non_atomic(&self) -> Result<()> {
(!self.atomic)
.then(|| ())
.ok_or(Error::UnsupportedInAtomicMode)
}

async fn delete_inner(&self, key: impl Into<Key>, atomic: bool) -> Result<()> {
let request = new_raw_delete_request(key.into(), self.cf.clone(), atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.single_region()
.await?
.retry_region(DEFAULT_REGION_BACKOFF)
.extract_error()
.plan();
plan.execute().await?;
Ok(())
fn assert_atomic(&self) -> Result<()> {
self.atomic
.then(|| ())
.ok_or(Error::UnsupportedInNonAtomicMode)
}
}
29 changes: 23 additions & 6 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use std::{
iter,
};
use tikv_client::{
Key, KvPair, RawClient, Result, Transaction, TransactionClient, TransactionOptions, Value,
Error, Key, KvPair, RawClient, Result, Transaction, TransactionClient, TransactionOptions,
Value,
};

// Parameters used in test
Expand Down Expand Up @@ -759,19 +760,19 @@ async fn pessimistic_heartbeat() -> Result<()> {
#[serial]
async fn raw_cas() -> Result<()> {
clear_tikv().await;
let client = RawClient::new(pd_addrs()).await?;
let client = RawClient::new(pd_addrs()).await?.with_atomic();
let key = "key".to_owned();
let value = "value".to_owned();
let new_value = "new value".to_owned();

client.atomic_put(key.clone(), value.clone()).await?;
client.put(key.clone(), value.clone()).await?;
assert_eq!(
client.get(key.clone()).await?.unwrap(),
value.clone().as_bytes()
);

client
.atomic_compare_and_swap(
.compare_and_swap(
key.clone(),
Some("another_value".to_owned()).map(|v| v.into()),
new_value.clone(),
Expand All @@ -783,7 +784,7 @@ async fn raw_cas() -> Result<()> {
);

client
.atomic_compare_and_swap(
.compare_and_swap(
key.clone(),
Some(value.to_owned()).map(|v| v.into()),
new_value.clone(),
Expand All @@ -794,8 +795,24 @@ async fn raw_cas() -> Result<()> {
new_value.clone().as_bytes()
);

client.atomic_delete(key.clone()).await?;
client.delete(key.clone()).await?;
assert!(client.get(key.clone()).await?.is_none());

// check unsupported operations
assert!(matches!(
client.batch_delete(vec![key.clone()]).await.err().unwrap(),
Error::UnsupportedInAtomicMode
));
let client = RawClient::new(pd_addrs()).await?;
assert!(matches!(
client
.compare_and_swap(key.clone(), None, vec![])
.await
.err()
.unwrap(),
Error::UnsupportedInNonAtomicMode
));

Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions tikv-client-common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ pub enum Error {
/// An operation requires a primary key, but the transaction was empty.
#[error("transaction has no primary key")]
NoPrimaryKey,
#[error(
"The operation does is not supported in raw-atomic mode, please consider using an atomic client"
)]
UnsupportedInAtomicMode,
#[error("The operation is only supported in raw-atomic mode, please consider using a non-atomic raw client")]
UnsupportedInNonAtomicMode,
/// Wraps a `std::io::Error`.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
Expand Down

0 comments on commit 570b9f9

Please sign in to comment.