Skip to content

Commit

Permalink
attach thread-id to response (BloopAI#461)
Browse files Browse the repository at this point in the history
  • Loading branch information
oppiliappan authored and calyptobai committed May 19, 2023
1 parent b9e4ab7 commit 6566478
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 21 deletions.
2 changes: 1 addition & 1 deletion server/bleep/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ tokenizers = "0.13.3"
tokio-stream = "0.1.12"
ort = { git = "https://github.com/bloopai/ort", branch = "merge-upstream" }
ndarray = "0.15"
uuid = { version = "1.3.2", features = ["v4", "fast-rng"] }
uuid = { version = "1.3.2", features = ["v4", "fast-rng", "serde"] }
jsonwebtoken = { version = "8.3.0", features = ["use_pem"] }

# telemetry
Expand Down
42 changes: 23 additions & 19 deletions server/bleep/src/webserver/answer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ pub struct Params {
pub q: String,
pub repo_ref: RepoRef,
#[serde(default = "default_thread_id")]
pub thread_id: String,
pub thread_id: uuid::Uuid,
}

fn default_thread_id() -> String {
uuid::Uuid::new_v4().to_string()
fn default_thread_id() -> uuid::Uuid {
uuid::Uuid::new_v4()
}

pub(super) async fn handle(
Expand All @@ -67,7 +67,7 @@ pub(super) async fn handle(

let mut conversation = Conversation::load(&conversation_id)
.await?
.unwrap_or_else(|| Conversation::new(params.repo_ref));
.unwrap_or_else(|| Conversation::new(params.repo_ref.clone()));

let ctx = AppContext::new(app)
.map_err(|e| super::Error::user(e).with_status(StatusCode::UNAUTHORIZED))?;
Expand Down Expand Up @@ -124,20 +124,24 @@ pub(super) async fn handle(
conversation.store(conversation_id).await?;
};

let stream = stream
.map(|upd: Result<Exchange>| {
sse::Event::default().json_data(upd.map_err(|e| e.to_string()))
})
.chain(futures::stream::once(async {
Ok(sse::Event::default().data("[DONE]"))
}));
let thread_stream = futures::stream::once(async move {
Ok(sse::Event::default().data(params.thread_id.to_string()))
});

let answer_stream = stream.map(|upd: Result<Exchange>| {
sse::Event::default().json_data(upd.map_err(|e| e.to_string()))
});

let done_stream = futures::stream::once(async { Ok(sse::Event::default().data("[DONE]")) });

let stream = thread_stream.chain(answer_stream).chain(done_stream);

Ok(Sse::new(stream))
}

#[derive(Hash, PartialEq, Eq, Clone)]
pub(super) struct ConversationId {
thread_id: String,
thread_id: uuid::Uuid,
user_id: String,
}

Expand Down Expand Up @@ -517,12 +521,12 @@ impl Conversation {
let mut transaction = db.begin().await?;

// Delete the old conversation for simplicity. This also deletes all its messages.
let id2 = id.clone();
let (user_id, thread_id) = (id.user_id.clone(), id.thread_id.to_string());
sqlx::query! {
"DELETE FROM conversations \
WHERE user_id = ? AND thread_id = ?",
id2.user_id,
id2.thread_id,
user_id,
thread_id,
}
.execute(&mut transaction)
.await?;
Expand All @@ -544,8 +548,8 @@ impl Conversation {
path_aliases, created_at\
) \
VALUES (?, ?, ?, ?, ?, ?, ?, strftime('%s', 'now'))",
id.user_id,
id.thread_id,
user_id,
thread_id,
repo_ref,
title,
exchanges,
Expand All @@ -560,10 +564,10 @@ impl Conversation {
Ok(())
}

async fn load(conversation_id: &ConversationId) -> Result<Option<Self>> {
async fn load(id: &ConversationId) -> Result<Option<Self>> {
let db = db::get().await?;

let ConversationId { thread_id, user_id } = conversation_id.clone();
let (user_id, thread_id) = (id.user_id.clone(), id.thread_id.to_string());

let row = sqlx::query! {
"SELECT repo_ref, llm_history, exchanges, path_aliases FROM conversations \
Expand Down
2 changes: 1 addition & 1 deletion server/bleep/src/webserver/answer/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(in crate::webserver) async fn delete(
}

pub(in crate::webserver) async fn thread(
Path(thread_id): Path<String>,
Path(thread_id): Path<uuid::Uuid>,
Extension(user): Extension<User>,
) -> webserver::Result<impl IntoResponse> {
let user_id = user.0.ok_or_else(|| Error::user("missing user ID"))?;
Expand Down

0 comments on commit 6566478

Please sign in to comment.