fix(webserver): improve error handling and report stdout for doc indexing pipeline (TabbyML#2462)

* fix(webserver): improve error handling for doc indexing pipeline

* Properly return stdout/errors

* [autofix.ci] apply automated fixes

* Update frontend text

update

update

update

* update description

* update

* update

* fix compiling

* fix

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: Meng Zhang <[email protected]>
3 people authored Jun 21, 2024
1 parent 4855d23 commit 8b56e32
Showing 5 changed files with 76 additions and 40 deletions.
19 changes: 10 additions & 9 deletions crates/tabby-scheduler/src/crawl/mod.rs
@@ -11,7 +11,7 @@ use url::Url;

use self::types::{CrawledDocument, KatanaRequestResponse};

async fn crawl_url(start_url: &str) -> impl Stream<Item = KatanaRequestResponse> {
async fn crawl_url(start_url: &str) -> anyhow::Result<impl Stream<Item = KatanaRequestResponse>> {
let mut child = tokio::process::Command::new("katana")
.arg("-u")
.arg(start_url)
@@ -22,8 +22,7 @@ async fn crawl_url(start_url: &str) -> impl Stream<Item = KatanaRequestResponse>
.arg("9999")
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.expect("Failed to start katana, please check whether the binary is in your $PATH");
.spawn()?;

let stdout = child.stdout.take().expect("Failed to acquire stdout");
let mut stdout = tokio::io::BufReader::new(stdout).lines();
@@ -36,7 +35,7 @@ async fn crawl_url(start_url: &str) -> impl Stream<Item = KatanaRequestResponse>
}
});

stream! {
Ok(stream! {
while let Ok(Some(line)) = stdout.next_line().await {
let data = match serde_json::from_str::<KatanaRequestResponse>(&line) {
Ok(data) => data,
@@ -69,7 +68,7 @@ async fn crawl_url(start_url: &str) -> impl Stream<Item = KatanaRequestResponse>

yield data;
}
}
})
}

fn to_document(data: KatanaRequestResponse) -> Option<CrawledDocument> {
@@ -105,10 +104,12 @@ fn to_document(data: KatanaRequestResponse) -> Option<CrawledDocument> {
))
}

pub async fn crawl_pipeline(start_url: &str) -> impl Stream<Item = CrawledDocument> {
crawl_url(start_url)
.await
.filter_map(move |data| async move { to_document(data) })
pub async fn crawl_pipeline(
start_url: &str,
) -> anyhow::Result<impl Stream<Item = CrawledDocument>> {
Ok(crawl_url(start_url)
.await?
.filter_map(move |data| async move { to_document(data) }))
}

#[cfg(test)]
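With this change, crawl_url and crawl_pipeline return anyhow::Result<impl Stream<...>> instead of panicking when the katana binary cannot be spawned. A minimal consumer sketch of the new signature; the crate path tabby_scheduler::crawl and the helper name print_crawled_urls are assumptions for illustration, and the pinning mirrors the Box::pin call in the lib.rs change below:

use futures::StreamExt;
use tabby_scheduler::crawl::crawl_pipeline;

// Hypothetical helper: walk the crawl stream and print each crawled URL.
async fn print_crawled_urls(start_url: &str) -> anyhow::Result<()> {
    // `?` propagates the spawn failure (e.g. katana missing from $PATH)
    // where the old code panicked via `.expect(...)`.
    let stream = crawl_pipeline(start_url).await?;

    // The returned `impl Stream` is not Unpin, so pin it before polling.
    let mut stream = Box::pin(stream);
    while let Some(doc) = stream.next().await {
        println!("crawled {}", doc.url);
    }
    Ok(())
}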
48 changes: 27 additions & 21 deletions crates/tabby-scheduler/src/lib.rs
@@ -5,12 +5,11 @@ mod code;
pub mod crawl;
mod indexer;

use async_stream::stream;
pub use code::CodeIndexer;
use crawl::crawl_pipeline;
use doc::create_web_index;
pub use doc::{DocIndexer, WebDocument};
use futures::StreamExt;
use futures::{Future, StreamExt};
use indexer::{IndexAttributeBuilder, Indexer};
use tabby_inference::Embedding;

@@ -75,30 +74,37 @@ async fn scheduler_pipeline(config: &tabby_common::config::Config) {
code.garbage_collection(repositories);
}

pub async fn crawl_index_docs(urls: &[String], embedding: Arc<dyn Embedding>) {
pub async fn crawl_index_docs<F>(
urls: &[String],
embedding: Arc<dyn Embedding>,
on_process_url: impl Fn(String) -> F,
) -> anyhow::Result<()>
where
F: Future<Output = ()>,
{
for url in urls {
debug!("Starting doc index pipeline for {url}");
let embedding = embedding.clone();
stream! {
let mut num_docs = 0;
let doc_index = create_web_index(embedding.clone());
for await doc in crawl_pipeline(url).await {
let source_doc = SourceDocument {
id: doc.url.clone(),
title: doc.metadata.title.unwrap_or_default(),
link: doc.url,
body: doc.markdown,
};

num_docs += 1;
doc_index.add(source_doc).await;
}
info!("Crawled {} documents from '{}'", num_docs, url);
doc_index.commit();
let mut num_docs = 0;
let doc_index = create_web_index(embedding.clone());

let mut pipeline = Box::pin(crawl_pipeline(url).await?);
while let Some(doc) = pipeline.next().await {
on_process_url(doc.url.clone()).await;
let source_doc = SourceDocument {
id: doc.url.clone(),
title: doc.metadata.title.unwrap_or_default(),
link: doc.url,
body: doc.markdown,
};

num_docs += 1;
doc_index.add(source_doc).await;
}
.collect::<Vec<_>>()
.await;
info!("Crawled {} documents from '{}'", num_docs, url);
doc_index.commit();
}
Ok(())
}

mod tantivy_utils {
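crawl_index_docs now reports each fetched URL through an on_process_url callback and returns anyhow::Result<()> instead of swallowing failures. A minimal caller sketch, assuming the crate is consumed as tabby_scheduler and an Arc<dyn Embedding> is already at hand; the real caller in web_crawler.rs below forwards each URL to its JobLogger rather than printing:

use std::sync::Arc;
use tabby_inference::Embedding;

// Hypothetical wrapper: index a list of URLs, logging each page as it is fetched.
async fn index_docs(urls: &[String], embedding: Arc<dyn Embedding>) -> anyhow::Result<()> {
    // Any `Fn(String) -> F` with `F: Future<Output = ()>` satisfies the new bound;
    // a closure returning an async block is the simplest form.
    tabby_scheduler::crawl_index_docs(urls, embedding, |url: String| async move {
        println!("Fetching {url}");
    })
    .await
}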
@@ -8,11 +8,25 @@ export default function WebProviderLayout({
return (
<>
<SubHeader
// todo: add external link
// FIXME(wsxiaoys): add external link
// externalLink="https://tabby.tabbyml.com/blog/2023/10/16/repository-context-for-code-completion"
>
Connect to a web URL, utilizing this as context to enhance the
performance of large language models.
<p>
Crawl documents from the following URLs and use their content to enhance
the Answer Engine. Recrawling will occur only if manually initiated.
</p>
<p>
Under the hood,{' '}
<a
className="underline"
target="_blank"
href="https://github.com/projectdiscovery/katana"
>
Katana
</a>{' '}
is used as a crawler (running as a subprocess) and thus needs to be
installed in the $PATH.
</p>
</SubHeader>
{children}
</>
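The new copy above states that Katana runs as a subprocess and must be resolvable from $PATH. A small, hypothetical pre-flight check along those lines, not part of this commit and using a Unix-style lookup that ignores the .exe suffix on Windows:

// Hypothetical helper: report whether `katana` is discoverable on $PATH,
// so a missing binary can be surfaced before a crawl job is queued.
fn katana_on_path() -> bool {
    std::env::var_os("PATH")
        .map(|paths| std::env::split_paths(&paths).any(|dir| dir.join("katana").is_file()))
        .unwrap_or(false)
}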
11 changes: 7 additions & 4 deletions ee/tabby-webserver/src/service/background_job/mod.rs
@@ -41,7 +41,7 @@ pub async fn start(
git_repository_service: Arc<dyn GitRepositoryService>,
third_party_repository_service: Arc<dyn ThirdPartyRepositoryService>,
integration_service: Arc<dyn IntegrationService>,
web_crawler_service: Arc<dyn WebCrawlerService>,
_web_crawler_service: Arc<dyn WebCrawlerService>,
embedding: Arc<dyn Embedding>,
sender: tokio::sync::mpsc::UnboundedSender<BackgroundJobEvent>,
mut receiver: tokio::sync::mpsc::UnboundedReceiver<BackgroundJobEvent>,
@@ -95,9 +95,12 @@ pub async fn start(
let mut job_logger = JobLogger::new(WebCrawlerJob::NAME, db.clone()).await;
let job = WebCrawlerJob::new(url);

// FIXME(boxbeam): handles job error.
job.run(embedding.clone()).await;
job_logger.complete(0).await;
if let Err(err) = job.run(job_logger.clone(), embedding.clone()).await {
cprintln!(job_logger, "Web indexing process failed: do you have Katana installed? (https://github.com/projectdiscovery/katana) \n{err:?}");
job_logger.complete(-1).await;
} else {
job_logger.complete(0).await;
}
}
}
},
18 changes: 15 additions & 3 deletions ee/tabby-webserver/src/service/background_job/web_crawler.rs
@@ -2,7 +2,10 @@ use std::sync::Arc;

use tabby_inference::Embedding;

use super::helper::Job;
use super::{
cprintln,
helper::{Job, JobLogger},
};

pub struct WebCrawlerJob {
url: String,
@@ -17,7 +20,16 @@ impl WebCrawlerJob {
Self { url }
}

pub async fn run(self, embedding: Arc<dyn Embedding>) {
tabby_scheduler::crawl_index_docs(&[self.url], embedding).await;
pub async fn run(
self,
job_logger: JobLogger,
embedding: Arc<dyn Embedding>,
) -> tabby_schema::Result<()> {
tabby_scheduler::crawl_index_docs(&[self.url], embedding, move |url| {
let job_logger = job_logger.clone();
async move { cprintln!(job_logger, "Fetching {url}") }
})
.await?;
Ok(())
}
}
