Skip to content

Commit

Permalink
fix(metrics): collect metrics for custom services [fixes NET-438] (#1549
Browse files Browse the repository at this point in the history
)
  • Loading branch information
kmd-fl authored Apr 6, 2023
1 parent 5711171 commit ed1ce37
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 107 deletions.
88 changes: 1 addition & 87 deletions crates/particle-node-tests/tests/script_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,12 @@ extern crate fstrings;
use eyre::WrapErr;
use fstrings::f;
use maplit::hashmap;
use serde_json::json;
use serde_json::Value as JValue;
use serde_json::{json, Value};

use connected_client::ConnectedClient;
use created_swarm::make_swarms;
use humantime_serde::re::humantime::format_duration;
use now_millis::now;
use service_modules::load_module;
use std::time::Duration;
use test_utils::{create_service, CreatedService};

#[tokio::test]
Expand Down Expand Up @@ -219,89 +216,6 @@ async fn autoremove_singleshot() {
assert_eq!(list, vec![serde_json::Value::Array(vec![])]);
}

async fn get_list(client: &mut ConnectedClient) -> Vec<Value> {
let list_id = client.send_particle(
r#"
(seq
(call relay ("script" "list") [] list)
(call client ("op" "return") [list])
)
"#,
hashmap! {
"relay" => json ! (client.node.to_string()),
"client" => json ! (client.peer_id.to_string()),
},
);

client.wait_particle_args(list_id).await.unwrap()
}

#[tokio::test]
async fn autoremove_failed() {
let swarms = make_swarms(1).await;

let mut client = ConnectedClient::connect_to(swarms[0].multiaddr.clone())
.await
.wrap_err("connect client")
.unwrap();

let script = f!(r#"
INVALID SCRIPT
"#);

client.send_particle(
r#"
(call relay ("script" "add") [script])
"#,
hashmap! {
"relay" => json!(client.node.to_string()),
"client" => json!(client.peer_id.to_string()),
"script" => json!(script),
},
);

let timeout = Duration::from_secs(5);
let deadline = now() + timeout;

// wait for script to appear in the list
while now() < deadline {
let list = get_list(&mut client).await;

if list.is_empty() {
continue;
}

if let JValue::Array(arr) = &list[0] {
if arr.is_empty() {
continue;
}
let failures = arr[0].get("failures");
assert_eq!(failures, Some(&json!(0)));
break;
} else {
panic!("expected array");
}
}

if now() >= deadline {
panic!("timed out adding script after {}", format_duration(timeout));
}

// wait for script to disappear from the list
while now() < deadline {
let list = get_list(&mut client).await;
if list == vec![serde_json::Value::Array(vec![])] {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}

panic!(
"failed script wasn't deleted after {}",
format_duration(timeout)
);
}

#[tokio::test]
async fn remove_script_unauth() {
let swarms = make_swarms(1).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/peer-metrics/src/services_metrics/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::services_metrics::message::ServiceCallStats;
type ServiceId = String;
type Name = String;

/// Store a part of series of numeric observations and some parameters that desribe the series.
/// Store a part of series of numeric observations and some parameters that describe the series.
/// The number of stored observations is now a constant MAX_METRICS_STORAGE_SIZE.
#[derive(Default, Debug, Clone, Serialize)]
pub struct NumericSeriesStat {
Expand Down
8 changes: 7 additions & 1 deletion crates/peer-metrics/src/services_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ impl ServicesMetrics {
});
}

pub fn observe_created_failed(&self) {
self.observe_external(|external| {
external.creation_failure_count.inc();
});
}

pub fn observe_removed(&self, removal_time: f64) {
self.observe_external(|external| {
external.observe_removed(removal_time);
Expand All @@ -167,7 +173,7 @@ impl ServicesMetrics {
});
}

pub fn observe_external<F>(&self, callback: F)
fn observe_external<F>(&self, callback: F)
where
F: FnOnce(&ServicesMetricsExternal),
{
Expand Down
15 changes: 10 additions & 5 deletions particle-builtins/src/builtins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,19 @@ where
}

pub async fn call(&self, args: Args, particle: ParticleParams) -> FunctionOutcome {
let start = Instant::now();
let mut start = Instant::now();
let result = self.builtins_call(args, particle).await;
let result = match result {
FunctionOutcome::NotDefined { args, params } => {
start = Instant::now();
self.custom_service_call(args, params).await
}
result => result,
};
let end = start.elapsed().as_secs();

match result {
FunctionOutcome::NotDefined { args, params } => self
.custom_service_call(args, params)
.await
.or_else(|args, params| self.call_service(args, params)),
FunctionOutcome::NotDefined { args, params } => self.call_service(args, params),
result => {
if let Some(metrics) = self.services.metrics.as_ref() {
metrics.observe_builtins(result.not_err(), end as f64);
Expand Down
24 changes: 11 additions & 13 deletions particle-services/src/app_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,17 +483,17 @@ impl ParticleAppServices {
})?;

let call_time_sec = call_time_start.elapsed().as_secs_f64();
let new_memory = service.module_memory_stats();
let new_memory_usage = ServicesMetricsBuiltin::get_used_memory(&new_memory);

let memory_delta_bytes = new_memory_usage - old_mem_usage;
let stats = ServiceCallStats::Success {
memory_delta_bytes: memory_delta_bytes as f64,
call_time_sec,
timestamp,
};

if let Some(metrics) = self.metrics.as_ref() {
let new_memory = service.module_memory_stats();
let new_memory_usage = ServicesMetricsBuiltin::get_used_memory(&new_memory);

let memory_delta_bytes = new_memory_usage - old_mem_usage;
let stats = ServiceCallStats::Success {
memory_delta_bytes: memory_delta_bytes as f64,
call_time_sec,
timestamp,
};

metrics.observe_service_state(
service_id,
function_name,
Expand Down Expand Up @@ -823,9 +823,7 @@ impl ParticleAppServices {
)
.inspect_err(|_| {
if let Some(metrics) = self.metrics.as_ref() {
metrics.observe_external(|external| {
external.creation_failure_count.inc();
})
metrics.observe_created_failed();
}
})?;
let stats = service.module_memory_stats();
Expand Down

0 comments on commit ed1ce37

Please sign in to comment.