Skip to content

Commit

Permalink
messenger: don't use signal SIGUSRx anymore
Browse files Browse the repository at this point in the history
Was used internally to wake up blocking messenger threads.
  • Loading branch information
Yehuda Sadeh committed Mar 8, 2010
1 parent a7ae330 commit cf44146
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 53 deletions.
71 changes: 21 additions & 50 deletions src/msg/SimpleMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <sys/uio.h>
Expand Down Expand Up @@ -58,11 +57,6 @@ static ostream& _prefix(SimpleMessenger *messenger) {
* Accepter
*/

void noop_signal_handler(int s)
{
//dout(0) << "blah_handler got " << s << dendl;
}

int SimpleMessenger::Accepter::bind(int64_t force_nonce)
{
// bind to a socket
Expand Down Expand Up @@ -151,14 +145,6 @@ int SimpleMessenger::Accepter::start()
{
dout(1) << "accepter.start" << dendl;

// set a harmless handle for SIGUSR1 (we'll use it to stop the accepter)
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = noop_signal_handler;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sigaction(SIGUSR1, &sa, NULL);

// start thread
create();

Expand All @@ -169,26 +155,24 @@ void *SimpleMessenger::Accepter::entry()
{
dout(10) << "accepter starting" << dendl;

fd_set fds;
int errors = 0;

sigset_t sigmask, sigempty;
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGUSR1);
sigemptyset(&sigempty);

// block SIGUSR1
pthread_sigmask(SIG_BLOCK, &sigmask, NULL);

char buf[80];

struct pollfd pfd;
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
FD_ZERO(&fds);
FD_SET(listen_sd, &fds);
dout(20) << "accepter calling select" << dendl;
int r = ::pselect(listen_sd+1, &fds, 0, &fds, 0, &sigempty); // unblock SIGUSR1 inside select()
dout(20) << "accepter select got " << r << dendl;

dout(20) << "accepter calling poll" << dendl;
int r = poll(&pfd, 1, -1);
if (r < 0)
break;
dout(20) << "accepter poll got " << r << dendl;

if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
break;

dout(10) << "pfd.revents=" << pfd.revents << dendl;
if (done) break;

// accept
Expand Down Expand Up @@ -227,9 +211,6 @@ void *SimpleMessenger::Accepter::entry()
}
}

// unblock SIGUSR1
pthread_sigmask(SIG_UNBLOCK, &sigmask, NULL);

dout(20) << "accepter closing" << dendl;
// don't close socket, in case we start up again? blech.
if (listen_sd >= 0) {
Expand All @@ -244,8 +225,12 @@ void *SimpleMessenger::Accepter::entry()
void SimpleMessenger::Accepter::stop()
{
done = true;
dout(10) << "stop sending SIGUSR1" << dendl;
this->kill(SIGUSR1);
dout(10) << "stop accepter" << dendl;
if (listen_sd) {
::shutdown(listen_sd, SHUT_RDWR);
::close(listen_sd);
listen_sd = -1;
}
join();
done = false;
}
Expand Down Expand Up @@ -852,9 +837,7 @@ int SimpleMessenger::Pipe::accept()

replace:
dout(10) << "accept replacing " << existing << dendl;
existing->state = STATE_CLOSED;
existing->cond.Signal();
existing->reader_thread.kill(SIGUSR2);
existing->stop();
existing->unregister_pipe();

// steal queue and out_seq
Expand Down Expand Up @@ -1427,13 +1410,10 @@ void SimpleMessenger::Pipe::stop()
state = STATE_CLOSED;
cond.Signal();
if (sd >= 0) {
::shutdown(sd, SHUT_RDWR);
::close(sd);
sd = -1;
}
if (reader_running)
reader_thread.kill(SIGUSR2);
if (writer_running)
writer_thread.kill(SIGUSR2);
}


Expand Down Expand Up @@ -2204,15 +2184,6 @@ int SimpleMessenger::start(bool nodaemon)
if (g_conf.kill_after)
g_timer.add_event_after(g_conf.kill_after, new C_Die);

// set noop handlers for SIGUSR2, SIGPIPE
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = noop_signal_handler;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sigaction(SIGUSR2, &sa, NULL);
sigaction(SIGPIPE, &sa, NULL); // mask SIGPIPE too. FIXME: i'm quite certain this is a roundabout way to do that.

// go!
if (did_bind)
accepter.start();
Expand Down
1 change: 0 additions & 1 deletion src/msg/SimpleMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ class SimpleMessenger : public Messenger {
assert(!reader_joining);
reader_joining = true;
cond.Signal();
reader_thread.kill(SIGUSR2);
pipe_lock.Unlock();
reader_thread.join();
pipe_lock.Lock();
Expand Down
29 changes: 27 additions & 2 deletions src/msg/tcp.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <poll.h>
#include "tcp.h"

/******************
* tcp crap
*/

int tcp_read(int sd, char *buf, int len) {
struct pollfd pfd;
pfd.fd = sd;
pfd.events = POLLIN | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;
while (len > 0) {
int got = ::recv( sd, buf, len, 0 );
if (poll(&pfd, 1, -1) < 0)
return -1;

if (!(pfd.revents & POLLIN))
return -1;

/*
* although we turn on the MSG_DONTWAIT flag, we don't expect
* receivng an EAGAIN, as we polled on the socket, so there
* should be data waiting for us.
*/
int got = ::recv( sd, buf, len, MSG_DONTWAIT );
if (got <= 0) {
//char buf[100];
//generic_dout(0) << "tcp_read socket " << sd << " returned " << got
Expand All @@ -24,10 +39,20 @@ int tcp_read(int sd, char *buf, int len) {
}

int tcp_write(int sd, const char *buf, int len) {
struct pollfd pfd;
pfd.fd = sd;
pfd.events = POLLOUT | POLLHUP | POLLRDHUP | POLLNVAL | POLLERR;

if (poll(&pfd, 1, -1) < 0)
return -1;

if (!(pfd.revents & POLLOUT))
return -1;

//generic_dout(DBL) << "tcp_write writing " << len << dendl;
assert(len > 0);
while (len > 0) {
int did = ::send( sd, buf, len, 0 );
int did = ::send( sd, buf, len, MSG_NOSIGNAL );
if (did < 0) {
//generic_dout(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
//generic_derr(1) << "tcp_write error did = " << did << " errno " << errno << " " << strerror(errno) << dendl;
Expand Down

0 comments on commit cf44146

Please sign in to comment.