Skip to content

Commit

Permalink
Remove custom my_select_all
Browse files Browse the repository at this point in the history
  • Loading branch information
allevo committed Nov 8, 2021
1 parent dbd5584 commit 6c665da
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 296 deletions.
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ extern crate log;
mod chat_service;
mod credential_service;
mod models;
mod my_select_all;
mod web_service;
mod ws_pool;

Expand Down
5 changes: 2 additions & 3 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use std::{collections::HashSet, fmt::Debug, pin::Pin, str::FromStr, sync::Arc};

use crate::my_select_all::GetId;
use serde::{Deserialize, Serialize};

#[async_trait]
Expand Down Expand Up @@ -33,8 +32,8 @@ impl Stream for ReceiverStream {
self.1.poll_next_unpin(cx)
}
}
impl GetId for ReceiverStream {
fn get_id(&self) -> &DeviceId {
impl ReceiverStream {
pub fn get_id(&self) -> &DeviceId {
&self.0
}
}
Expand Down
278 changes: 0 additions & 278 deletions src/my_select_all.rs

This file was deleted.

33 changes: 19 additions & 14 deletions src/ws_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,21 @@ use std::{
sync::Arc,
};

use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
lock::Mutex,
SinkExt, StreamExt,
};
use futures::{SinkExt, StreamExt, channel::mpsc::{UnboundedReceiver, UnboundedSender}, lock::Mutex, stream::{SelectAll, select_all}};

use warp::{ws::Message, Error};

use crate::{
chat_service::ChatService,
models::{
use crate::{chat_service::ChatService, models::{
AddDevice, DeviceId, Item, PublishedMessage, ReceiverStream, SendMessageInChat,
SenderStream, WsContext,
},
my_select_all,
};
}};

trait GetId {
fn get_id(&self) -> &DeviceId;
}

pub struct WsPool {
incoming_streams: my_select_all::MySelectAll<ReceiverStream>,
incoming_streams: SelectAll<ReceiverStream>,
devices: HashMap<DeviceId, SenderStream>,
}

Expand All @@ -30,7 +26,7 @@ impl WsPool {
let recv: Vec<ReceiverStream> =
vec![ReceiverStream::new("".into(), Box::pin(item_receiver))];

let select_all = my_select_all::my_select_all(recv);
let select_all = select_all(recv);

Self {
incoming_streams: select_all,
Expand Down Expand Up @@ -62,7 +58,16 @@ impl WsPool {
info!("remove device {:?}", device_id);

self.devices.remove(device_id);
self.incoming_streams.remove_by_device_id(device_id)


let new = SelectAll::new();
let old_new = std::mem::replace(&mut self.incoming_streams, new);
for s in old_new {
if s.get_id() == device_id {
continue
}
self.incoming_streams.push(s);
}
}
}

Expand Down

0 comments on commit 6c665da

Please sign in to comment.