Skip to content

Commit

Permalink
First functional prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
crodas committed Oct 9, 2024
1 parent 50653fe commit 3cbc5af
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 47 deletions.
107 changes: 82 additions & 25 deletions crates/cdk/src/nuts/nut17.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Specific Subscription for the cdk crate
use super::{MeltQuoteBolt11Response, MintQuoteBolt11Response};
use crate::{
nuts::ProofState,
subscription::{self, Index, Indexable, SubId},
Expand All @@ -14,12 +15,53 @@ pub struct Params {
id: SubId,
}

impl Indexable for ProofState {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SubscriptionResponse {
ProofState(ProofState),
MeltQuoteBolt11Response(MeltQuoteBolt11Response),
MintQuoteBolt11Response(MintQuoteBolt11Response),
}

impl From<ProofState> for SubscriptionResponse {
fn from(proof_state: ProofState) -> SubscriptionResponse {
SubscriptionResponse::ProofState(proof_state)
}
}

impl From<MeltQuoteBolt11Response> for SubscriptionResponse {
fn from(melt_quote: MeltQuoteBolt11Response) -> SubscriptionResponse {
SubscriptionResponse::MeltQuoteBolt11Response(melt_quote)
}
}

impl From<MintQuoteBolt11Response> for SubscriptionResponse {
fn from(mint_quote: MintQuoteBolt11Response) -> SubscriptionResponse {
SubscriptionResponse::MintQuoteBolt11Response(mint_quote)
}
}

impl Indexable for SubscriptionResponse {
type Type = (String, Kind);

fn to_indexes(&self) -> Vec<Index<Self::Type>> {
// convert the event to a list of indexes
todo!()
match self {
SubscriptionResponse::ProofState(proof_state) => {
vec![Index::from((proof_state.y.to_hex(), Kind::ProofState))]
}
SubscriptionResponse::MeltQuoteBolt11Response(melt_quote) => {
vec![Index::from((
melt_quote.quote.clone(),
Kind::Bolt11MeltQuote,
))]
}
SubscriptionResponse::MintQuoteBolt11Response(mint_quote) => {
vec![Index::from((
mint_quote.quote.clone(),
Kind::Bolt11MintQuote,
))]
}
}
}
}

Expand All @@ -34,23 +76,23 @@ pub enum Kind {
ProofState,
}

impl Into<SubId> for &Params {
fn into(self) -> SubId {
self.id.clone()
impl AsRef<SubId> for Params {
fn as_ref(&self) -> &SubId {
&self.id
}
}

impl Into<Vec<Index<(String, Kind)>>> for &Params {
impl Into<Vec<Index<(String, Kind)>>> for Params {
fn into(self) -> Vec<Index<(String, Kind)>> {
self.filters
.iter()
.map(|filter| Index::from((filter.clone(), self.kind)))
.map(|filter| Index::from(((filter.clone(), self.kind), self.id.clone())))
.collect()
}
}

/// Manager
pub type Manager = subscription::Manager<ProofState, (String, Kind)>;
pub type Manager = subscription::Manager<SubscriptionResponse, (String, Kind)>;

#[cfg(test)]
mod test {
Expand All @@ -74,8 +116,8 @@ mod test {
// responsability of the implementor to make sure that SubId are unique
// either globally or per client
let subscriptions = vec![
manager.subscribe(&params, &params).await,
manager.subscribe(&params, &params).await,
manager.subscribe(params.clone()).await,
manager.subscribe(params).await,
];
assert_eq!(2, manager.active_subscriptions());
drop(subscriptions);
Expand All @@ -88,19 +130,27 @@ mod test {
#[tokio::test]
async fn broadcast() {
let manager = Manager::default();
let params = Params {
kind: Kind::ProofState,
filters: vec!["x".to_string()],
id: "uno".into(),
};

// Although the same param is used, two subscriptions are created, that
// is because each index is unique, thanks to `Unique`, it is the
// responsability of the implementor to make sure that SubId are unique
// either globally or per client
let mut subscriptions = vec![
manager.subscribe(&params, &params).await,
manager.subscribe(&params, &params).await,
manager
.subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "uno".into(),
})
.await,
manager
.subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "dos".into(),
})
.await,
];

let event = ProofState {
Expand All @@ -112,10 +162,17 @@ mod test {
witness: None,
};

manager.broadcast(event);
manager.broadcast(event.into());

sleep(Duration::from_millis(10)).await;

let x = subscriptions[1].try_recv().expect("valid message");
let (sub1, _) = subscriptions[0].try_recv().expect("valid message");
assert_eq!("uno", *sub1);

let (sub1, _) = subscriptions[1].try_recv().expect("valid message");
assert_eq!("dos", *sub1);

assert!(subscriptions[0].try_recv().is_err());
assert!(subscriptions[1].try_recv().is_err());
}
}
28 changes: 17 additions & 11 deletions crates/cdk/src/subscription/index.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use super::SubId;
use std::{
fmt::Debug,
ops::Deref,
sync::atomic::{AtomicUsize, Ordering},
};

/// Indexable trait
pub trait Indexable {
type Type: PartialOrd + Ord + Send + Sync;
type Type: PartialOrd + Ord + Send + Sync + Debug;

/// To indexes
fn to_indexes(&self) -> Vec<Index<Self::Type>>;
Expand All @@ -15,19 +16,22 @@ pub trait Indexable {
#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)]
/// Index
///
/// The Index is a sorted structure that
/// The Index is a sorted structure that is used to quickly find matches
///
/// The counter is used to make sure each Index is unique, even if the prefix
/// are the same, and also to make sure that ealier indexes matches first
pub struct Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
prefix: T,
counter: Unique,
id: super::SubId,
_unique: Unique,
}

impl<T> Into<super::SubId> for &Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn into(self) -> super::SubId {
self.id.clone()
Expand All @@ -36,7 +40,7 @@ where

impl<T> Deref for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
type Target = T;

Expand All @@ -47,7 +51,7 @@ where

impl<T> Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
/// Compare the
pub fn cmp_prefix(&self, other: &Index<T>) -> std::cmp::Ordering {
Expand All @@ -57,26 +61,26 @@ where

impl<T> From<(T, SubId)> for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn from((prefix, id): (T, SubId)) -> Self {
Self {
prefix,
id,
_unique: Default::default(),
counter: Default::default(),
}
}
}

impl<T> From<T> for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn from(prefix: T) -> Self {
Self {
prefix,
id: Default::default(),
_unique: Default::default(),
counter: Unique(0),
}
}
}
Expand All @@ -90,6 +94,8 @@ static COUNTER: AtomicUsize = AtomicUsize::new(0);
/// The prefix is used to leverage the BTree to find things quickly, but each
/// entry/key must be unique, so we use this dummy type to make sure each Index
/// is unique.
///
/// Unique is also used to make sure that the indexes are sorted by creation order
#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)]
struct Unique(usize);

Expand Down
25 changes: 14 additions & 11 deletions crates/cdk/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::Debug,
ops::{Deref, DerefMut},
str::FromStr,
sync::{
Expand Down Expand Up @@ -40,7 +41,7 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 10;
pub struct Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Ord + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
{
indexes: IndexTree<T, I>,
unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
Expand All @@ -51,7 +52,7 @@ where
impl<T, I> Default for Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
{
fn default() -> Self {
let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
Expand All @@ -74,7 +75,7 @@ where
impl<T, I> Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
#[inline]
/// Broadcast an event to all listeners
Expand All @@ -84,6 +85,8 @@ where
async fn broadcast_impl(storage: &IndexTree<T, I>, event: T) {
let index_storage = storage.read().await;
for index in event.to_indexes() {
println!("{:?}", index);
println!("{:?}", index_storage.keys().collect::<Vec<_>>());
for (key, sender) in index_storage.range(index.clone()..) {
if index.cmp_prefix(&key) != Ordering::Equal {
break;
Expand Down Expand Up @@ -112,13 +115,13 @@ where
}

/// Subscribe to a specific event
pub async fn subscribe<SI: Into<SubId>, P: Into<Vec<Index<I>>>>(
pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<I>>>>(
&self,
sub_id: SI,
params: P,
) -> ActiveSubscription<T, I> {
let (sender, receiver) = mpsc::channel(10);
let indexes = params.into();
let sub_id: SubId = params.as_ref().clone();
let indexes: Vec<Index<I>> = params.into();

let mut index_storage = self.indexes.write().await;
for index in indexes.clone() {
Expand Down Expand Up @@ -169,7 +172,7 @@ where
impl<T, I> Drop for Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn drop(&mut self) {
for task in self.background_tasks.drain(..) {
Expand All @@ -187,7 +190,7 @@ where
pub struct ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
/// The subscription ID
pub id: SubId,
Expand All @@ -199,7 +202,7 @@ where
impl<T, I> Deref for ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
type Target = mpsc::Receiver<(SubId, T)>;

Expand All @@ -211,7 +214,7 @@ where
impl<T, I> DerefMut for ActiveSubscription<T, I>
where
T: Indexable + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
Expand All @@ -226,7 +229,7 @@ where
impl<T, I> Drop for ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn drop(&mut self) {
self.drop
Expand Down

0 comments on commit 3cbc5af

Please sign in to comment.