diff --git a/crates/cdk/src/nuts/nut17.rs b/crates/cdk/src/nuts/nut17.rs index d717dd92..5494d8e2 100644 --- a/crates/cdk/src/nuts/nut17.rs +++ b/crates/cdk/src/nuts/nut17.rs @@ -1,4 +1,5 @@ //! Specific Subscription for the cdk crate +use super::{MeltQuoteBolt11Response, MintQuoteBolt11Response}; use crate::{ nuts::ProofState, subscription::{self, Index, Indexable, SubId}, @@ -14,12 +15,53 @@ pub struct Params { id: SubId, } -impl Indexable for ProofState { +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum SubscriptionResponse { + ProofState(ProofState), + MeltQuoteBolt11Response(MeltQuoteBolt11Response), + MintQuoteBolt11Response(MintQuoteBolt11Response), +} + +impl From for SubscriptionResponse { + fn from(proof_state: ProofState) -> SubscriptionResponse { + SubscriptionResponse::ProofState(proof_state) + } +} + +impl From for SubscriptionResponse { + fn from(melt_quote: MeltQuoteBolt11Response) -> SubscriptionResponse { + SubscriptionResponse::MeltQuoteBolt11Response(melt_quote) + } +} + +impl From for SubscriptionResponse { + fn from(mint_quote: MintQuoteBolt11Response) -> SubscriptionResponse { + SubscriptionResponse::MintQuoteBolt11Response(mint_quote) + } +} + +impl Indexable for SubscriptionResponse { type Type = (String, Kind); fn to_indexes(&self) -> Vec> { - // convert the event to a list of indexes - todo!() + match self { + SubscriptionResponse::ProofState(proof_state) => { + vec![Index::from((proof_state.y.to_hex(), Kind::ProofState))] + } + SubscriptionResponse::MeltQuoteBolt11Response(melt_quote) => { + vec![Index::from(( + melt_quote.quote.clone(), + Kind::Bolt11MeltQuote, + ))] + } + SubscriptionResponse::MintQuoteBolt11Response(mint_quote) => { + vec![Index::from(( + mint_quote.quote.clone(), + Kind::Bolt11MintQuote, + ))] + } + } } } @@ -34,23 +76,23 @@ pub enum Kind { ProofState, } -impl Into for &Params { - fn into(self) -> SubId { - self.id.clone() +impl AsRef for Params { + fn as_ref(&self) -> &SubId { + &self.id } } -impl Into>> for &Params { +impl Into>> for Params { fn into(self) -> Vec> { self.filters .iter() - .map(|filter| Index::from((filter.clone(), self.kind))) + .map(|filter| Index::from(((filter.clone(), self.kind), self.id.clone()))) .collect() } } /// Manager -pub type Manager = subscription::Manager; +pub type Manager = subscription::Manager; #[cfg(test)] mod test { @@ -74,8 +116,8 @@ mod test { // responsability of the implementor to make sure that SubId are unique // either globally or per client let subscriptions = vec![ - manager.subscribe(¶ms, ¶ms).await, - manager.subscribe(¶ms, ¶ms).await, + manager.subscribe(params.clone()).await, + manager.subscribe(params).await, ]; assert_eq!(2, manager.active_subscriptions()); drop(subscriptions); @@ -88,19 +130,27 @@ mod test { #[tokio::test] async fn broadcast() { let manager = Manager::default(); - let params = Params { - kind: Kind::ProofState, - filters: vec!["x".to_string()], - id: "uno".into(), - }; - - // Although the same param is used, two subscriptions are created, that - // is because each index is unique, thanks to `Unique`, it is the - // responsability of the implementor to make sure that SubId are unique - // either globally or per client let mut subscriptions = vec![ - manager.subscribe(¶ms, ¶ms).await, - manager.subscribe(¶ms, ¶ms).await, + manager + .subscribe(Params { + kind: Kind::ProofState, + filters: vec![ + "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104" + .to_string(), + ], + id: "uno".into(), + }) + .await, + manager + .subscribe(Params { + kind: Kind::ProofState, + filters: vec![ + "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104" + .to_string(), + ], + id: "dos".into(), + }) + .await, ]; let event = ProofState { @@ -112,10 +162,17 @@ mod test { witness: None, }; - manager.broadcast(event); + manager.broadcast(event.into()); sleep(Duration::from_millis(10)).await; - let x = subscriptions[1].try_recv().expect("valid message"); + let (sub1, _) = subscriptions[0].try_recv().expect("valid message"); + assert_eq!("uno", *sub1); + + let (sub1, _) = subscriptions[1].try_recv().expect("valid message"); + assert_eq!("dos", *sub1); + + assert!(subscriptions[0].try_recv().is_err()); + assert!(subscriptions[1].try_recv().is_err()); } } diff --git a/crates/cdk/src/subscription/index.rs b/crates/cdk/src/subscription/index.rs index 20ba2e21..6a9729df 100644 --- a/crates/cdk/src/subscription/index.rs +++ b/crates/cdk/src/subscription/index.rs @@ -1,12 +1,13 @@ use super::SubId; use std::{ + fmt::Debug, ops::Deref, sync::atomic::{AtomicUsize, Ordering}, }; /// Indexable trait pub trait Indexable { - type Type: PartialOrd + Ord + Send + Sync; + type Type: PartialOrd + Ord + Send + Sync + Debug; /// To indexes fn to_indexes(&self) -> Vec>; @@ -15,19 +16,22 @@ pub trait Indexable { #[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)] /// Index /// -/// The Index is a sorted structure that +/// The Index is a sorted structure that is used to quickly find matches +/// +/// The counter is used to make sure each Index is unique, even if the prefix +/// are the same, and also to make sure that ealier indexes matches first pub struct Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { prefix: T, + counter: Unique, id: super::SubId, - _unique: Unique, } impl Into for &Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { fn into(self) -> super::SubId { self.id.clone() @@ -36,7 +40,7 @@ where impl Deref for Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { type Target = T; @@ -47,7 +51,7 @@ where impl Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { /// Compare the pub fn cmp_prefix(&self, other: &Index) -> std::cmp::Ordering { @@ -57,26 +61,26 @@ where impl From<(T, SubId)> for Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { fn from((prefix, id): (T, SubId)) -> Self { Self { prefix, id, - _unique: Default::default(), + counter: Default::default(), } } } impl From for Index where - T: PartialOrd + Ord + Send + Sync, + T: PartialOrd + Ord + Send + Sync + Debug, { fn from(prefix: T) -> Self { Self { prefix, id: Default::default(), - _unique: Default::default(), + counter: Unique(0), } } } @@ -90,6 +94,8 @@ static COUNTER: AtomicUsize = AtomicUsize::new(0); /// The prefix is used to leverage the BTree to find things quickly, but each /// entry/key must be unique, so we use this dummy type to make sure each Index /// is unique. +/// +/// Unique is also used to make sure that the indexes are sorted by creation order #[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)] struct Unique(usize); diff --git a/crates/cdk/src/subscription/mod.rs b/crates/cdk/src/subscription/mod.rs index c3fdbd37..cb3785b9 100644 --- a/crates/cdk/src/subscription/mod.rs +++ b/crates/cdk/src/subscription/mod.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use std::{ cmp::Ordering, collections::BTreeMap, + fmt::Debug, ops::{Deref, DerefMut}, str::FromStr, sync::{ @@ -40,7 +41,7 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 10; pub struct Manager where T: Indexable + Clone + Send + Sync + 'static, - I: PartialOrd + Clone + Ord + Send + Sync + 'static, + I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, { indexes: IndexTree, unsubscription_sender: mpsc::Sender<(SubId, Vec>)>, @@ -51,7 +52,7 @@ where impl Default for Manager where T: Indexable + Clone + Send + Sync + 'static, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static, { fn default() -> Self { let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE); @@ -74,7 +75,7 @@ where impl Manager where T: Indexable + Clone + Send + Sync + 'static, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { #[inline] /// Broadcast an event to all listeners @@ -84,6 +85,8 @@ where async fn broadcast_impl(storage: &IndexTree, event: T) { let index_storage = storage.read().await; for index in event.to_indexes() { + println!("{:?}", index); + println!("{:?}", index_storage.keys().collect::>()); for (key, sender) in index_storage.range(index.clone()..) { if index.cmp_prefix(&key) != Ordering::Equal { break; @@ -112,13 +115,13 @@ where } /// Subscribe to a specific event - pub async fn subscribe, P: Into>>>( + pub async fn subscribe + Into>>>( &self, - sub_id: SI, params: P, ) -> ActiveSubscription { let (sender, receiver) = mpsc::channel(10); - let indexes = params.into(); + let sub_id: SubId = params.as_ref().clone(); + let indexes: Vec> = params.into(); let mut index_storage = self.indexes.write().await; for index in indexes.clone() { @@ -169,7 +172,7 @@ where impl Drop for Manager where T: Indexable + Clone + Send + Sync + 'static, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { fn drop(&mut self) { for task in self.background_tasks.drain(..) { @@ -187,7 +190,7 @@ where pub struct ActiveSubscription where T: Send + Sync, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { /// The subscription ID pub id: SubId, @@ -199,7 +202,7 @@ where impl Deref for ActiveSubscription where T: Send + Sync, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { type Target = mpsc::Receiver<(SubId, T)>; @@ -211,7 +214,7 @@ where impl DerefMut for ActiveSubscription where T: Indexable + Clone + Send + Sync + 'static, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.receiver @@ -226,7 +229,7 @@ where impl Drop for ActiveSubscription where T: Send + Sync, - I: Clone + PartialOrd + Ord + Send + Sync + 'static, + I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static, { fn drop(&mut self) { self.drop