Skip to content

Commit

Permalink
KAFKA-4763; Handle disk failure for JBOD (KIP-112)
Browse files Browse the repository at this point in the history
Author: Dong Lin <[email protected]>

Reviewers: Jiangjie Qin <[email protected]>, Jun Rao <[email protected]>, Ismael Juma <[email protected]>, Onur Karaman <[email protected]>

Closes apache#2929 from lindong28/KAFKA-4763
  • Loading branch information
lindong28 authored and becketqin committed Jul 22, 2017
1 parent 91b5fc7 commit fc93fb4
Show file tree
Hide file tree
Showing 97 changed files with 2,526 additions and 1,096 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,8 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
} else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
error == Errors.KAFKA_STORAGE_ERROR) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
Expand All @@ -884,7 +885,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
log.warn("Not authorized to read from topic {}.", tp.topic());
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN) {
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
Expand Down
21 changes: 18 additions & 3 deletions clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.common;

/**
* Information about a topic-partition.
* This is used to describe per-partition state in the MetadataResponse.
*/
public class PartitionInfo {

Expand All @@ -26,13 +26,20 @@ public class PartitionInfo {
private final Node leader;
private final Node[] replicas;
private final Node[] inSyncReplicas;
private final Node[] offlineReplicas;

// Used only by tests
public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);
}

public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas, Node[] offlineReplicas) {
this.topic = topic;
this.partition = partition;
this.leader = leader;
this.replicas = replicas;
this.inSyncReplicas = inSyncReplicas;
this.offlineReplicas = offlineReplicas;
}

/**
Expand Down Expand Up @@ -71,14 +78,22 @@ public Node[] inSyncReplicas() {
return inSyncReplicas;
}

/**
* The subset of the replicas that are offline
*/
public Node[] offlineReplicas() {
return offlineReplicas;
}

@Override
public String toString() {
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s, offlineReplicas = %s)",
topic,
partition,
leader == null ? "none" : leader.idString(),
formatNodeIds(replicas),
formatNodeIds(inSyncReplicas));
formatNodeIds(inSyncReplicas),
formatNodeIds(offlineReplicas));
}

/* Extract the node ids from each item in the array and format for display */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

/**
* Miscellaneous disk-related IOException occurred when handling a request.
* Client should request metadata update and retry if the response shows KafkaStorageException
*
* Here are the guidelines on how to handle KafkaStorageException and IOException:
*
* 1) If the server has not finished loading logs, IOException does not need to be converted to KafkaStorageException
* 2) After the server has finished loading logs, IOException should be caught and trigger LogDirFailureChannel.maybeAddLogFailureEvent
* Then the IOException should either be swallowed and logged, or be converted and re-thrown as KafkaStorageException
* 3) It is preferred for IOException to be caught in Log rather than in ReplicaManager or LogSegment.
*
*/
public class KafkaStorageException extends InvalidMetadataException {

private static final long serialVersionUID = 1L;

public KafkaStorageException() {
super();
}

public KafkaStorageException(String message) {
super(message);
}

public KafkaStorageException(Throwable cause) {
super(cause);
}

public KafkaStorageException(String message, Throwable cause) {
super(message, cause);
}
}
21 changes: 17 additions & 4 deletions clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
Expand Down Expand Up @@ -84,10 +85,15 @@
* This class contains all the client-server errors--those errors that must be sent from the server to the client. These
* are thus part of the protocol. The names can be changed but the error code cannot.
*
* Note that client library will convert an unknown error code to the non-retriable UnknownServerException if the client library
* version is old and does not recognize the newly-added error code. Therefore when a new server-side error is added,
* we may need extra logic to convert the new error code to another existing error code before sending the response back to
* the client if the request version suggests that the client may not recognize the new error code.
*
* Do not add exceptions that occur only on the client or only on the server here.
*/
public enum Errors {
UNKNOWN(-1, "The server experienced an unexpected error when processing the request",
UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
Expand Down Expand Up @@ -495,7 +501,14 @@ public ApiException build(String message) {
public ApiException build(String message) {
return new OperationNotAttemptedException(message);
}
});
}),
KAFKA_STORAGE_ERROR(56, "Disk error when trying to access log file on the disk.",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new KafkaStorageException(message);
}
});

private interface ApiExceptionBuilder {
ApiException build(String message);
Expand Down Expand Up @@ -588,7 +601,7 @@ public static Errors forCode(short code) {
return error;
} else {
log.warn("Unexpected error code: {}.", code);
return UNKNOWN;
return UNKNOWN_SERVER_ERROR;
}
}

Expand All @@ -604,7 +617,7 @@ public static Errors forException(Throwable t) {
return error;
clazz = clazz.getSuperclass();
}
return UNKNOWN;
return UNKNOWN_SERVER_ERROR;
}

private static String toHtml() {
Expand Down
Loading

0 comments on commit fc93fb4

Please sign in to comment.