Skip to content

Commit

Permalink
Add _msearch elasticsearch endpoint. (quickwit-oss#3330)
Browse files Browse the repository at this point in the history
* Add _msearch elasticsearch endpoint.

* Clean and add tests.

* Fix following review comments.
  • Loading branch information
fmassot authored May 18, 2023
1 parent 3a41263 commit 253a81d
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 6 deletions.
13 changes: 13 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use serde::de::DeserializeOwned;
use warp::reject::LengthRequired;
use warp::{Filter, Rejection};

use super::model::MultiSearchQueryParams;
use crate::elastic_search_api::model::{ElasticIngestOptions, SearchBody, SearchQueryParams};

const BODY_LENGTH_LIMIT: Byte = byte_unit::Byte::from_bytes(1_000_000);
Expand Down Expand Up @@ -130,3 +131,15 @@ pub(crate) fn elastic_index_bulk_filter(
serde_qs::Config::default(),
))
}

#[utoipa::path(post, tag = "Search", path = "/_msearch")]
pub(crate) fn elastic_multi_search_filter(
) -> impl Filter<Extract = (Bytes, MultiSearchQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / "_msearch")
.and(warp::body::content_length_limit(
BODY_LENGTH_LIMIT.get_bytes(),
))
.and(warp::body::bytes())
.and(warp::post())
.and(serde_qs::warp::query(serde_qs::Config::default()))
}
225 changes: 223 additions & 2 deletions quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use std::sync::Arc;
use bulk::{es_compat_bulk_handler, es_compat_index_bulk_handler};
use quickwit_ingest::IngestServiceClient;
use quickwit_search::SearchService;
use rest_handler::{es_compat_index_search_handler, es_compat_search_handler};
use rest_handler::{
es_compat_index_multi_search_handler, es_compat_index_search_handler, es_compat_search_handler,
};
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};

Expand All @@ -40,7 +42,8 @@ pub fn elastic_api_handlers(
ingest_service: IngestServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_search_handler(search_service.clone())
.or(es_compat_index_search_handler(search_service))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service))
.or(es_compat_bulk_handler(ingest_service.clone()))
.or(es_compat_index_bulk_handler(ingest_service))
// Register newly created handlers here.
Expand Down Expand Up @@ -74,3 +77,221 @@ impl From<i64> for TrackTotalHits {
TrackTotalHits::Count(i)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use mockall::predicate;
use quickwit_ingest::{IngestApiService, IngestServiceClient};
use quickwit_search::MockSearchService;

use super::model::ElasticSearchError;
use crate::elastic_search_api::model::MultiSearchResponse;

fn ingest_service_client() -> IngestServiceClient {
let universe = quickwit_actors::Universe::new();
let (ingest_service_mailbox, _) = universe.create_test_mailbox::<IngestApiService>();
IngestServiceClient::from_mailbox(ingest_service_mailbox)
}

#[tokio::test]
async fn test_msearch_api_return_200_responses() {
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
.with(predicate::function(
|search_request: &quickwit_proto::SearchRequest| {
(search_request.index_id == "index-1"
&& search_request.start_offset == 5
&& search_request.max_hits == 20)
|| (search_request.index_id == "index-2"
&& search_request.start_offset == 0
&& search_request.max_hits == 10)
},
))
.returning(|_| Ok(Default::default()));
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 20}
{"index":"index-2"}
{"query":{"query_string":{"query":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 200);
let string_body = String::from_utf8(resp.body().to_vec()).unwrap();
let es_msearch_response: MultiSearchResponse = serde_json::from_str(&string_body).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
for response in es_msearch_response.responses {
assert_eq!(response.status, 200);
assert_eq!(response.error, None);
assert!(response.response.is_some())
}
}

#[tokio::test]
async fn test_msearch_api_return_one_500_and_one_200_responses() {
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_root_search()
.returning(|search_request| {
if search_request.index_id == "index-1" {
Ok(Default::default())
} else {
Err(quickwit_search::SearchError::InternalError(
"something bad happened".to_string(),
))
}
});
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"query":"test"}}, "from": 5, "size": 10}
{"index":"index-2"}
{"query":{"query_string":{"query":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 200);
let es_msearch_response: MultiSearchResponse = serde_json::from_slice(resp.body()).unwrap();
assert_eq!(es_msearch_response.responses.len(), 2);
assert_eq!(es_msearch_response.responses[0].status, 200);
assert!(es_msearch_response.responses[0].error.is_none());
assert_eq!(es_msearch_response.responses[1].status, 500);
assert!(es_msearch_response.responses[1].response.is_none());
let error_cause = es_msearch_response.responses[1].error.as_ref().unwrap();
assert_eq!(
error_cause.reason.as_ref().unwrap(),
"Internal error: `something bad happened`."
);
}

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_header() {
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index":"index-1"
{"query":{"query_string":{"query":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
.unwrap()
.starts_with("Invalid argument: Failed to parse request header"));
}

#[tokio::test]
async fn test_msearch_api_return_400_with_malformed_request_body() {
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index":"index-1"}
{"query":{"query_string":{"bad":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
.unwrap()
.starts_with("Invalid argument: Failed to parse request body"));
}

#[tokio::test]
async fn test_msearch_api_return_400_with_only_a_header_request() {
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index":"index-1"}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
.unwrap()
.starts_with("Invalid argument: Expect request body after request header"));
}

#[tokio::test]
async fn test_msearch_api_return_400_with_no_index() {
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{}
{"query":{"query_string":{"bad":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error.error.reason.unwrap().starts_with(
"Invalid argument: `_msearch` must define one `index` in the request header"
));
}

#[tokio::test]
async fn test_msearch_api_return_400_with_multiple_indexes() {
let mock_search_service = MockSearchService::new();
let es_search_api_handler =
super::elastic_api_handlers(Arc::new(mock_search_service), ingest_service_client());
let msearch_payload = r#"
{"index": ["index-1", "index-2"]}
{"query":{"query_string":{"bad":"test"}}}
"#;
let resp = warp::test::request()
.path("/_elastic/_msearch")
.method("POST")
.body(msearch_payload)
.reply(&es_search_api_handler)
.await;
assert_eq!(resp.status(), 400);
let es_error: ElasticSearchError = serde_json::from_slice(resp.body()).unwrap();
assert!(es_error
.error
.reason
.unwrap()
.starts_with("Invalid argument: Searching only one index is supported for now."));
}
}
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/elastic_search_api/model/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use elasticsearch_dsl::search::ErrorCause;
use hyper::StatusCode;
use quickwit_proto::ServiceError;
use quickwit_search::SearchError;
use serde::Serialize;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ElasticSearchError {
#[serde(with = "http_serde::status_code")]
pub status: StatusCode,
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-serve/src/elastic_search_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
mod bulk_body;
mod bulk_query_params;
mod error;
mod multi_search;
mod search_body;
mod search_query_params;

pub use bulk_body::{BulkAction, BulkActionMeta};
pub use bulk_query_params::{ElasticIngestOptions, ElasticRefresh};
pub use error::ElasticSearchError;
pub use multi_search::{
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
};
pub use search_body::SearchBody;
pub use search_query_params::SearchQueryParams;
Loading

0 comments on commit 253a81d

Please sign in to comment.