Skip to content

Commit

Permalink
Enhancements to IndexTaskClient. (apache#12011)
Browse files Browse the repository at this point in the history
* Enhancements to IndexTaskClient.

1) Ability to use handlers other than StringFullResponseHandler. This
   functionality is not used in production code yet, but is useful
   because it will allow tasks to communicate with each other in
   non-string-based formats and in streaming fashion. In the future,
   we'll be able to use this to make task-to-task communication
   more efficient.

2) Truncate server errors at 1KB, so long errors do not pollute logs.

3) Change error log level for retryable errors from WARN to INFO. (The
   final error is still WARN.)

4) Harmonize log and exception messages to have a more consistent format.

* Additional tests and improvements.
  • Loading branch information
gianm authored Dec 3, 2021
1 parent f7f5505 commit e0e05aa
Show file tree
Hide file tree
Showing 20 changed files with 1,030 additions and 244 deletions.
146 changes: 146 additions & 0 deletions core/src/main/java/org/apache/druid/java/util/common/Either.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.common;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

import javax.annotation.Nullable;
import java.util.Objects;
import java.util.function.Function;

/**
* Encapsulates either an "error" or a "value".
*
* Similar to the Either class in Scala or Haskell, except the possibilities are named "error" and "value" instead of
* "left" and "right".
*/
public class Either<L, R>
{
private final L error;
private final R value;

private Either(L error, R value)
{
this.error = error;
this.value = value;
}

public static <L, R> Either<L, R> error(final L error)
{
return new Either<>(Preconditions.checkNotNull(error, "error"), null);
}

public static <L, R> Either<L, R> value(@Nullable final R value)
{
return new Either<>(null, value);
}

public boolean isValue()
{
return error == null;
}

public boolean isError()
{
return error != null;
}

public L error()
{
if (isError()) {
return error;
} else {
throw new IllegalStateException("Not an error; check isError first");
}
}

/**
* If this Either represents a value, returns it. If this Either represents an error, throw an error.
*
* If the error is a {@link RuntimeException} or {@link Error}, it is thrown directly. If it is some other
* {@link Throwable}, it is wrapped in a RuntimeException and thrown. If it is not a throwable at all, a generic
* error is thrown containing the string representation of the error object.
*
* If you want to be able to retrieve the error as-is, use {@link #isError()} and {@link #error()} instead.
*/
@Nullable
public R valueOrThrow()
{
if (isValue()) {
return value;
} else if (error instanceof Throwable) {
Throwables.propagateIfPossible((Throwable) error);
throw new RuntimeException((Throwable) error);
} else {
throw new RuntimeException(error.toString());
}
}

/**
* Applies a function to this value, if present.
*
* If the mapping function throws an exception, it is thrown by this method instead of being packed up into
* the returned Either.
*
* If this Either represents an error, the mapping function is not applied.
*
* @throws NullPointerException if the mapping function returns null
*/
public <T> Either<L, T> map(final Function<R, T> fn)
{
if (isValue()) {
return Either.value(fn.apply(value));
} else {
// Safe because the value is never going to be returned.
//noinspection unchecked
return (Either<L, T>) this;
}
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Either<?, ?> either = (Either<?, ?>) o;
return Objects.equals(error, either.error) && Objects.equals(value, either.value);
}

@Override
public int hashCode()
{
return Objects.hash(error, value);
}

@Override
public String toString()
{
if (isValue()) {
return "Value[" + value + "]";
} else {
return "Error[" + error + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler<BytesFullRe
@Override
public ClientResponse<BytesFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
BytesFullResponseHolder holder = new BytesFullResponseHolder(response.getStatus(), response);
BytesFullResponseHolder holder = new BytesFullResponseHolder(response);

holder.addChunk(getContentBytes(response.getContent()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.java.util.http.client.response;

import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -30,9 +29,9 @@ public class BytesFullResponseHolder extends FullResponseHolder<byte[]>
{
private final List<byte[]> chunks;

public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response)
public BytesFullResponseHolder(HttpResponse response)
{
super(status, response);
super(response);
this.chunks = new ArrayList<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,16 @@
*/
public abstract class FullResponseHolder<T>
{
private final HttpResponseStatus status;
private final HttpResponse response;

public FullResponseHolder(HttpResponseStatus status, HttpResponse response)
public FullResponseHolder(HttpResponse response)
{
this.status = status;
this.response = response;
}

public HttpResponseStatus getStatus()
{
return status;
return response.getStatus();
}

public HttpResponse getResponse()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class InputStreamFullResponseHandler implements HttpResponseHandler<Input
@Override
public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response);
holder.addChunk(getContentBytes(response.getContent()));
return ClientResponse.finished(holder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@

import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;

import java.io.InputStream;

public class InputStreamFullResponseHolder extends FullResponseHolder<InputStream>
{
private final AppendableByteArrayInputStream is;

public InputStreamFullResponseHolder(
HttpResponseStatus status,
HttpResponse response
)
public InputStreamFullResponseHolder(HttpResponse response)
{
super(status, response);
super(response);
is = new AppendableByteArrayInputStream();
}

Expand Down
Loading

0 comments on commit e0e05aa

Please sign in to comment.