Merge remote-tracking branch 'upstream/master' into buffer-mutex

Signed-off-by: ekexium <[email protected]>
ekexium committed Apr 20, 2021
2 parents 131bbff + bdff7e3 commit 9ceac82

Showing 4 changed files with 68 additions and 54 deletions.
11 changes: 1 addition & 10 deletions src/raw/client.rs

@@ -3,7 +3,7 @@
 use tikv_client_common::Error;
 
 use crate::{
-    backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
+    backoff::DEFAULT_REGION_BACKOFF,
     config::Config,
     pd::PdRpcClient,
     raw::lowering::*,
@@ -115,7 +115,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .post_process_default()
             .plan();
@@ -146,7 +145,6 @@ impl Client {
     ) -> Result<Vec<KvPair>> {
         let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -177,7 +175,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
@@ -208,7 +205,6 @@ impl Client {
     ) -> Result<()> {
         let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -239,7 +235,6 @@ impl Client {
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
             .single_region()
             .await?
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .plan();
@@ -268,7 +263,6 @@ impl Client {
         let request =
             new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -295,7 +289,6 @@ impl Client {
     pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
         let request = new_raw_delete_range_request(range.into(), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -437,7 +430,6 @@ impl Client {
 
         let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
@@ -469,7 +461,6 @@ impl Client {
             self.cf.clone(),
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .resolve_lock(OPTIMISTIC_BACKOFF)
             .multi_region()
             .retry_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
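Every hunk in this file makes the same change: the `.resolve_lock(OPTIMISTIC_BACKOFF)` step is dropped from raw-mode request plans, since raw requests operate outside transactions and should not try to resolve transactional locks. The public raw API is untouched; a minimal usage sketch (the PD address is a placeholder, mirroring the crate's own doc examples rather than code from this commit):

```rust
use tikv_client::RawClient;

fn main() {
    futures::executor::block_on(async {
        // Placeholder PD endpoint.
        let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
        client.put("k1".to_owned(), "v1".to_owned()).await.unwrap();
        // After this commit, a raw get plans only:
        // single_region -> retry_region -> post_process_default.
        let value = client.get("k1".to_owned()).await.unwrap();
        assert!(value.is_some());
    });
}
```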
15 changes: 8 additions & 7 deletions src/transaction/buffer.rs

@@ -20,14 +20,15 @@ struct InnerBuffer {
 impl InnerBuffer {
     fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
         let key = key.into();
-        if !matches!(entry, BufferEntry::Cached(_)) {
+        if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
             self.primary_key.get_or_insert_with(|| key.clone());
         }
         self.entry_map.insert(key, entry);
     }
 
-    pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
-        self.primary_key.get_or_insert(key.clone())
+    /// Set the primary key if it is not set
+    pub fn primary_key_or(&mut self, key: &Key) {
+        self.primary_key.get_or_insert(key.clone());
     }
 }
 
@@ -48,9 +49,9 @@ impl Buffer {
         self.inner.primary_key.clone()
     }
 
-    /// Get the primary key of the buffer, if not exists, use `key` as the primary key.
-    pub async fn get_primary_key_or(&mut self, key: &Key) -> Key {
-        self.inner.get_primary_key_or(key).clone()
+    /// Set the primary key if it is not set
+    pub async fn primary_key_or(&mut self, key: &Key) {
+        self.inner.primary_key_or(key);
     }
 
     /// Get a value from the buffer.
@@ -186,7 +187,7 @@ impl Buffer {
 
     /// Lock the given key if necessary.
    pub async fn lock(&mut self, key: Key) {
-        self.inner.primary_key.get_or_insert(key.clone());
+        self.inner.primary_key.get_or_insert_with(|| key.clone());
         let value = self
             .inner
             .entry_map
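One detail in the `lock` hunk above: `get_or_insert(key.clone())` evaluates its argument eagerly, cloning the key even when a primary key is already set, whereas `get_or_insert_with(|| key.clone())` only runs the closure when the `Option` is `None`. A standalone sketch of the difference, using `String` in place of `Key`:

```rust
fn main() {
    // Already set: the closure is never evaluated, so no clone happens.
    let mut primary: Option<String> = Some("k1".to_owned());
    primary.get_or_insert_with(|| unreachable!("skipped when Some"));
    assert_eq!(primary.as_deref(), Some("k1"));

    // Not yet set: the closure runs exactly once and fills the Option.
    let key = "k2".to_owned();
    let mut empty: Option<String> = None;
    empty.get_or_insert_with(|| key.clone());
    assert_eq!(empty.as_deref(), Some("k2"));
}
```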
79 changes: 48 additions & 31 deletions src/transaction/transaction.rs

@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
     }
 
     /// Create a `get for update` request.
-    /// Once resolved this request will pessimistically lock and fetch the latest
-    /// value associated with the given key at **current timestamp**.
     ///
-    /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
+    /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
+    /// UPDATE` in TiDB, and has different behavior in optimistic and
+    /// pessimistic transactions.
     ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
+    /// # Optimistic transaction
     ///
+    /// It reads at the "start timestamp" and caches the value, just like normal
+    /// get requests. The lock is written in prewrite and commit, so it cannot
+    /// prevent concurrent transactions from writing the same key, but can only
+    /// prevent itself from committing.
+    ///
-    /// It can only be used in pessimistic mode.
+    /// # Pessimistic transaction
     ///
+    /// It reads at the "current timestamp" and thus does not cache the value.
+    /// So following read requests won't be affected by the `get_for_update`.
+    /// A lock will be acquired immediately with this request, which prevents
+    /// concurrent transactions from mutating the keys.
+    ///
+    /// The "current timestamp" (also called `for_update_ts` of the request) is
+    /// fetched immediately from the timestamp oracle.
+    ///
+    /// Note: The behavior of the request under pessimistic transaction does not
+    /// follow snapshot isolation.
     ///
     /// # Examples
     /// ```rust,no_run
@@ -146,7 +158,9 @@
     pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         self.check_allow_operation().await?;
         if !self.is_pessimistic() {
-            Err(Error::InvalidTransactionType)
+            let key = key.into();
+            self.lock_keys(iter::once(key.clone())).await?;
+            self.get(key).await
         } else {
             let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
             debug_assert!(pairs.len() <= 1);
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
 
     /// Create a new 'batch get for update' request.
     ///
-    /// Once resolved this request will pessimistically lock the keys and
-    /// fetch the values associated with the given keys.
-    ///
-    /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
-    /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
-    /// and the value is not cached in the local buffer.
-    /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
-    ///
-    /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
+    /// Similar to [`get_for_update`](Transaction::get_for_update), but it works
+    /// for a batch of keys.
     ///
-    /// It can only be used in pessimistic mode.
+    /// Non-existent entries will not appear in the result. The order of the
+    /// keys is not retained in the result.
     ///
     /// # Examples
     /// ```rust,no_run
-    /// # use tikv_client::{Key, Value, Config, TransactionClient};
+    /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
     /// # use futures::prelude::*;
-    /// # use std::collections::HashMap;
     /// # futures::executor::block_on(async {
     /// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
     /// let mut txn = client.begin_pessimistic().await.unwrap();
     /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
-    /// let result: HashMap<Key, Value> = txn
+    /// let result: Vec<KvPair> = txn
     ///     .batch_get_for_update(keys)
     ///     .await
-    ///     .unwrap()
-    ///     .map(|pair| (pair.0, pair.1))
-    ///     .collect();
+    ///     .unwrap();
     /// // now "TiKV" and "TiDB" are both locked
     /// // Finish the transaction...
     /// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@
     pub async fn batch_get_for_update(
         &mut self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
-    ) -> Result<impl Iterator<Item = KvPair>> {
+    ) -> Result<Vec<KvPair>> {
         self.check_allow_operation().await?;
+        let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
         if !self.is_pessimistic() {
-            return Err(Error::InvalidTransactionType);
+            self.lock_keys(keys.clone()).await?;
+            Ok(self.batch_get(keys).await?.collect())
+        } else {
+            self.pessimistic_lock(keys, true).await
         }
-        let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
-        Ok(self.pessimistic_lock(keys, true).await?.into_iter())
     }
 
     /// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
                 }
             }
             TransactionKind::Pessimistic(_) => {
-                self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
-                    .await?;
+                let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
+                self.pessimistic_lock(keys.into_iter(), false).await?;
             }
         }
         Ok(())
@@ -649,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
         }
 
         let first_key = keys[0].clone().key();
-        let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
+        // we do not set the primary key here, because pessimistic lock request
+        // can fail, in which case the keys may not be part of the transaction.
+        let primary_lock = self
+            .buffer
+            .get_primary_key()
+            .await
+            .unwrap_or_else(|| first_key.clone());
         let for_update_ts = self.rpc.clone().get_timestamp().await?;
         self.options.push_for_update_ts(for_update_ts.clone());
         let request = new_pessimistic_lock_request(
@@ -669,6 +683,9 @@
             .plan();
         let pairs = plan.execute().await;
 
+        // primary key will be set here if needed
+        self.buffer.primary_key_or(&first_key).await;
+
         self.start_auto_heartbeat().await;
 
         for key in keys {
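The upshot of this file: `get_for_update` and `batch_get_for_update` no longer return `Error::InvalidTransactionType` in optimistic mode; instead they lock the keys in the local buffer and read through the normal buffered path, and `batch_get_for_update` now returns a concrete `Vec<KvPair>`. A rough usage sketch of the new optimistic path (the PD address is a placeholder):

```rust
use tikv_client::TransactionClient;

fn main() {
    futures::executor::block_on(async {
        let client = TransactionClient::new(vec!["192.168.0.100"]).await.unwrap();
        let mut txn = client.begin_optimistic().await.unwrap();
        // Reads at start_ts and records the key as locked in the buffer;
        // the lock itself is only written during prewrite/commit.
        let value = txn.get_for_update("key".to_owned()).await.unwrap();
        println!("{:?}", value);
        // Fails if a conflicting transaction committed a write to "key".
        txn.commit().await.unwrap();
    });
}
```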
17 changes: 11 additions & 6 deletions tests/integration_tests.rs

@@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> {
     let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
     assert_eq!(res.len(), keys.len());
 
-    let res = txn
-        .batch_get_for_update(keys.clone())
-        .await?
-        .collect::<Vec<_>>();
+    let res = txn.batch_get_for_update(keys.clone()).await?;
     assert_eq!(res.len(), keys.len());
 
     txn.commit().await?;
@@ -686,7 +683,8 @@ async fn get_for_update() -> Result<()> {
 
     let mut t1 = client.begin_pessimistic().await?;
     let mut t2 = client.begin_pessimistic().await?;
-
+    let mut t3 = client.begin_optimistic().await?;
+    let mut t4 = client.begin_optimistic().await?;
     let mut t0 = client.begin_pessimistic().await?;
     t0.put(key1.clone(), value1).await?;
     t0.put(key2.clone(), value2).await?;
@@ -700,12 +698,19 @@
     let res: HashMap<_, _> = t2
         .batch_get_for_update(keys.clone())
         .await?
+        .into_iter()
+        .map(From::from)
         .collect();
     t2.commit().await?;
-    assert!(res.get(&key1.into()).unwrap() == &value1);
+    assert!(res.get(&key1.clone().into()).unwrap() == &value1);
     assert!(res.get(&key2.into()).unwrap() == &value2);
 
+    assert!(t3.get_for_update(key1).await?.is_none());
+    assert!(t3.commit().await.is_err());
+
+    assert!(t4.batch_get_for_update(keys).await?.len() == 0);
+    assert!(t4.commit().await.is_err());
 
     Ok(())
 }
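The new assertions exercise the optimistic path added in transaction.rs: `t3` and `t4` begin before `t0` commits, so their `get_for_update` and `batch_get_for_update` calls read at their own start timestamps and see nothing. Because the keys are recorded through `lock_keys`, both commits then fail on the conflict with `t0`'s committed writes, which is exactly what the `is_err()` checks assert.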
