Skip to content

Commit

Permalink
Add target,file,line field to Log
Browse files Browse the repository at this point in the history
  • Loading branch information
Folyd committed Aug 12, 2024
1 parent 53fafef commit e7d8b44
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 5 deletions.
11 changes: 9 additions & 2 deletions duo-api/proto/log.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ message Log {
optional uint64 trace_id = 3;
// Describes the level of verbosity of a log.
common.Level level = 4;
// The part of the system that the log that this metadata describes
// occurred in.
string target = 5;
// The name of the source code file where the log occurred.
optional string file = 6;
// The line number in the source code file where the log occurred.
optional uint32 line = 7;
// Timestamp.
google.protobuf.Timestamp time = 5;
google.protobuf.Timestamp time = 8;
// Key-value fields.
map<string, common.Value> fields = 6;
map<string, common.Value> fields = 9;
}
3 changes: 3 additions & 0 deletions duo-subscriber/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ where
process_id: String::new(),
span_id,
trace_id,
target: metadata.target().into(),
file: metadata.file().map(Into::into),
line: metadata.line(),
level: proto::Level::from(*metadata.level()) as i32,
time: Some(SystemTime::now().into()),
fields,
Expand Down
7 changes: 5 additions & 2 deletions duo/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,14 @@ pub fn convert_log_to_record_batch(logs: Vec<Log>) -> Result<RecordBatch> {
map.insert("span_id".into(), log.span_id.into());
map.insert("trace_id".into(), log.trace_id.into());
map.insert("level".into(), log.level.as_str().into());
map.insert("target".into(), log.target.into());
map.insert("file".into(), log.file.into());
map.insert("line".into(), log.line.into());
map.insert("time".into(), time.into());
map.insert("message".into(), log.message.into());
let mut field_map = Map::new();
for (key, value) in log.fields {
field_map.insert(key.clone(), value.clone());
field_map.insert(key, value);
}

if !field_map.is_empty() {
Expand All @@ -76,7 +79,7 @@ pub fn convert_log_to_record_batch(logs: Vec<Log>) -> Result<RecordBatch> {
data.push(JsonValue::Object(map));
}

let inferred_field_schema = infer_json_schema_from_iterator(fields.iter().map(Ok))?;
let inferred_field_schema = infer_json_schema_from_iterator(fields.iter().map(Ok)).unwrap();
let schema = Schema::try_merge(vec![
(*schema::get_log_schema()).clone(),
inferred_field_schema,
Expand Down
6 changes: 6 additions & 0 deletions duo/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct Log {
// TODO: change level to i32
#[serde(with = "deser::level")]
pub level: Level,
pub target: String,
pub file: Option<String>,
pub line: Option<u32>,
#[serde(with = "deser::miscrosecond")]
pub time: OffsetDateTime,
pub message: String,
Expand Down Expand Up @@ -163,6 +166,9 @@ impl From<proto::Log> for Log {
span_id: log.span_id,
trace_id: log.trace_id,
level,
target: log.target,
file: log.file,
line: log.line,
time: log
.time
.and_then(|timestamp| {
Expand Down
8 changes: 7 additions & 1 deletion duo/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ fn default_log_schema() -> Arc<Schema> {
Field::new("trace_id", DataType::UInt64, true),
Field::new("span_id", DataType::UInt64, true),
Field::new("level", DataType::Utf8, false),
Field::new("target", DataType::Utf8, true),
Field::new("file", DataType::Utf8, true),
Field::new("line", DataType::UInt32, true),
Field::new("message", DataType::Utf8, true),
]))
}
Expand All @@ -51,8 +54,11 @@ pub async fn load() -> Result<()> {
{
Ok(data) => {
let schema = serde_json::from_slice::<Schema>(&data.bytes().await?)?;
let latest_schema =
Schema::try_merge(vec![(*default_log_schema()).clone(), schema]).unwrap();
LOG_SCHEMA_DIRTY.store(true, Ordering::Relaxed);
LOG_SCHEMA
.set(RwLock::new(Arc::new(schema)))
.set(RwLock::new(Arc::new(latest_schema)))
.expect("LogSchema already initialized");
}
Err(_err) => {
Expand Down
2 changes: 2 additions & 0 deletions duo/src/web/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub(super) struct QueryParameters {
pub start: Option<OffsetDateTime>,
#[serde(default, deserialize_with = "deser::option_miscrosecond")]
pub end: Option<OffsetDateTime>,
keyword: Option<String>,
level: Option<String>,
}

#[tracing::instrument]
Expand Down
7 changes: 7 additions & 0 deletions duo/src/web/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ impl<'a> Serialize for JaegerLog<'a> {
let mut fields = HashMap::new();
fields.insert("message".into(), self.0.message.clone().into());
fields.insert("level".into(), self.0.level.as_str().into());
fields.insert("target".into(), self.0.target.as_str().into());
if let Some(file) = &self.0.file {
fields.insert(
"file".into(),
format!("{}:{}", &file, self.0.line.unwrap_or_default()).into(),
);
}
fields.extend(self.0.fields.clone());
map.serialize_entry(
"fields",
Expand Down

0 comments on commit e7d8b44

Please sign in to comment.