Skip to content

Commit

Permalink
Merge pull request databendlabs#10702 from zhang2014/refactor/rename_…
Browse files Browse the repository at this point in the history
…flight

refactor(cluster): rename new_flight to flight
  • Loading branch information
zhang2014 authored Mar 22, 2023
2 parents 9f2130f + 8091a4d commit 796ec31
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 65 deletions.
52 changes: 26 additions & 26 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use crate::api::rpc::exchange::exchange_sink::ExchangeSink;
use crate::api::rpc::exchange::exchange_transform::ExchangeTransform;
use crate::api::rpc::exchange::statistics_receiver::StatisticsReceiver;
use crate::api::rpc::exchange::statistics_sender::StatisticsSender;
use crate::api::rpc::flight_client::FlightExchange;
use crate::api::rpc::flight_client::FlightReceiver;
use crate::api::rpc::flight_client::FlightSender;
use crate::api::rpc::flight_client::NewFlightExchange;
use crate::api::rpc::Packet;
use crate::api::DataExchange;
use crate::api::DefaultExchangeInjector;
Expand Down Expand Up @@ -218,7 +218,7 @@ impl DataExchangeManager {
}
}

pub fn new_handle_exchange_fragment(
pub fn handle_exchange_fragment(
&self,
query: String,
target: String,
Expand All @@ -228,10 +228,10 @@ impl DataExchangeManager {
let queries_coordinator = unsafe { &mut *queries_coordinator_guard.deref().get() };

match queries_coordinator.entry(query) {
Entry::Occupied(mut v) => v.get_mut().new_add_fragment_exchange(target, fragment),
Entry::Occupied(mut v) => v.get_mut().add_fragment_exchange(target, fragment),
Entry::Vacant(v) => v
.insert(QueryCoordinator::create())
.new_add_fragment_exchange(target, fragment),
.add_fragment_exchange(target, fragment),
}
}

Expand Down Expand Up @@ -308,12 +308,12 @@ impl DataExchangeManager {
match queries_coordinator.get_mut(&query_id) {
None => Err(ErrorCode::Internal("Query not exists.")),
Some(query_coordinator) => {
query_coordinator.new_fragment_exchanges.clear();
query_coordinator.fragment_exchanges.clear();
let injector = DefaultExchangeInjector::create();
let mut build_res =
query_coordinator.subscribe_fragment(&ctx, fragment_id, injector)?;

let exchanges = std::mem::take(&mut query_coordinator.new_statistics_exchanges);
let exchanges = std::mem::take(&mut query_coordinator.statistics_exchanges);
let statistics_receiver = StatisticsReceiver::spawn_receiver(&ctx, exchanges)?;

let statistics_receiver: Mutex<StatisticsReceiver> =
Expand Down Expand Up @@ -396,17 +396,17 @@ struct QueryCoordinator {
info: Option<QueryInfo>,
fragments_coordinator: HashMap<usize, Box<FragmentCoordinator>>,

new_statistics_exchanges: HashMap<String, Vec<NewFlightExchange>>,
new_fragment_exchanges: HashMap<(String, usize, u8), NewFlightExchange>,
statistics_exchanges: HashMap<String, Vec<FlightExchange>>,
fragment_exchanges: HashMap<(String, usize, u8), FlightExchange>,
}

impl QueryCoordinator {
pub fn create() -> QueryCoordinator {
QueryCoordinator {
info: None,
fragments_coordinator: HashMap::new(),
new_fragment_exchanges: HashMap::new(),
new_statistics_exchanges: HashMap::new(),
fragment_exchanges: HashMap::new(),
statistics_exchanges: HashMap::new(),
}
}

Expand All @@ -415,12 +415,12 @@ impl QueryCoordinator {
target: String,
) -> Result<Receiver<Result<FlightData, Status>>> {
let (tx, rx) = async_channel::bounded(8);
match self.new_statistics_exchanges.entry(target) {
match self.statistics_exchanges.entry(target) {
Entry::Vacant(v) => {
v.insert(vec![NewFlightExchange::create_sender(tx)]);
v.insert(vec![FlightExchange::create_sender(tx)]);
}
Entry::Occupied(mut v) => {
v.get_mut().push(NewFlightExchange::create_sender(tx));
v.get_mut().push(FlightExchange::create_sender(tx));
}
};

Expand All @@ -429,10 +429,10 @@ impl QueryCoordinator {

pub fn add_statistics_exchanges(
&mut self,
exchanges: HashMap<String, NewFlightExchange>,
exchanges: HashMap<String, FlightExchange>,
) -> Result<()> {
for (source, exchange) in exchanges.into_iter() {
match self.new_statistics_exchanges.entry(source) {
match self.statistics_exchanges.entry(source) {
Entry::Vacant(v) => {
v.insert(vec![exchange]);
}
Expand All @@ -445,25 +445,25 @@ impl QueryCoordinator {
Ok(())
}

pub fn new_add_fragment_exchange(
pub fn add_fragment_exchange(
&mut self,
target: String,
fragment: usize,
) -> Result<Receiver<Result<FlightData, Status>>> {
let (tx, rx) = async_channel::bounded(8);
self.new_fragment_exchanges.insert(
self.fragment_exchanges.insert(
(target, fragment, FLIGHT_SENDER),
NewFlightExchange::create_sender(tx),
FlightExchange::create_sender(tx),
);
Ok(rx)
}

pub fn add_fragment_exchanges(
&mut self,
exchanges: HashMap<(String, usize), NewFlightExchange>,
exchanges: HashMap<(String, usize), FlightExchange>,
) -> Result<()> {
for ((source, fragment), exchange) in exchanges.into_iter() {
self.new_fragment_exchanges
self.fragment_exchanges
.insert((source, fragment, FLIGHT_RECEIVER), exchange);
}

Expand All @@ -474,7 +474,7 @@ impl QueryCoordinator {
match params {
ExchangeParams::MergeExchange(params) => {
let mut exchanges = vec![];
for ((_target, fragment, role), exchange) in &self.new_fragment_exchanges {
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
if *fragment == params.fragment_id && *role == FLIGHT_SENDER {
exchanges.push(exchange.as_sender());
}
Expand All @@ -488,7 +488,7 @@ impl QueryCoordinator {
for destination in &params.destination_ids {
exchanges.push(match destination == &params.executor_id {
true => Ok(FlightSender::create(async_channel::bounded(1).0)),
false => match self.new_fragment_exchanges.get(&(
false => match self.fragment_exchanges.get(&(
destination.clone(),
params.fragment_id,
FLIGHT_SENDER,
Expand All @@ -511,7 +511,7 @@ impl QueryCoordinator {
match params {
ExchangeParams::MergeExchange(params) => {
let mut exchanges = vec![];
for ((_target, fragment, role), exchange) in &self.new_fragment_exchanges {
for ((_target, fragment, role), exchange) in &self.fragment_exchanges {
if *fragment == params.fragment_id && *role == FLIGHT_RECEIVER {
exchanges.push(exchange.as_receiver());
}
Expand All @@ -525,7 +525,7 @@ impl QueryCoordinator {
for destination in &params.destination_ids {
exchanges.push(match destination == &params.executor_id {
true => Ok(FlightReceiver::create(async_channel::bounded(1).1)),
false => match self.new_fragment_exchanges.get(&(
false => match self.fragment_exchanges.get(&(
destination.clone(),
params.fragment_id,
FLIGHT_RECEIVER,
Expand Down Expand Up @@ -692,13 +692,13 @@ impl QueryCoordinator {

let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;

self.new_fragment_exchanges.clear();
self.fragment_exchanges.clear();
let info_mut = self.info.as_mut().expect("Query info is None");
info_mut.query_executor = Some(executor.clone());

let query_id = info_mut.query_id.clone();
let query_ctx = info_mut.query_ctx.clone();
let request_server_exchanges = std::mem::take(&mut self.new_statistics_exchanges);
let request_server_exchanges = std::mem::take(&mut self.statistics_exchanges);

if request_server_exchanges.len() != 1 {
return Err(ErrorCode::Internal(
Expand Down
18 changes: 8 additions & 10 deletions src/query/service/src/api/rpc/exchange/statistics_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use common_base::runtime::TrySpawn;
use common_exception::ErrorCode;
use common_exception::Result;

use crate::api::rpc::flight_client::FlightExchange;
use crate::api::rpc::flight_client::FlightSender;
use crate::api::rpc::flight_client::NewFlightExchange;
use crate::api::DataPacket;
use crate::sessions::QueryContext;

Expand All @@ -44,7 +44,7 @@ pub struct StatisticsReceiver {
impl StatisticsReceiver {
pub fn spawn_receiver(
ctx: &Arc<QueryContext>,
statistics_exchanges: HashMap<String, Vec<NewFlightExchange>>,
statistics_exchanges: HashMap<String, Vec<FlightExchange>>,
) -> Result<StatisticsReceiver> {
let shutdown_notify = Arc::new(Notify::new());
let shutdown_flag = Arc::new(AtomicBool::new(false));
Expand All @@ -55,14 +55,12 @@ impl StatisticsReceiver {
debug_assert_eq!(exchanges.len(), 2);

let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) {
(
tx @ NewFlightExchange::Sender { .. },
rx @ NewFlightExchange::Receiver { .. },
) => (tx.as_sender(), rx.as_receiver()),
(
rx @ NewFlightExchange::Receiver { .. },
tx @ NewFlightExchange::Sender { .. },
) => (tx.as_sender(), rx.as_receiver()),
(tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => {
(tx.as_sender(), rx.as_receiver())
}
(rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => {
(tx.as_sender(), rx.as_receiver())
}
_ => unreachable!(),
};

Expand Down
8 changes: 4 additions & 4 deletions src/query/service/src/api/rpc/exchange/statistics_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use common_exception::ErrorCode;
use common_exception::Result;
use futures_util::future::Either;

use crate::api::rpc::flight_client::FlightExchange;
use crate::api::rpc::flight_client::FlightSender;
use crate::api::rpc::flight_client::NewFlightExchange;
use crate::api::rpc::packets::PrecommitBlock;
use crate::api::rpc::packets::ProgressInfo;
use crate::api::DataPacket;
Expand All @@ -42,15 +42,15 @@ impl StatisticsSender {
pub fn spawn_sender(
query_id: &str,
ctx: Arc<QueryContext>,
mut exchanges: Vec<NewFlightExchange>,
mut exchanges: Vec<FlightExchange>,
) -> StatisticsSender {
debug_assert_eq!(exchanges.len(), 2);

let (tx, rx) = match (exchanges.remove(0), exchanges.remove(0)) {
(tx @ NewFlightExchange::Sender { .. }, rx @ NewFlightExchange::Receiver { .. }) => {
(tx @ FlightExchange::Sender { .. }, rx @ FlightExchange::Receiver { .. }) => {
(tx.as_sender(), rx.as_receiver())
}
(rx @ NewFlightExchange::Receiver { .. }, tx @ NewFlightExchange::Sender { .. }) => {
(rx @ FlightExchange::Receiver { .. }, tx @ FlightExchange::Sender { .. }) => {
(tx.as_sender(), rx.as_receiver())
}
_ => unreachable!(),
Expand Down
48 changes: 24 additions & 24 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl FlightClient {
&mut self,
query_id: &str,
target: &str,
) -> Result<NewFlightExchange> {
) -> Result<FlightExchange> {
let mut streaming = self
.get_streaming(
RequestBuilder::create(Ticket::default())
Expand All @@ -84,15 +84,15 @@ impl FlightClient {
}
});

Ok(NewFlightExchange::create_receiver(rx))
Ok(FlightExchange::create_receiver(rx))
}

pub async fn do_get(
&mut self,
query_id: &str,
target: &str,
fragment: usize,
) -> Result<NewFlightExchange> {
) -> Result<FlightExchange> {
let mut streaming = self
.get_streaming(
RequestBuilder::create(Ticket::default())
Expand All @@ -117,7 +117,7 @@ impl FlightClient {
}
});

Ok(NewFlightExchange::create_receiver(rx))
Ok(FlightExchange::create_receiver(rx))
}

async fn get_streaming(&mut self, request: Request<Ticket>) -> Result<Streaming<FlightData>> {
Expand Down Expand Up @@ -149,7 +149,7 @@ impl FlightClient {
}

pub struct FlightReceiver {
state: Arc<NewState>,
state: Arc<State>,
dropped: AtomicBool,
rx: Receiver<Result<FlightData>>,
}
Expand All @@ -164,7 +164,7 @@ impl FlightReceiver {
pub fn create(rx: Receiver<Result<FlightData>>) -> FlightReceiver {
FlightReceiver {
rx,
state: NewState::create(),
state: State::create(),
dropped: AtomicBool::new(false),
}
}
Expand All @@ -188,7 +188,7 @@ impl FlightReceiver {
}

pub struct FlightSender {
state: Arc<NewState>,
state: Arc<State>,
dropped: AtomicBool,
tx: Sender<Result<FlightData, Status>>,
}
Expand All @@ -214,7 +214,7 @@ impl Drop for FlightSender {
impl FlightSender {
pub fn create(tx: Sender<Result<FlightData, Status>>) -> FlightSender {
FlightSender {
state: NewState::create(),
state: State::create(),
dropped: AtomicBool::new(false),
tx,
}
Expand All @@ -240,48 +240,48 @@ impl FlightSender {
}
}

pub struct NewState {
pub struct State {
strong_count: AtomicUsize,
}

impl NewState {
pub fn create() -> Arc<NewState> {
Arc::new(NewState {
impl State {
pub fn create() -> Arc<State> {
Arc::new(State {
strong_count: AtomicUsize::new(0),
})
}
}

pub enum NewFlightExchange {
pub enum FlightExchange {
Dummy,
Receiver {
state: Arc<NewState>,
state: Arc<State>,
receiver: Receiver<Result<FlightData>>,
},
Sender {
state: Arc<NewState>,
state: Arc<State>,
sender: Sender<Result<FlightData, Status>>,
},
}

impl NewFlightExchange {
pub fn create_sender(sender: Sender<Result<FlightData, Status>>) -> NewFlightExchange {
NewFlightExchange::Sender {
impl FlightExchange {
pub fn create_sender(sender: Sender<Result<FlightData, Status>>) -> FlightExchange {
FlightExchange::Sender {
sender,
state: NewState::create(),
state: State::create(),
}
}

pub fn create_receiver(receiver: Receiver<Result<FlightData>>) -> NewFlightExchange {
NewFlightExchange::Receiver {
pub fn create_receiver(receiver: Receiver<Result<FlightData>>) -> FlightExchange {
FlightExchange::Receiver {
receiver,
state: NewState::create(),
state: State::create(),
}
}

pub fn as_sender(&self) -> FlightSender {
match self {
NewFlightExchange::Sender { state, sender } => {
FlightExchange::Sender { state, sender } => {
state.strong_count.fetch_add(1, Ordering::SeqCst);

FlightSender {
Expand All @@ -296,7 +296,7 @@ impl NewFlightExchange {

pub fn as_receiver(&self) -> FlightReceiver {
match self {
NewFlightExchange::Receiver { state, receiver } => {
FlightExchange::Receiver { state, receiver } => {
state.strong_count.fetch_add(1, Ordering::SeqCst);

FlightReceiver {
Expand Down
Loading

0 comments on commit 796ec31

Please sign in to comment.