Skip to content

Commit

Permalink
introduce new response format (BloopAI#420)
Browse files Browse the repository at this point in the history
Co-authored-by: calyptobai <[email protected]>
  • Loading branch information
oppiliappan and calyptobai committed May 19, 2023
1 parent ab5800c commit 21740ab
Show file tree
Hide file tree
Showing 4 changed files with 514 additions and 77 deletions.
157 changes: 94 additions & 63 deletions server/bleep/src/webserver/answer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use crate::{
mod llm_gateway;
mod partial_parse;
mod prompts;
mod response;

use response::{ResponseState, SearchResult, SearchStep, Update};

#[derive(Default)]
pub struct RouteState {
Expand Down Expand Up @@ -82,6 +85,7 @@ pub(super) async fn handle(
let q = params.q;
let stream = async_stream::try_stream! {
let mut action_stream = Action::Query(q).into()?;
let mut response = ResponseState::new(&conversation_id, &conversation);

loop {
// The main loop. Here, we create two streams that operate simultaneously; the update
Expand All @@ -91,6 +95,7 @@ pub(super) async fn handle(

use futures::future::FutureExt;


let (update_tx, update_rx) = tokio::sync::mpsc::channel(10);

let left_stream = tokio_stream::wrappers::ReceiverStream::new(update_rx)
Expand All @@ -104,7 +109,10 @@ pub(super) async fn handle(
let mut next = None;
for await item in stream::select(left_stream, right_stream) {
match item {
Either::Left(upd) => yield upd,
Either::Left(upd) => {
response.apply_update(upd);
yield response.clone()
},
Either::Right(n) => next = n?,
}
}
Expand All @@ -115,6 +123,7 @@ pub(super) async fn handle(
}
}

// TODO: add `conclusion` of last assistant response to history here
// Storing the conversation here allows us to make subsequent requests.
state.conversations
.entry_async(conversation_id)
Expand All @@ -132,13 +141,13 @@ pub(super) async fn handle(
}

#[derive(Hash, PartialEq, Eq)]
struct ConversationId {
pub(super) struct ConversationId {
thread_id: String,
user_id: String,
}

#[derive(Clone, Debug)]
struct Conversation {
pub(super) struct Conversation {
history: Vec<llm_gateway::api::Message>,
path_aliases: Vec<String>,
repo_ref: RepoRef,
Expand Down Expand Up @@ -175,7 +184,7 @@ impl Conversation {
action_stream: ActionStream,
update: Sender<Update>,
) -> Result<Option<ActionStream>> {
let (action, raw_response) = action_stream.load(&update).await?;
let (action, raw_response) = action_stream.load().await?;

if !matches!(action, Action::Query(..)) {
self.history
Expand All @@ -184,23 +193,39 @@ impl Conversation {
}

let question = match action {
Action::Query(s) => parser::parse_nl(&s)?
.as_semantic()
.context("got a 'Grep' query")?
.target
.as_ref()
.context("query was empty")?
.as_plain()
.context("user query was not plain text")?
.clone()
.into_owned(),
Action::Query(s) => {
update
.send(Update::Step(SearchStep::Query(s.clone())))
.await?;

parser::parse_nl(&s)?
.as_semantic()
.context("got a 'Grep' query")?
.target
.as_ref()
.context("query was empty")?
.as_plain()
.context("user query was not plain text")?
.clone()
.into_owned()
}

Action::Prompt(_) => {
update
.send(Update::Step(SearchStep::Prompt("awaiting prompt".into())))
.await?;

return Ok(None);
}

Action::Answer(rephrased_question) => {
self.answer(ctx, update, &rephrased_question).await?;
self.answer(
ctx,
update,
&rephrased_question,
self.path_aliases.as_slice(),
)
.await?;
let r: Result<ActionStream> = Action::Prompt(prompts::CONTINUE.to_owned()).into();
return Ok(Some(r?));
}
Expand All @@ -215,7 +240,6 @@ impl Conversation {
.fuzzy_path_match(&self.repo_ref, &search, /* limit */ 50)
.await
.map(|c| c.relative_path)
.map(|p| format!("{}, {p}", self.path_alias(&p)))
.collect::<Vec<_>>();

// If there are no lexical results, perform a semantic search.
Expand All @@ -226,7 +250,7 @@ impl Conversation {
..Default::default()
};

let mut semantic_paths: Vec<String> = ctx
let semantic_paths = ctx
.app
.semantic
.as_ref()
Expand All @@ -240,20 +264,28 @@ impl Conversation {
.map(|(k, v)| (k, super::semantic::kind_to_value(v.kind)))
.collect::<HashMap<_, _>>()
})
.map(|chunk| {
let relative_path = chunk["relative_path"].as_str().unwrap();
format!("{}, {relative_path}", self.path_alias(relative_path))
})
.map(|chunk| chunk["relative_path"].as_str().unwrap().to_owned())
.collect::<HashSet<_>>()
.into_iter()
.collect();

paths.append(&mut semantic_paths);
paths = semantic_paths;
}

for u in paths
.iter()
.map(|p| Update::Step(SearchStep::Path(p.clone())))
{
update.send(u).await?;
}

Some("§alias, path".to_owned())
.into_iter()
.chain(paths)
.chain(
paths
.iter()
.map(|p| format!("{}, {p}", self.path_alias(&p))),
)
.collect::<Vec<_>>()
.join("\n")
}
Expand All @@ -270,6 +302,10 @@ impl Conversation {
FileRef::Path(p) => p,
};

update
.send(Update::Step(SearchStep::File(path.clone())))
.await?;

ctx.app
.indexes
.file
Expand All @@ -281,6 +317,10 @@ impl Conversation {
Action::Code(query) => {
// Semantic search.

update
.send(Update::Step(SearchStep::Code(query.clone())))
.await?;

let nl_query = SemanticQuery {
target: Some(parser::Literal::Plain(Cow::Owned(query))),
..Default::default()
Expand Down Expand Up @@ -316,7 +356,7 @@ impl Conversation {
}

Action::Check(question, path_aliases) => {
self.check(ctx, question, path_aliases).await?
self.check(ctx, update, question, path_aliases).await?
}
};

Expand All @@ -336,6 +376,7 @@ impl Conversation {
async fn check(
&mut self,
ctx: &AppContext,
update: Sender<Update>,
question: String,
path_aliases: Vec<usize>,
) -> Result<String> {
Expand All @@ -345,6 +386,13 @@ impl Conversation {
.collect::<Result<Vec<_>, _>>()
.map_err(|i| anyhow!("invalid path alias {i}"))?;

for u in paths
.iter()
.map(|&p| Update::Step(SearchStep::Check(p.clone())))
{
update.send(u).await?;
}

let question = &question;
let ctx = &ctx.clone().model("gpt-3.5-turbo");
let repo_ref = &self.repo_ref;
Expand Down Expand Up @@ -415,7 +463,13 @@ impl Conversation {
Ok(serde_json::to_string(&out)?)
}

async fn answer(&self, ctx: &AppContext, update: Sender<Update>, question: &str) -> Result<()> {
async fn answer(
&self,
ctx: &AppContext,
update: Sender<Update>,
question: &str,
path_aliases: &[String],
) -> Result<()> {
let messages = self
.history
.iter()
Expand All @@ -433,10 +487,19 @@ impl Conversation {
while let Some(token) = stream.next().await {
buffer += &token?;
let (s, _) = partial_parse::rectify_json(&buffer);
update
.send(Update::Answer(s.into_owned()))
.await
.or(Err(anyhow!("failed to send answer update")))?;

// this /should/ be infallible if rectify_json works
let json_array: Vec<Vec<serde_json::Value>> =
serde_json::from_str(&s).expect("failed to rectify_json");

let search_results = json_array
.iter()
.map(Vec::as_slice)
.filter_map(SearchResult::from_json_array)
.map(|s| s.substitute_path_alias(path_aliases))
.collect::<Vec<_>>();

update.send(Update::Result(search_results)).await?;
}

Ok(())
Expand Down Expand Up @@ -466,19 +529,6 @@ enum FileRef {
}

impl Action {
/// Map this action to a summary update.
fn update(&self) -> Update {
match self {
Self::Answer(..) => Update::Answering,
Self::Prompt(s) => Update::Prompt(s.clone()),
Self::Query(..) => Update::ProcessingQuery,
Self::Code(..) => Update::SearchingFiles,
Self::Path(..) => Update::SearchingFiles,
Self::File(..) => Update::LoadingFiles,
Self::Check(..) => Update::Answering,
}
}

/// Deserialize this action from the GPT-tagged enum variant format.
///
/// We convert:
Expand Down Expand Up @@ -508,7 +558,7 @@ impl Action {
let action = action.as_str().context("model action was not a string")?;

let value = if array.len() < 2 {
array.pop().unwrap_or(serde_json::Value::Null)
array.pop().unwrap_or_default()
} else {
array.into()
};
Expand Down Expand Up @@ -547,16 +597,6 @@ impl Action {
}
}

#[derive(serde::Serialize)]
enum Update {
Prompt(String),
Answer(String),
ProcessingQuery,
SearchingFiles,
Answering,
LoadingFiles,
}

/// An action that may not have finished loading yet.
struct ActionStream {
tokens: String,
Expand All @@ -565,14 +605,10 @@ struct ActionStream {

impl ActionStream {
/// Load this action, consuming the stream if required.
async fn load(mut self, update: &Sender<Update>) -> Result<(Action, String)> {
async fn load(mut self) -> Result<(Action, String)> {
let mut stream = match self.action {
Either::Left(stream) => stream,
Either::Right(action) => {
update
.send(action.update())
.await
.or(Err(anyhow!("failed to send update")))?;
return Ok((action, self.tokens));
}
};
Expand All @@ -582,11 +618,6 @@ impl ActionStream {
}

let action = Action::deserialize_gpt(&self.tokens)?;
update
.send(action.update())
.await
.or(Err(anyhow!("failed to send update")))?;

Ok((action, self.tokens))
}
}
Expand Down
16 changes: 15 additions & 1 deletion server/bleep/src/webserver/answer/partial_parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn rectify_json(input: &str) -> (Cow<str>, &str) {
'"' => rectify_str(input),
'[' => rectify_array(input),
'{' => rectify_object(input),
d if input.trim().starts_with(|c: char| c.is_ascii_digit()) => rectify_number(input),
_d if input.trim().starts_with(|c: char| c.is_ascii_digit()) => rectify_number(input),
c => panic!("malformed JSON value: `{c}`"),
}
}
Expand Down Expand Up @@ -202,6 +202,12 @@ fn rectify_object(input: &str) -> (Cow<str>, &str) {
}
_ => panic!("malformed JSON object"),
}
rest = consume_whitespace(rest);

if rest.is_empty() {
buf += "}";
break;
}
}

(buf.into(), rest)
Expand Down Expand Up @@ -348,5 +354,13 @@ mod tests {
let (value, rest) = rectify_json("{\"oldFileName\": \"config.rs\",\n\"new\"");
assert_eq!(value, "{\"oldFileName\":\"config.rs\",\"new\":null}");
assert_eq!(rest, "");

let (value, rest) = rectify_json("[{\"oldFileName\": \"config.rs\",\n\"ne");
assert_eq!(value, "[{\"oldFileName\":\"config.rs\",\"ne\":null}]");
assert_eq!(rest, "");

let (value, rest) = rectify_json("[{\"oldFileName\": \"config.rs\",\n\"new\":null,");
assert_eq!(value, "[{\"oldFileName\":\"config.rs\",\"new\":null}]");
assert_eq!(rest, "");
}
}
Loading

0 comments on commit 21740ab

Please sign in to comment.