From f62f28ceea5649dc821365348093f8541db66e88 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 10 Jul 2016 16:19:29 +0800 Subject: [PATCH] msg/async/Event: remove event wakeup flag Now only dispatch external event will wakeup event thread(previously delete_time_event will call wakeup), we only need to use "external_num_events" to indicate whether we have extra events. Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 56 ++++++++++++++++++++---------------------- src/msg/async/Event.h | 6 +---- 2 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index b3b12ddd03036..5162059552b6d 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -39,14 +39,14 @@ class C_handle_notify : public EventCallback { C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {} void do_request(int fd_or_id) { char c[256]; + int r = 0; do { - center->already_wakeup.set(0); - int r = read(fd_or_id, c, sizeof(c)); + r = read(fd_or_id, c, sizeof(c)); if (r < 0) { - ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; - break; + if (errno != EAGAIN) + ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; } - } while (center->already_wakeup.read()); + } while (r > 0); } }; @@ -260,15 +260,15 @@ void EventCenter::delete_time_event(uint64_t id) void EventCenter::wakeup() { - ldout(cct, 1) << __func__ << dendl; - already_wakeup.compare_and_swap(0, 1); - - char buf[1]; - buf[0] = 'c'; - // wake up "event_wait" - int n = write(notify_send_fd, buf, 1); - // FIXME ? - assert(n == 1); + ldout(cct, 1) << __func__ << dendl; + + char buf = 'c'; + // wake up "event_wait" + int n = write(notify_send_fd, &buf, sizeof(buf)); + if (n < 0) { + ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl; + assert(0); + } } int EventCenter::process_time_events() @@ -361,21 +361,16 @@ int EventCenter::process_events(int timeout_microseconds) if (external_num_events.load()) { external_lock.lock(); - if (external_events.empty()) { - external_lock.unlock(); - } else { - deque cur_process; - cur_process.swap(external_events); - external_num_events.store(0); - external_lock.unlock(); - while (!cur_process.empty()) { - EventCallbackRef e = cur_process.front(); - ldout(cct, 20) << __func__ << " do " << e << dendl; - if (e) - e->do_request(0); - cur_process.pop_front(); - numevents++; - } + deque cur_process; + cur_process.swap(external_events); + external_num_events.store(0); + external_lock.unlock(); + while (!cur_process.empty()) { + EventCallbackRef e = cur_process.front(); + ldout(cct, 20) << __func__ << " do " << e << dendl; + e->do_request(0); + cur_process.pop_front(); + numevents++; } } return numevents; @@ -385,9 +380,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) { external_lock.lock(); external_events.push_back(e); + bool wake = !external_num_events.load(); uint64_t num = ++external_num_events; external_lock.unlock(); - if (!in_thread()) + if (!in_thread() && wake) wakeup(); ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index a320489ceda2c..daa4537eef6e8 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -136,16 +136,12 @@ class EventCenter { } public: - atomic_t already_wakeup; - explicit EventCenter(CephContext *c): cct(c), nevent(0), external_num_events(0), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), - notify_handler(NULL), - already_wakeup(0) { - } + notify_handler(NULL) { } ~EventCenter(); ostream& _event_prefix(std::ostream *_dout);