Skip to content

Commit

Permalink
fix: cargo.toml
Browse files Browse the repository at this point in the history
Signed-off-by: Hergy Fongue <[email protected]>
  • Loading branch information
rjtch committed Mar 10, 2024
1 parent 001d7ad commit 448fdc5
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 113 deletions.
3 changes: 1 addition & 2 deletions cdviz-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ axum-tracing-opentelemetry = { version = "0.18", optional = true }
cdevents-sdk = { git = "https://github.com/cdevents/sdk-rust" }
clap = { workspace = true }
clap-verbosity-flag = "2.2.0"
# cloudevents-sdk = { version = "0.7", features = ["axum"] } // not compatible with axum 0.7
cloudevents-sdk = "0.7.0"
chrono = "0.4"
enum_dispatch = "0.3"
figment = { version = "0.10", features = ["toml", "env"] }
Expand Down Expand Up @@ -50,7 +50,6 @@ thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry-instrumentation-sdk = { workspace = true }
cloudevents-sdk = "0.7.0"

[dev-dependencies]
axum-test = "14"
Expand Down
25 changes: 0 additions & 25 deletions cdviz-collector/src/middleware/event_filter.rs

This file was deleted.

1 change: 0 additions & 1 deletion cdviz-collector/src/middleware/mod.rs

This file was deleted.

37 changes: 9 additions & 28 deletions cdviz-collector/src/sinks/http.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use cloudevents::{EventBuilder};
use reqwest::Url;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use serde::{Deserialize, Serialize};
//use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
use crate::errors::Result;
use crate::Message;
use reqwest_tracing::TracingMiddleware;

use super::Sink;
use cloudevents::event::{EventBuilderV10, EventBuilder};

#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct Config {
Expand Down Expand Up @@ -45,20 +44,19 @@ impl HttpSink {
impl Sink for HttpSink {
//TODO use cloudevents
async fn send(&self, msg: &Message) -> Result<()> {
if msg.cd_event.is_some() {
// convert CdEvent to cloudevents
let value = cloudevents::EventBuilderV10::new()
.id(msg.cd_event.clone().unwrap().id())
.ty(msg.cd_event.clone().unwrap().ty())
.source(msg.cd_event.clone().unwrap().source().as_str())
.subject(msg.cd_event.clone().unwrap().subject().id())
// .time(msg.CdEvent.clone().unwrap().timestamp())
.data("application/json", serde_json::to_value(msg.cd_event.clone().unwrap())?)
.build();
.id(msg.cd_event.id())
.ty(msg.cd_event.ty())
.source(msg.cd_event.source().as_str())
.subject(msg.cd_event.subject().id())
.data("application/json", serde_json::to_value(msg.cd_event.clone())?)
.build();

match value {
Ok(value) => {
let json = serde_json::to_value(&value)?;
dbg!("transformed cloudevent: {:?}", json.clone());
let resp = self
.client
.post(self.dest.clone())
Expand All @@ -75,24 +73,7 @@ impl Sink for HttpSink {
Err(err) => {
tracing::warn!(error = ?err, "Failed to convert to cloudevents");
}
};

} else {
let json = serde_json::to_value(&msg.cloud_event.clone().unwrap())?;
println!("json value 2: {:?}", json);
let resp = self
.client
.post(self.dest.clone())
.json(&json)
.send()
.await?;
if !resp.status().is_success() {
tracing::warn!(
cloud_event = ?serde_json::to_value(&msg.cloud_event)?,
http_status = ?resp.status(),
"failed to send event")
}
}
};
Ok(())
}
}
17 changes: 2 additions & 15 deletions cdviz-collector/src/sources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use std::net::{IpAddr, SocketAddr};
use cdevents_sdk::CDEvent;
use crate::middleware::event_filter::{event_filter, EventType};
use crate::sources::Source;

/// The http server config
Expand Down Expand Up @@ -37,14 +36,13 @@ pub(crate) struct HttpSource {

#[derive(Clone)]
struct AppState {
tx: Sender<Message>,
event_type: Extension<EventType>,
tx: Sender<Message>
}

impl Source for HttpSource {
async fn run(&self, tx: Sender<Message>) -> Result<()> {
// set default types
let app_state = AppState { tx, event_type: Extension(EventType::CdEvent)};
let app_state = AppState { tx };

let app = app(app_state);
// run it
Expand All @@ -66,9 +64,6 @@ fn app(app_state: AppState) -> Router {
.route("/cdevents", post(events_collector))
// include trace context as header into the response
.layer(OtelInResponseLayer)
.route_layer(middleware::from_fn(
event_filter,
))
//start OpenTelemetry trace on incoming request
.layer(OtelAxumLayer::default())
.route("/healthz", get(health)) // request processed without span / trace
Expand All @@ -86,20 +81,12 @@ async fn health() -> impl IntoResponse {
//TODO use cloudevents
#[tracing::instrument(skip(app_state, payload))]
async fn events_collector(
Extension(is_event): Extension<EventType>,
State(app_state): State<AppState>,
Json(payload): Json<CDEvent>,
) -> Result<http::StatusCode> {
if is_event.eq(&EventType::CloudEvent) {
// TODO use cloud event
println!("received CloudEvent {:?}", &payload);
Ok(http::StatusCode::CREATED)
} else {
println!("received CdEvent {:?}", &payload);
let message = Message::from(payload);
app_state.tx.send(message)?;
Ok(http::StatusCode::CREATED)
}
}

impl IntoResponse for Error {
Expand Down
22 changes: 0 additions & 22 deletions demos/sample_cloud_event.json

This file was deleted.

20 changes: 0 additions & 20 deletions demos/service_deployed.json

This file was deleted.

0 comments on commit 448fdc5

Please sign in to comment.