Skip to content

Commit

Permalink
basic error handling with anyhow; log some metrics on what was ingest…
Browse files Browse the repository at this point in the history
…ed; env_logger
  • Loading branch information
mooreniemi committed Jun 15, 2021
1 parent 298dd60 commit dedbf57
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 50 deletions.
39 changes: 39 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ j4rs = "0.13"
tantivy = "0.15"
clap = "3.0.0-beta.2"
serde_json = "1.0"
anyhow = "1"
log = "0.4"
env_logger = "0.8"


[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,5 @@ rsync -r /var/lib/elasticsearch/nodes/0/indices/TvG2djXSQgqg4PWZSrv2wQ/0/index/

- `HierarchicalFacet` and `DateTime` support in the schema mapping.
- Remapping field names on export.
- Switch Clap to StructOpt probably.
- java_wrapper should probably be made into a git submodule. Right now I `rsync` from another repo.
183 changes: 134 additions & 49 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
use log::{debug, error};
use log::{info, warn};
use std::convert::TryFrom;
use std::env;
use std::error::Error;
use std::fs;
use std::path::Path;
use std::process;
use std::time::Instant;

use clap::{AppSettings, Clap};
use j4rs::{ClasspathEntry, InvocationArg, Jvm, JvmBuilder};
use serde_json::Value;
use tantivy::collector::TopDocs;
use tantivy::directory::MmapDirectory;
use tantivy::query::QueryParser;
use tantivy::schema::Schema;
use tantivy::schema::{Field, FieldEntry, Schema};
use tantivy::DocAddress;
use tantivy::Document;
use tantivy::Index;
use tantivy::Score;

/// This is a tool for dumping Elasticsearch Lucene shards into Tantivy indices.
/// Elasticsearch stores fields in a particular way which is why it's not "just" Lucene.
/// If you use a schema mapping with _es_source as a listed field, a copy of the entire ES JSON will be placed there
#[derive(Clap)]
#[clap(version = "0.1.2", author = "Alex MN. <[email protected]>")]
#[clap(setting = AppSettings::ColoredHelp)]
Expand All @@ -45,7 +49,11 @@ fn run(
) -> Result<(), Box<Error>> {
// destination index details
let schema: Schema = serde_json::from_str(&fs::read_to_string(schema_path)?)?;
dbg!(&schema);
info!("Found Schema: {:?}", schema);
schema.get_field("_es_source").and_then(|f| {
info!("Found {:?} in Schema so will retain ES source document", f);
Some(f)
});

// safe to rm -rf /tmp/suntan/tantivy-idx
fs::create_dir_all(&output)?;
Expand All @@ -61,7 +69,7 @@ fn run(
jassets_path
.as_path()
.to_str()
.expect("valid jassets classpath"),
.expect("Valid jassets classpath setup with build.rs"),
);
let jvm: Jvm = JvmBuilder::new()
.classpath_entry(entry)
Expand All @@ -72,65 +80,68 @@ fn run(
let instantiation_args = vec![InvocationArg::try_from(input)?];
let instance = jvm.create_instance("org.suntan.ShardReader", instantiation_args.as_ref())?;
let chain = jvm.chain(&instance)?;

let doc_count: u64 = chain.clone_instance()?.invoke("docCount", &[])?.to_rust()?;

let iterator = chain.invoke("batches", &[])?;

// # Indexing documents

// FIXME: should probably be configurable
// Tantivy buffer of 100MB that will be split between indexing threads.
let mut index_writer = index.writer(100_000_000)?;

// FIXME: tantivy DocId is incremental so this creates duplication on each run
// see also https://docs.rs/tantivy/0.15.0/tantivy/type.DocId.html
let now = Instant::now();
let mut batches: u64 = 0;
let mut docs: u64 = 0;
while iterator.invoke("hasNext", &[])?.to_rust()? {
batches += 1;
debug!("Found another batch ({})", batches);
let batch: Vec<String> = iterator.invoke("next", &[])?.to_rust()?;
batch.iter().for_each(|doc_source| {
// there is also a parse_document method we could use specific to tantivy
// but it errors on any keys not in the schema so the below is more flexible right now
// let doc: Document = schema.parse_document(&doc_source)?;
let v: Value = serde_json::from_str(&doc_source).expect("must be valid doc");
// dbg!(v);

let mut doc = Document::new();

// FIXME: only handling exact field to field export for text
schema.fields().for_each(|(field, field_entry)| {
if field_entry.name().eq("source") {
doc.add_text(field, doc_source);
} else {
match field_entry.field_type() {
tantivy::schema::FieldType::Str(_) => {
doc.add_text(field, v[field_entry.name()].as_str().unwrap_or(""));
}
tantivy::schema::FieldType::U64(_) => {
doc.add_text(field, v[field_entry.name()].as_u64().unwrap_or(0));
}
tantivy::schema::FieldType::I64(_) => {
doc.add_text(field, v[field_entry.name()].as_i64().unwrap_or(0));
}
tantivy::schema::FieldType::F64(_) => {
doc.add_text(field, v[field_entry.name()].as_i64().unwrap_or(0));
}
tantivy::schema::FieldType::Date(_) => {
// TODO: need to bring in chrono etc
// doc.add_date(content, v["last_updated"].as_str().unwrap_or(""));
todo!()
}
tantivy::schema::FieldType::HierarchicalFacet(_) => {
todo!()
}
tantivy::schema::FieldType::Bytes(_) => {
doc.add_bytes(
field,
v[field_entry.name()].as_str().unwrap_or("").as_bytes(),
);
if let Ok(v) = serde_json::from_str(&doc_source) {
let v: Value = v;
// dbg!(v);
let mut doc = Document::new();

// FIXME: only handling exact field to field export for text
for (field, field_entry) in schema.fields() {
if field_entry.name().eq("_es_source") {
doc.add_text(field, doc_source);
} else {
match add_to_doc(field, field_entry, &v, &mut doc) {
Ok(_) => {}
Err(_) => {
error!("Failed to add {:?} to {:?} on {:?}", v, field_entry, doc)
}
}
}
}
});

index_writer.add_document(doc);
index_writer.add_document(doc);
docs += 1;
} else {
error!("Failed to parse doc_source into Value: {:?}", doc_source);
}
});
}
info!(
"Finished {} batches, {} docs, in {} seconds.",
batches,
docs,
now.elapsed().as_secs()
);

if docs < doc_count {
warn!(
"# docs ingested on this run {} < # docs in source index {}; likely some docs errored.",
docs, doc_count
);
}

// Like Lucene, Tantivy has to close writers before opening readers
index_writer.commit()?;
Expand All @@ -141,7 +152,18 @@ fn run(
let reader = index.reader()?;

let searcher = reader.searcher();
dbg!(searcher.num_docs());
info!(
"Found {} docs in exported Tantivy index.",
searcher.num_docs()
);

if searcher.num_docs() != docs {
warn!(
"Tantivy index # docs {} didn't match # docs {} ingested on this run; were you ingesting onto an existing index?",
searcher.num_docs(),
docs
);
}

// search all fields except unindexed (eg. source) for the test query
let all_fields = schema
Expand All @@ -151,22 +173,85 @@ fn run(
let query_parser = QueryParser::for_index(&index, all_fields);

let query = query_parser.parse_query(&test_query)?;
dbg!(&query);
info!("Testing query {:?} against exported Tantivy index.", query);

let top_docs: Vec<(Score, DocAddress)> = searcher.search(&query, &TopDocs::with_limit(10))?;

println!("results");
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
dbg!(retrieved_doc);
// println!("{}", schema.to_json(&retrieved_doc));
if top_docs.len() > 0 {
info!(
"Test query {:?} found {} results.",
test_query,
top_docs.len()
);
for (_score, doc_address) in top_docs {
let retrieved_doc = searcher.doc(doc_address)?;
info!("Found in Tantivy index: {:?}", retrieved_doc);
// println!("{}", schema.to_json(&retrieved_doc));
}
} else {
warn!("Test query {:?} failed to find results.", test_query);
}

Ok(())
}

/// Copies the value of a single schema field from an Elasticsearch source document into a
/// Tantivy document.
///
/// The value is looked up in `v` under `field_entry.name()` and converted according to the
/// Tantivy type declared for the field:
///
/// * `Str`   - the JSON value must be a string; added with `add_text`.
/// * `U64`   - the JSON value must be representable as `u64`; added with `add_u64`.
/// * `I64`   - the JSON value must be representable as `i64`; added with `add_i64`.
/// * `F64`   - the JSON value must be representable as `f64`; added with `add_f64`.
/// * `Bytes` - the JSON value must be a string; its UTF-8 bytes are added with `add_bytes`.
///
/// # Errors
///
/// Returns an error when the key is missing from `v` (indexing a `Value` by a missing key
/// yields `Null`, which fails every conversion) or when the value has the wrong JSON type.
/// `Date` and `HierarchicalFacet` fields are not supported yet and always return an error
/// rather than panicking, so one unsupported field cannot abort an entire export; the caller
/// decides how to report it.
///
/// `doc` is only modified when the conversion succeeds.
fn add_to_doc(
    field: Field,
    field_entry: &FieldEntry,
    v: &Value,
    doc: &mut Document,
) -> anyhow::Result<()> {
    let name = field_entry.name();
    // Indexing a `Value` with a `&str` never panics: a missing key (or a non-object `v`)
    // yields `Value::Null`, which the `as_*` conversions below turn into `None`.
    let value = &v[name];

    // `ok_or_else` keeps error construction lazy; this function runs once per field per
    // document, so the success path should not build an `anyhow::Error` it never uses.
    match field_entry.field_type() {
        tantivy::schema::FieldType::Str(_) => {
            let text = value.as_str().ok_or_else(|| anyhow::anyhow!("bad text"))?;
            doc.add_text(field, text);
        }
        tantivy::schema::FieldType::U64(_) => {
            let num = value.as_u64().ok_or_else(|| anyhow::anyhow!("bad u64"))?;
            doc.add_u64(field, num);
        }
        tantivy::schema::FieldType::I64(_) => {
            let num = value.as_i64().ok_or_else(|| anyhow::anyhow!("bad i64"))?;
            doc.add_i64(field, num);
        }
        tantivy::schema::FieldType::F64(_) => {
            let num = value.as_f64().ok_or_else(|| anyhow::anyhow!("bad f64"))?;
            doc.add_f64(field, num);
        }
        tantivy::schema::FieldType::Date(_) => {
            // TODO: need to bring in chrono etc to parse the ES date string.
            return Err(anyhow::anyhow!(
                "field {:?}: Date fields are not supported yet",
                name
            ));
        }
        tantivy::schema::FieldType::HierarchicalFacet(_) => {
            // TODO: facet export is not implemented.
            return Err(anyhow::anyhow!(
                "field {:?}: HierarchicalFacet fields are not supported yet",
                name
            ));
        }
        tantivy::schema::FieldType::Bytes(_) => {
            let b = value
                .as_str()
                .ok_or_else(|| anyhow::anyhow!("bad str"))?
                .as_bytes();
            doc.add_bytes(field, b);
        }
    }

    Ok(())
}

fn main() {
let opts: Opts = Opts::parse();
env_logger::init();

// FIXME: this is old Rust 2015 format...
if let Err(e) = run(opts.input, opts.output, opts.schema_path, opts.test_query) {
Expand Down
2 changes: 1 addition & 1 deletion tests/resources/tantivy-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
}
},
{
"name": "source",
"name": "_es_source",
"type": "text",
"options": {
"stored": true
Expand Down

0 comments on commit dedbf57

Please sign in to comment.