Skip to content

Commit

Permalink
nydus-cached: switch nydus-cached to mio
Browse files Browse the repository at this point in the history
nydus-cached is the only one using event-manager. So if we switch
to mio, we can remove the event-manager dependency.

Signed-off-by: uran0sH <[email protected]>
  • Loading branch information
uran0sH committed May 30, 2022
1 parent f09f83d commit 43b99b0
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 112 deletions.
11 changes: 0 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ storage = { path = "storage" }
blobfs = { path = "blobfs", features = ["virtiofs"], optional = true }
mio = { version = "0.8", features = ["os-poll", "os-ext"]}

[target.'cfg(target_os = "linux")'.dependencies]
event-manager = "0.2.1"

[dev-dependencies]
sendfd = "0.3.3"
vmm-sys-util = "0.9.0"
Expand Down
151 changes: 53 additions & 98 deletions src/bin/nydus-cached/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,17 @@ extern crate log;
extern crate lazy_static;

use std::io::Result;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::os::unix::prelude::AsRawFd;
use std::sync::{atomic::AtomicBool, Mutex};

use clap::{App, Arg};
use event_manager::{EventManager, EventOps, EventSubscriber, Events, SubscriberOps};
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use nydus_app::{dump_program_info, setup_logging, BuildTimeInfo};
use storage::remote::{RemoteBlobMgr, Server};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

const LISTERNER_EVENT_IDX: u32 = 1;
const CLIENT_TOKEN: Token = Token(1);

lazy_static! {
static ref EVENT_MANAGER_RUN: AtomicBool = AtomicBool::new(true);
Expand Down Expand Up @@ -112,9 +110,8 @@ fn main() -> Result<()> {
return run_test_cases(workdir, sock);
}

let server = Arc::new(CachedServer::new(sock)?);
let mut event_manager = EventManager::<Arc<dyn EventSubscriber>>::new().unwrap();
event_manager.add_subscriber(server.clone());
let mut cached_subscriber = CachedServerSubscriber::new(sock)?;
cached_subscriber.listen();

/*
let mut http_thread: Option<thread::JoinHandle<Result<()>>> = None;
Expand All @@ -140,12 +137,6 @@ fn main() -> Result<()> {
}
*/

//*EXIT_EVTFD.lock().unwrap().deref_mut() = Some(exit_evtfd);
while EVENT_MANAGER_RUN.load(Ordering::Relaxed) {
// If event manager dies, so does nydusd
event_manager.run().unwrap();
}

/*
if let Some(t) = http_thread {
http_exit_evtfd.write(1).unwrap();
Expand All @@ -158,104 +149,68 @@ fn main() -> Result<()> {
}
*/

server.server.stop();
cached_subscriber.server.stop();
info!("nydus-cached quits");

Ok(())
}

struct CachedServer {
pub struct CachedServerSubscriber {
server: Server,
receiver: Poll,
}

impl CachedServer {
fn new(sock: &str) -> Result<Self> {
impl CachedServerSubscriber {
pub fn new(sock: &str) -> Result<Self> {
let receiver = Poll::new()?;
let server = Server::new(sock)?;

Ok(Self { server })
receiver.registry().register(
&mut SourceFd(&server.as_raw_fd()),
CLIENT_TOKEN,
Interest::READABLE | Interest::WRITABLE,
)?;
let subscriber = Self { server, receiver };
Ok(subscriber)
}

fn handle_incoming_connection(&self, events: Events, event_ops: &mut EventOps) {
match events.event_set() {
EventSet::IN => match self.server.handle_incoming_connection() {
Err(e) => error!("failed to handle incoming connection, {}", e),
Ok(None) => {}
Ok(Some(client)) => {
let id = client.id();
debug_assert!(id != LISTERNER_EVENT_IDX);
let event = Events::with_data(client.as_ref(), id, EventSet::IN);
if let Err(e) = event_ops.add(event) {
error!(
"failed to register event handler for client connection, {}",
e
);
client.close();
pub fn listen(&mut self) {
let mut events = Events::with_capacity(64);

loop {
if let Err(e) = self.receiver.poll(&mut events, None) {
error!("Cached server poll events failed, {}", e);
break;
}
for event in &events {
match event.token() {
CLIENT_TOKEN => match self.server.handle_incoming_connection() {
Err(e) => error!("failed to handle incoming connection, {}", e),
Ok(None) => {}
Ok(Some(client)) => {
let id = client.id();
self.receiver
.registry()
.register(
&mut SourceFd(&client.clone().as_raw_fd()),
Token(id as usize),
Interest::READABLE | Interest::WRITABLE,
)
.unwrap_or_else(|e| {
error!("client connection is failed to register, {}", e);
});
}
},
token => {
let id = token.0 as u32;
if let Err(e) = self.server.handle_event(id) {
error!("failed to handle client request, {}", e);
self.server.close_connection(id);
}
}
}
},
EventSet::ERROR => error!("failed to accept incoming connection."),
EventSet::HANG_UP => {
event_ops
.remove(events)
.unwrap_or_else(|e| error!("failed to unregister handler for listener, {}", e));
}
_ => {}
}
}

fn handle_event(&self, events: Events, event_ops: &mut EventOps) {
let id = events.data();
let event_set = events.event_set();

if event_set.contains(EventSet::HANG_UP) || event_set.contains(EventSet::READ_HANG_UP) {
event_ops
.remove(events)
.unwrap_or_else(|e| error!("failed to unregister handler for listener, {}", e));
self.server.close_connection(id);
} else if event_set.contains(EventSet::ERROR) {
error!("epoll from client connection returns error");
event_ops
.remove(events)
.unwrap_or_else(|e| error!("failed to unregister handler for listener, {}", e));
self.server.close_connection(id);
} else if event_set.contains(EventSet::IN) {
if let Err(e) = self.server.handle_event(id) {
error!("failed to handle client request, {}", e);
event_ops
.remove(events)
.unwrap_or_else(|e| error!("failed to unregister handler for listener, {}", e));
self.server.close_connection(id);
}
} else {
error!(
"unknown epoll event from client connection {}",
events.event_set().bits()
);
event_ops
.remove(events)
.unwrap_or_else(|e| error!("failed to unregister handler for listener, {}", e));
self.server.close_connection(id);
}
}
}

impl EventSubscriber for CachedServer {
fn process(&self, events: Events, event_ops: &mut EventOps) {
let data = events.data();

if data == LISTERNER_EVENT_IDX {
self.handle_incoming_connection(events, event_ops);
} else {
self.handle_event(events, event_ops);
}
}

fn init(&self, ops: &mut EventOps) {
let event = Events::with_data(&self.server, LISTERNER_EVENT_IDX, EventSet::IN);

ops.add(event)
.expect("Cannot register event handler for listener");
}
}

fn run_test_cases(workdir: &str, sock: &str) -> Result<()> {
Expand Down

0 comments on commit 43b99b0

Please sign in to comment.