Skip to content

Commit

Permalink
msg/async/Event: remove event wakeup flag
Browse files Browse the repository at this point in the history
Now only dispatch external event will wakeup event thread(previously
delete_time_event will call wakeup), we only need to use
"external_num_events" to indicate whether we have extra events.

Signed-off-by: Haomai Wang <[email protected]>
  • Loading branch information
yuyuyu101 committed Aug 16, 2016
1 parent fca0de1 commit f62f28c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 35 deletions.
56 changes: 26 additions & 30 deletions src/msg/async/Event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ class C_handle_notify : public EventCallback {
C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
void do_request(int fd_or_id) {
char c[256];
int r = 0;
do {
center->already_wakeup.set(0);
int r = read(fd_or_id, c, sizeof(c));
r = read(fd_or_id, c, sizeof(c));
if (r < 0) {
ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
break;
if (errno != EAGAIN)
ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
}
} while (center->already_wakeup.read());
} while (r > 0);
}
};

Expand Down Expand Up @@ -260,15 +260,15 @@ void EventCenter::delete_time_event(uint64_t id)

void EventCenter::wakeup()
{
ldout(cct, 1) << __func__ << dendl;
already_wakeup.compare_and_swap(0, 1);

char buf[1];
buf[0] = 'c';
// wake up "event_wait"
int n = write(notify_send_fd, buf, 1);
// FIXME ?
assert(n == 1);
ldout(cct, 1) << __func__ << dendl;

char buf = 'c';
// wake up "event_wait"
int n = write(notify_send_fd, &buf, sizeof(buf));
if (n < 0) {
ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
assert(0);
}
}

int EventCenter::process_time_events()
Expand Down Expand Up @@ -361,21 +361,16 @@ int EventCenter::process_events(int timeout_microseconds)

if (external_num_events.load()) {
external_lock.lock();
if (external_events.empty()) {
external_lock.unlock();
} else {
deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_num_events.store(0);
external_lock.unlock();
while (!cur_process.empty()) {
EventCallbackRef e = cur_process.front();
ldout(cct, 20) << __func__ << " do " << e << dendl;
if (e)
e->do_request(0);
cur_process.pop_front();
numevents++;
}
deque<EventCallbackRef> cur_process;
cur_process.swap(external_events);
external_num_events.store(0);
external_lock.unlock();
while (!cur_process.empty()) {
EventCallbackRef e = cur_process.front();
ldout(cct, 20) << __func__ << " do " << e << dendl;
e->do_request(0);
cur_process.pop_front();
numevents++;
}
}
return numevents;
Expand All @@ -385,9 +380,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e)
{
external_lock.lock();
external_events.push_back(e);
bool wake = !external_num_events.load();
uint64_t num = ++external_num_events;
external_lock.unlock();
if (!in_thread())
if (!in_thread() && wake)
wakeup();

ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl;
Expand Down
6 changes: 1 addition & 5 deletions src/msg/async/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,12 @@ class EventCenter {
}

public:
atomic_t already_wakeup;

explicit EventCenter(CephContext *c):
cct(c), nevent(0),
external_num_events(0),
driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c),
notify_handler(NULL),
already_wakeup(0) {
}
notify_handler(NULL) { }
~EventCenter();
ostream& _event_prefix(std::ostream *_dout);

Expand Down

0 comments on commit f62f28c

Please sign in to comment.