Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Nut-17 #394

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Prev Previous commit
Next Next commit
First functional prototype
  • Loading branch information
crodas committed Oct 9, 2024
commit 3cbc5af9e6e45e611fae536ee59b521a64d7bdc4
107 changes: 82 additions & 25 deletions crates/cdk/src/nuts/nut17.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Specific Subscription for the cdk crate
use super::{MeltQuoteBolt11Response, MintQuoteBolt11Response};
use crate::{
nuts::ProofState,
subscription::{self, Index, Indexable, SubId},
Expand All @@ -14,12 +15,53 @@ pub struct Params {
id: SubId,
}

impl Indexable for ProofState {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum SubscriptionResponse {
ProofState(ProofState),
MeltQuoteBolt11Response(MeltQuoteBolt11Response),
MintQuoteBolt11Response(MintQuoteBolt11Response),
}

impl From<ProofState> for SubscriptionResponse {
fn from(proof_state: ProofState) -> SubscriptionResponse {
SubscriptionResponse::ProofState(proof_state)
}
}

impl From<MeltQuoteBolt11Response> for SubscriptionResponse {
fn from(melt_quote: MeltQuoteBolt11Response) -> SubscriptionResponse {
SubscriptionResponse::MeltQuoteBolt11Response(melt_quote)
}
}

impl From<MintQuoteBolt11Response> for SubscriptionResponse {
fn from(mint_quote: MintQuoteBolt11Response) -> SubscriptionResponse {
SubscriptionResponse::MintQuoteBolt11Response(mint_quote)
}
}

impl Indexable for SubscriptionResponse {
type Type = (String, Kind);

fn to_indexes(&self) -> Vec<Index<Self::Type>> {
// convert the event to a list of indexes
todo!()
match self {
SubscriptionResponse::ProofState(proof_state) => {
vec![Index::from((proof_state.y.to_hex(), Kind::ProofState))]
}
SubscriptionResponse::MeltQuoteBolt11Response(melt_quote) => {
vec![Index::from((
melt_quote.quote.clone(),
Kind::Bolt11MeltQuote,
))]
}
SubscriptionResponse::MintQuoteBolt11Response(mint_quote) => {
vec![Index::from((
mint_quote.quote.clone(),
Kind::Bolt11MintQuote,
))]
}
}
}
}

Expand All @@ -34,23 +76,23 @@ pub enum Kind {
ProofState,
}

impl Into<SubId> for &Params {
fn into(self) -> SubId {
self.id.clone()
impl AsRef<SubId> for Params {
fn as_ref(&self) -> &SubId {
&self.id
}
}

impl Into<Vec<Index<(String, Kind)>>> for &Params {
impl Into<Vec<Index<(String, Kind)>>> for Params {
fn into(self) -> Vec<Index<(String, Kind)>> {
self.filters
.iter()
.map(|filter| Index::from((filter.clone(), self.kind)))
.map(|filter| Index::from(((filter.clone(), self.kind), self.id.clone())))
.collect()
}
}

/// Manager
pub type Manager = subscription::Manager<ProofState, (String, Kind)>;
pub type Manager = subscription::Manager<SubscriptionResponse, (String, Kind)>;

#[cfg(test)]
mod test {
Expand All @@ -74,8 +116,8 @@ mod test {
// responsability of the implementor to make sure that SubId are unique
// either globally or per client
let subscriptions = vec![
manager.subscribe(&params, &params).await,
manager.subscribe(&params, &params).await,
manager.subscribe(params.clone()).await,
manager.subscribe(params).await,
];
assert_eq!(2, manager.active_subscriptions());
drop(subscriptions);
Expand All @@ -88,19 +130,27 @@ mod test {
#[tokio::test]
async fn broadcast() {
let manager = Manager::default();
let params = Params {
kind: Kind::ProofState,
filters: vec!["x".to_string()],
id: "uno".into(),
};

// Although the same param is used, two subscriptions are created, that
// is because each index is unique, thanks to `Unique`, it is the
// responsability of the implementor to make sure that SubId are unique
// either globally or per client
let mut subscriptions = vec![
manager.subscribe(&params, &params).await,
manager.subscribe(&params, &params).await,
manager
.subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "uno".into(),
})
.await,
manager
.subscribe(Params {
kind: Kind::ProofState,
filters: vec![
"02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
.to_string(),
],
id: "dos".into(),
})
.await,
];

let event = ProofState {
Expand All @@ -112,10 +162,17 @@ mod test {
witness: None,
};

manager.broadcast(event);
manager.broadcast(event.into());

sleep(Duration::from_millis(10)).await;

let x = subscriptions[1].try_recv().expect("valid message");
let (sub1, _) = subscriptions[0].try_recv().expect("valid message");
assert_eq!("uno", *sub1);

let (sub1, _) = subscriptions[1].try_recv().expect("valid message");
assert_eq!("dos", *sub1);

assert!(subscriptions[0].try_recv().is_err());
assert!(subscriptions[1].try_recv().is_err());
}
}
28 changes: 17 additions & 11 deletions crates/cdk/src/subscription/index.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use super::SubId;
use std::{
fmt::Debug,
ops::Deref,
sync::atomic::{AtomicUsize, Ordering},
};

/// Indexable trait
pub trait Indexable {
type Type: PartialOrd + Ord + Send + Sync;
type Type: PartialOrd + Ord + Send + Sync + Debug;

/// To indexes
fn to_indexes(&self) -> Vec<Index<Self::Type>>;
Expand All @@ -15,19 +16,22 @@ pub trait Indexable {
#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)]
/// Index
///
/// The Index is a sorted structure that
/// The Index is a sorted structure that is used to quickly find matches
///
/// The counter is used to make sure each Index is unique, even if the prefix
/// are the same, and also to make sure that ealier indexes matches first
pub struct Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
prefix: T,
counter: Unique,
id: super::SubId,
_unique: Unique,
}

impl<T> Into<super::SubId> for &Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn into(self) -> super::SubId {
self.id.clone()
Expand All @@ -36,7 +40,7 @@ where

impl<T> Deref for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
type Target = T;

Expand All @@ -47,7 +51,7 @@ where

impl<T> Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
/// Compare the
pub fn cmp_prefix(&self, other: &Index<T>) -> std::cmp::Ordering {
Expand All @@ -57,26 +61,26 @@ where

impl<T> From<(T, SubId)> for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn from((prefix, id): (T, SubId)) -> Self {
Self {
prefix,
id,
_unique: Default::default(),
counter: Default::default(),
}
}
}

impl<T> From<T> for Index<T>
where
T: PartialOrd + Ord + Send + Sync,
T: PartialOrd + Ord + Send + Sync + Debug,
{
fn from(prefix: T) -> Self {
Self {
prefix,
id: Default::default(),
_unique: Default::default(),
counter: Unique(0),
}
}
}
Expand All @@ -90,6 +94,8 @@ static COUNTER: AtomicUsize = AtomicUsize::new(0);
/// The prefix is used to leverage the BTree to find things quickly, but each
/// entry/key must be unique, so we use this dummy type to make sure each Index
/// is unique.
///
/// Unique is also used to make sure that the indexes are sorted by creation order
#[derive(Debug, Ord, PartialOrd, PartialEq, Eq, Clone)]
struct Unique(usize);

Expand Down
25 changes: 14 additions & 11 deletions crates/cdk/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
collections::BTreeMap,
fmt::Debug,
ops::{Deref, DerefMut},
str::FromStr,
sync::{
Expand Down Expand Up @@ -40,7 +41,7 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 10;
pub struct Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: PartialOrd + Clone + Ord + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
{
indexes: IndexTree<T, I>,
unsubscription_sender: mpsc::Sender<(SubId, Vec<Index<I>>)>,
Expand All @@ -51,7 +52,7 @@ where
impl<T, I> Default for Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: PartialOrd + Clone + Debug + Ord + Send + Sync + 'static,
{
fn default() -> Self {
let (sender, receiver) = mpsc::channel(DEFAULT_REMOVE_SIZE);
Expand All @@ -74,7 +75,7 @@ where
impl<T, I> Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
#[inline]
/// Broadcast an event to all listeners
Expand All @@ -84,6 +85,8 @@ where
async fn broadcast_impl(storage: &IndexTree<T, I>, event: T) {
let index_storage = storage.read().await;
for index in event.to_indexes() {
println!("{:?}", index);
println!("{:?}", index_storage.keys().collect::<Vec<_>>());
for (key, sender) in index_storage.range(index.clone()..) {
if index.cmp_prefix(&key) != Ordering::Equal {
break;
Expand Down Expand Up @@ -112,13 +115,13 @@ where
}

/// Subscribe to a specific event
pub async fn subscribe<SI: Into<SubId>, P: Into<Vec<Index<I>>>>(
pub async fn subscribe<P: AsRef<SubId> + Into<Vec<Index<I>>>>(
&self,
sub_id: SI,
params: P,
) -> ActiveSubscription<T, I> {
let (sender, receiver) = mpsc::channel(10);
let indexes = params.into();
let sub_id: SubId = params.as_ref().clone();
let indexes: Vec<Index<I>> = params.into();

let mut index_storage = self.indexes.write().await;
for index in indexes.clone() {
Expand Down Expand Up @@ -169,7 +172,7 @@ where
impl<T, I> Drop for Manager<T, I>
where
T: Indexable<Type = I> + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn drop(&mut self) {
for task in self.background_tasks.drain(..) {
Expand All @@ -187,7 +190,7 @@ where
pub struct ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
/// The subscription ID
pub id: SubId,
Expand All @@ -199,7 +202,7 @@ where
impl<T, I> Deref for ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
type Target = mpsc::Receiver<(SubId, T)>;

Expand All @@ -211,7 +214,7 @@ where
impl<T, I> DerefMut for ActiveSubscription<T, I>
where
T: Indexable + Clone + Send + Sync + 'static,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
Expand All @@ -226,7 +229,7 @@ where
impl<T, I> Drop for ActiveSubscription<T, I>
where
T: Send + Sync,
I: Clone + PartialOrd + Ord + Send + Sync + 'static,
I: Clone + Debug + PartialOrd + Ord + Send + Sync + 'static,
{
fn drop(&mut self) {
self.drop
Expand Down