Skip to content

Commit

Permalink
Removed extraction events (tensorlakeai#299)
Browse files Browse the repository at this point in the history
  • Loading branch information
diptanu authored Feb 4, 2024
1 parent 36aba4a commit 5dea7d0
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 283 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/indexify_internal_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ crate-type = ["rlib"]

[dependencies]
anyhow = { workspace = true }
nanoid = { workspace = true }
indexify_proto = { workspace = true }
mime = { workspace = true }
serde = { workspace = true }
Expand Down
65 changes: 42 additions & 23 deletions crates/indexify_internal_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{

use anyhow::Result;
use indexify_proto::indexify_coordinator;
use nanoid::nanoid;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, BytesOrString};
use smart_default::SmartDefault;
Expand Down Expand Up @@ -281,27 +282,6 @@ impl TryFrom<indexify_coordinator::Task> for Task {
})
}
}

#[derive(Serialize, Debug, Deserialize, Display, Clone, PartialEq)]
pub enum ExtractionEventPayload {
ExtractorBindingAdded {
repository: String,
binding: ExtractorBinding,
},
CreateContent {
content: ContentMetadata,
},
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct ExtractionEvent {
pub id: String,
pub repository: String,
pub payload: ExtractionEventPayload,
pub created_at: u64,
pub processed_at: Option<u64>,
}

#[derive(Debug, Clone, Serialize, PartialEq, Eq, Deserialize)]
pub struct ExtractorBinding {
pub id: String,
Expand Down Expand Up @@ -378,8 +358,6 @@ impl From<ContentMetadata> for indexify_coordinator::ContentMetadata {
}
}

// FIXME - Make this visible to only tests
#[cfg(test)]
impl Default for ContentMetadata {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -484,3 +462,44 @@ impl From<Repository> for indexify_coordinator::Repository {
}
}
}

#[derive(Clone, Serialize, Deserialize, Debug, Display)]
pub enum ChangeType {
NewContent,
NewBinding,
ExecutorAdded,
ExecutorRemoved,
}

#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct StateChange {
pub id: String,
pub object_id: String,
pub change_type: ChangeType,
pub created_at: u64,
pub processed_at: Option<u64>,
}

impl Default for StateChange {
fn default() -> Self {
Self {
id: "".to_string(),
object_id: "".to_string(),
change_type: ChangeType::NewContent,
created_at: 0,
processed_at: None,
}
}
}

impl StateChange {
pub fn new(object_id: String, change_type: ChangeType, created_at: u64) -> Self {
Self {
id: nanoid!(16),
object_id,
change_type,
created_at,
processed_at: None,
}
}
}
134 changes: 63 additions & 71 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ use std::{
use anyhow::{anyhow, Ok, Result};
use indexify_internal_api as internal_api;
use indexify_proto::indexify_coordinator;
use internal_api::StateChange;
use jsonschema::JSONSchema;
use tokio::sync::watch::Receiver;
use tracing::info;

use crate::{
coordinator_filters::*,
state::{store::StateChange, SharedState},
utils::timestamp_secs,
};
use crate::{coordinator_filters::*, state::SharedState};

pub struct Coordinator {
shared_state: SharedState,
Expand All @@ -28,41 +25,52 @@ impl Coordinator {

#[tracing::instrument(skip(self))]
pub async fn process_extraction_events(&self) -> Result<(), anyhow::Error> {
let events = self.shared_state.unprocessed_extraction_events().await?;
info!("processing {} extraction events", events.len());
for event in &events {
info!("processing extraction event: {}", event.id);
let mut tasks = Vec::new();
match event.payload.clone() {
internal_api::ExtractionEventPayload::ExtractorBindingAdded {
repository,
binding,
} => {
let state_changes = self.shared_state.unprocessed_state_change_events().await?;
info!("processing {} extraction events", state_changes.len());
let mut tasks = Vec::new();
for change in &state_changes {
info!(
"processing change event: {}, type: {}, id: {}",
change.id, change.change_type, change.object_id
);
match change.change_type {
internal_api::ChangeType::NewBinding => {
let content_list = self
.shared_state
.content_matching_binding(&repository, &binding)
.content_matching_binding(&change.object_id)
.await?;
let tasks_for_binding = self.create_task(&binding, content_list).await?;
let tasks_for_binding =
self.create_task(&change.object_id, content_list).await?;
tasks.extend(tasks_for_binding);
}
internal_api::ExtractionEventPayload::CreateContent { content } => {
internal_api::ChangeType::NewContent => {
let bindings = self
.shared_state
.filter_extractor_binding_for_content(&content)
.filter_extractor_binding_for_content(&change.object_id)
.await?;
let content = self
.shared_state
.get_conent_metadata(&change.object_id)
.await?;
for binding in bindings {
let task_for_binding =
self.create_task(&binding, vec![content.clone()]).await?;
self.create_task(&binding.id, vec![content.clone()]).await?;
tasks.extend(task_for_binding);
}
}
internal_api::ChangeType::ExecutorAdded => {
info!("executor {} added", change.object_id);
}
internal_api::ChangeType::ExecutorRemoved => {
info!("executor {} removed", change.object_id);
}
};
info!("created {} tasks", tasks.len());
self.shared_state.create_tasks(tasks).await?;
self.shared_state
.mark_extraction_event_processed(&event.id)
.await?;
}
self.shared_state.create_tasks(tasks).await?;
self.shared_state
.mark_change_events_as_processed(state_changes)
.await?;
Ok(())
}

Expand All @@ -89,9 +97,13 @@ impl Coordinator {

pub async fn create_task(
&self,
extractor_binding: &internal_api::ExtractorBinding,
extractor_binding_id: &str,
content_list: Vec<internal_api::ContentMetadata>,
) -> Result<Vec<internal_api::Task>> {
let extractor_binding = self
.shared_state
.get_extractor_binding(extractor_binding_id)
.await?;
let extractor = self
.shared_state
.extractor_with_name(&extractor_binding.extractor)
Expand Down Expand Up @@ -167,16 +179,10 @@ impl Coordinator {
task_id, executor_id, outcome
);
let mut task = self.shared_state.task_with_id(task_id).await?;
let (content_meta_list, extraction_events) =
content_request_to_content_metadata(content_list)?;
let content_meta_list = content_request_to_content_metadata(content_list)?;
task.outcome = outcome;
self.shared_state
.update_task(
task,
Some(executor_id.to_string()),
content_meta_list,
extraction_events,
)
.update_task(task, Some(executor_id.to_string()), content_meta_list)
.await?;
Ok(())
}
Expand Down Expand Up @@ -290,19 +296,7 @@ impl Coordinator {
errors.join(",")
));
}
let extraction_event = internal_api::ExtractionEvent {
id: nanoid::nanoid!(),
repository: binding.repository.clone(),
payload: internal_api::ExtractionEventPayload::ExtractorBindingAdded {
repository: binding.repository.clone(),
binding: binding.clone(),
},
created_at: timestamp_secs(),
processed_at: None,
};
self.shared_state
.create_binding(binding, extraction_event)
.await?;
self.shared_state.create_binding(binding).await?;
Ok(())
}

Expand All @@ -324,10 +318,9 @@ impl Coordinator {
&self,
content_list: Vec<indexify_coordinator::ContentMetadata>,
) -> Result<()> {
let (content_meta_list, extraction_events) =
content_request_to_content_metadata(content_list)?;
let content_meta_list = content_request_to_content_metadata(content_list)?;
self.shared_state
.create_content_batch(content_meta_list, extraction_events)
.create_content_batch(content_meta_list)
.await?;
Ok(())
}
Expand All @@ -339,31 +332,18 @@ impl Coordinator {

fn content_request_to_content_metadata(
content_list: Vec<indexify_coordinator::ContentMetadata>,
) -> Result<(
Vec<internal_api::ContentMetadata>,
Vec<internal_api::ExtractionEvent>,
)> {
) -> Result<Vec<internal_api::ContentMetadata>> {
let mut content_meta_list = Vec::new();
let mut extraction_events = Vec::new();
for content in content_list {
let repository = content.repository.clone();
let c: internal_api::ContentMetadata = content.try_into()?;
content_meta_list.push(c.clone());
let extraction_event = internal_api::ExtractionEvent {
id: nanoid::nanoid!(),
repository,
payload: internal_api::ExtractionEventPayload::CreateContent { content: c },
created_at: timestamp_secs(),
processed_at: None,
};
extraction_events.push(extraction_event);
}
Ok((content_meta_list, extraction_events))
Ok(content_meta_list)
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, fs, sync::Arc, time::Duration};

use indexify_internal_api as internal_api;
use indexify_proto::indexify_coordinator;
Expand All @@ -378,6 +358,7 @@ mod tests {
#[tracing_test::traced_test]
async fn test_create_extraction_events() -> Result<(), anyhow::Error> {
let config = Arc::new(ServerConfig::default());
fs::remove_dir_all(config.sled.clone().path.unwrap()).unwrap();
let shared_state = App::new(config).await.unwrap();
shared_state.initialize_raft().await.unwrap();
let coordinator = crate::coordinator::Coordinator::new(shared_state.clone());
Expand All @@ -402,13 +383,14 @@ mod tests {
}])
.await?;

let events = shared_state.unprocessed_extraction_events().await?;
let events = shared_state.unprocessed_state_change_events().await?;
println!("events: {:?}", events);
assert_eq!(events.len(), 1);

// Run scheduler without any bindings to make sure that the event is processed
// and we don't have any tasks
coordinator.process_and_distribute_work().await?;
let events = shared_state.unprocessed_extraction_events().await?;
let events = shared_state.unprocessed_state_change_events().await?;
assert_eq!(events.len(), 0);
let tasks = shared_state.unassigned_tasks().await?;
assert_eq!(tasks.len(), 0);
Expand Down Expand Up @@ -439,9 +421,15 @@ mod tests {
mock_extractor(),
)
.await?;
assert_eq!(1, shared_state.unprocessed_extraction_events().await?.len());
assert_eq!(
2,
shared_state.unprocessed_state_change_events().await?.len()
);
coordinator.process_and_distribute_work().await?;
assert_eq!(0, shared_state.unprocessed_extraction_events().await?.len());
assert_eq!(
0,
shared_state.unprocessed_state_change_events().await?.len()
);
assert_eq!(
1,
shared_state
Expand All @@ -466,7 +454,10 @@ mod tests {
}])
.await?;
coordinator.process_and_distribute_work().await?;
assert_eq!(0, shared_state.unprocessed_extraction_events().await?.len());
assert_eq!(
0,
shared_state.unprocessed_state_change_events().await?.len()
);
assert_eq!(
1,
shared_state
Expand Down Expand Up @@ -515,10 +506,11 @@ mod tests {
#[tokio::test]
#[tracing_test::traced_test]
async fn test_form_raft_cluster() -> Result<(), anyhow::Error> {
let server_configs = create_test_raft_configs(50)?;
let server_configs = create_test_raft_configs(10)?;

let mut apps = Vec::new();
for config in server_configs {
let _ = fs::remove_dir_all(config.sled.clone().path.unwrap());
let shared_state = App::new(config.clone()).await?;
apps.push(shared_state);
}
Expand Down
2 changes: 1 addition & 1 deletion src/coordinator_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub fn list_content_filter<'a>(
}

#[cfg(test)]
mod test_list_content_filter {
mod test {
use std::collections::HashMap;

use super::*;
Expand Down
3 changes: 2 additions & 1 deletion src/coordinator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use indexify_proto::indexify_coordinator::{
UpdateTaskRequest,
UpdateTaskResponse,
};
use internal_api::StateChange;
use itertools::Itertools;
use tokio::{
select,
Expand All @@ -61,7 +62,7 @@ use tracing::{error, info};
use crate::{
coordinator::Coordinator,
server_config::ServerConfig,
state::{self, store::StateChange},
state,
tonic_streamer::DropReceiver,
utils::timestamp_secs,
};
Expand Down
Loading

0 comments on commit 5dea7d0

Please sign in to comment.