Skip to content

Commit

Permalink
KAFKA-2813; selector doesn't close socket connection on non-IOExceptions
Browse files Browse the repository at this point in the history
Patched Selector.poll() to close the connection on any exception.

Author: Jun Rao <[email protected]>

Reviewers: Guozhang Wang <[email protected]>, Gwen Shapira <[email protected]>

Closes apache#501 from junrao/KAFKA-2813
  • Loading branch information
junrao committed Nov 12, 2015
1 parent df88d3b commit 3fd168d
Showing 1 changed file with 7 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ public void send(Send send) {
* @throws IllegalArgumentException If `timeout` is negative
* @throws IllegalStateException If a send is given for which we have no existing connection or for which there is
* already an in-progress send
* @throws InvalidReceiveException If invalid data is received
*/
@Override
public void poll(long timeout) throws IOException {
Expand Down Expand Up @@ -284,16 +283,8 @@ public void poll(long timeout) throws IOException {
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
try {
while ((networkReceive = channel.read()) != null) {
addToStagedReceives(channel, networkReceive);
}
} catch (InvalidReceiveException e) {
log.error("Invalid data received from " + channel.id() + " closing connection", e);
close(channel);
this.disconnected.add(channel.id());
throw e;
}
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
}

/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
Expand All @@ -310,9 +301,12 @@ public void poll(long timeout) throws IOException {
close(channel);
this.disconnected.add(channel.id());
}
} catch (IOException e) {
} catch (Exception e) {
String desc = channel.socketDescription();
log.debug("Connection with {} disconnected", desc, e);
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel);
this.disconnected.add(channel.id());
}
Expand Down

0 comments on commit 3fd168d

Please sign in to comment.