Skip to content

Commit

Permalink
watchman: rust: more plumbing to support subscriptions
Browse files Browse the repository at this point in the history
Summary:
split up the ClientTask into a task that simply reads PDUs in a loop
(the ReaderTask) and then sends them to the other half of the ClientTask using
the same queue that is used by the consumer of `Client` to schedule a
request.

This effectively serializes the requests and responses in a single loop
which makes it easier to add in support for correctly handling unilateral
subscription response in a later diff.

Reviewed By: dtolnay

Differential Revision: D18598224

fbshipit-source-id: 00fd7746b9b612ee9119bff204dc06c50c49bb98
  • Loading branch information
wez authored and facebook-github-bot committed Nov 23, 2019
1 parent 7503ae9 commit 8f03f19
Showing 1 changed file with 124 additions and 25 deletions.
149 changes: 124 additions & 25 deletions rust/watchman_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod fields;
mod named_pipe;
pub mod pdu;
use serde_bser::de::{Bunser, PduInfo, SliceRead};
use std::collections::VecDeque;
use std::path::{Path, PathBuf};
use thiserror::Error;
use tokio::net::process::Command;
Expand Down Expand Up @@ -206,14 +207,25 @@ impl Connector {

let (request_tx, request_rx) = tokio::sync::mpsc::channel(128);

let mut task = ClientTask {
let mut reader_task = ReaderTask {
reader,
request_tx: request_tx.clone(),
};
tokio::spawn(async move {
if let Err(err) = reader_task.run().await {
eprintln!("watchman reader task failed: {}", err);
}
});

let mut task = ClientTask {
writer,
request_rx,
request_queue: VecDeque::new(),
waiting_response: false,
};
tokio::spawn(async move {
if let Err(()) = task.run().await {
eprintln!("watchman request task failed");
if let Err(err) = task.run().await {
eprintln!("watchman client task failed: {}", err);
}
});

Expand Down Expand Up @@ -300,31 +312,32 @@ impl SendRequest {
}
}

enum TaskItem {
QueueRequest(SendRequest),
ProcessReceivedPdu(Vec<u8>),
}

/// A live connection to a watchman server.
/// Use [Connector](struct.Connector.html) to establish a connection.
pub struct Client {
request_tx: tokio::sync::mpsc::Sender<SendRequest>,
request_tx: tokio::sync::mpsc::Sender<TaskItem>,
}

struct ClientTask {
writer: tokio_io::split::WriteHalf<Box<dyn ReadWriteStream>>,
/// The reader task lives to read a PDU and send it to the ClientTask
struct ReaderTask {
reader: tokio_io::split::ReadHalf<Box<dyn ReadWriteStream>>,
request_rx: tokio::sync::mpsc::Receiver<SendRequest>,
request_tx: tokio::sync::mpsc::Sender<TaskItem>,
}

impl ClientTask {
async fn run(&mut self) -> Result<(), ()> {
impl ReaderTask {
async fn run(&mut self) -> Result<(), Error> {
loop {
let request = match self.request_rx.next().await {
Some(request) => request,
None => break,
};
if let Err(err) = self.process_request(request).await {
eprintln!("Error in watchman request task: {}", err);
break;
}
let pdu = self.read_pdu_vec().await?;
self.request_tx
.send(TaskItem::ProcessReceivedPdu(pdu))
.await
.map_err(Error::generic)?;
}
Ok(())
}

/// Sniffs out the BSER PDU header to determine the length of data that
Expand Down Expand Up @@ -371,14 +384,100 @@ impl ClientTask {

Ok(buf)
}
}

/// The client task coordinates sending requests with processing
/// unilateral results
struct ClientTask {
writer: tokio_io::split::WriteHalf<Box<dyn ReadWriteStream>>,
request_rx: tokio::sync::mpsc::Receiver<TaskItem>,
request_queue: VecDeque<SendRequest>,
waiting_response: bool,
}

impl Drop for ClientTask {
fn drop(&mut self) {
self.fail_all(&Error::generic("the client task terminated"));
}
}

impl ClientTask {
async fn run(&mut self) -> Result<(), Error> {
// process things, and if we encounter an error, ensure that
// we fail all outstanding requests
match self.run_loop().await {
Err(err) => {
self.fail_all(&err);
Err(err)
}
ok => ok,
}
}

async fn run_loop(&mut self) -> Result<(), Error> {
loop {
match self.request_rx.next().await {
Some(TaskItem::QueueRequest(request)) => self.queue_request(request).await?,
Some(TaskItem::ProcessReceivedPdu(pdu)) => self.process_pdu(pdu).await?,
None => break,
};
}
Ok(())
}

/// Generate an error for each queued request.
/// This is called in situations where the state of the connection
/// to the serve is non-recoverable.
fn fail_all(&mut self, err: &Error) {
while let Some(request) = self.request_queue.pop_front() {
request.respond(Err(err.to_string())).ok();
}
}

async fn process_request(&mut self, request: SendRequest) -> Result<(), Error> {
if let Err(err) = self.writer.write_all(&request.buf).await {
request.respond(Err(err.to_string()))?;
return Ok(());
/// If we're not waiting for the response to a request,
/// then send the next one!
async fn send_next_request(&mut self) -> Result<(), Error> {
if !self.waiting_response && !self.request_queue.is_empty() {
match self
.writer
.write_all(&self.request_queue.front().expect("not empty").buf)
.await
{
Err(err) => {
// A failed write breaks our world; we don't want to
// try to continue
return Err(err.into());
}
Ok(_) => self.waiting_response = true,
}
}
Ok(())
}

request.respond(self.read_pdu_vec().await.map_err(|e| e.to_string()))?;
/// Queue up a new request from the client code, and then
/// check to see if we can send a queued request to the server.
async fn queue_request(&mut self, request: SendRequest) -> Result<(), Error> {
self.request_queue.push_back(request);
self.send_next_request().await?;
Ok(())
}

/// Dispatch a PDU that we just read to the appropriate client code.
async fn process_pdu(&mut self, pdu: Vec<u8>) -> Result<(), Error> {
if self.waiting_response {
let request = self
.request_queue
.pop_front()
.expect("waiting_response is only true when request_queue is not empty");
self.waiting_response = false;

request.respond(Ok(pdu))?;
} else {
// This should never happen as we're not doing any subscription stuff
return Err(Error::generic("received a unilateral PDU from the server"));
}

self.send_next_request().await?;
Ok(())
}
}
Expand Down Expand Up @@ -424,10 +523,10 @@ impl Client {
// Step 2: ask the client task to send it for us
let (tx, rx) = tokio::sync::oneshot::channel();
self.request_tx
.send(SendRequest {
.send(TaskItem::QueueRequest(SendRequest {
buf: request_data,
tx,
})
}))
.await
.map_err(Error::generic)?;

Expand Down

0 comments on commit 8f03f19

Please sign in to comment.