Skip to content

Commit

Permalink
Support inputFormat and inputSource for sampler (apache#8901)
Browse files Browse the repository at this point in the history
* Support inputFormat and inputSource for sampler

* Cleanup javadocs and names

* fix style

* fix timed shutoff input source reader

* fix timed shutoff input source reader again

* tidy up timed shutoff reader

* unused imports

* fix tc
  • Loading branch information
jihoonson authored and gianm committed Nov 20, 2019
1 parent d628beb commit ac6d703
Show file tree
Hide file tree
Showing 89 changed files with 4,165 additions and 2,904 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public abstract class AbstractInputSource implements InputSource
public InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
@Nullable File temporaryDirectory
File temporaryDirectory
)
{
if (needsFormat()) {
Expand All @@ -52,13 +52,13 @@ public InputSourceReader reader(
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
File temporaryDirectory
)
{
throw new UnsupportedOperationException("Implement this method properly if needsFormat() = true");
}

protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, File temporaryDirectory)
{
throw new UnsupportedOperationException("Implement this method properly if needsFormat() = false");
}
Expand Down
18 changes: 10 additions & 8 deletions core/src/main/java/org/apache/druid/data/input/Firehose.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,22 @@ public interface Firehose extends Closeable
InputRow nextRow() throws IOException;

/**
* Returns an InputRowPlusRaw object containing the InputRow plus the raw, unparsed data corresponding to the next row
* available. Used in the sampler to provide the caller with information to assist in configuring a parse spec. If a
* ParseException is thrown by the parser, it should be caught and returned in the InputRowPlusRaw so we will be able
* to provide information on the raw row which failed to be parsed. Should only be called if hasMore returns true.
* Returns an {@link InputRowListPlusRawValues} object containing the InputRow plus the raw, unparsed data corresponding to
* the next row available. Used in the sampler to provide the caller with information to assist in configuring a parse
* spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusJson so
* we will be able to provide information on the raw row which failed to be parsed. Should only be called if hasMore
* returns true.
*
* @return an InputRowPlusRaw which may contain any of: an InputRow, the raw data, or a ParseException
* @return an InputRowListPlusJson which may contain any of: an InputRow, map of the raw data, or a ParseException
*/
default InputRowListPlusJson nextRowWithRaw() throws IOException
@Deprecated
default InputRowListPlusRawValues nextRowWithRaw() throws IOException
{
try {
return InputRowListPlusJson.of(nextRow(), null);
return InputRowListPlusRawValues.of(nextRow(), null);
}
catch (ParseException e) {
return InputRowListPlusJson.of((byte[]) null, e);
return InputRowListPlusRawValues.of(null, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public FiniteFirehoseFactory getFirehoseFactory()
return firehoseFactory;
}

public InputRowParser getInputRowParser()
{
return inputRowParser;
}

@Override
public boolean isSplittable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.data.input;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

Expand All @@ -36,14 +34,7 @@
@UnstableApi
public interface InputEntityReader
{
/**
* Default JSON writer for sampler. This writer can be used to create an {@link InputRowListPlusJson}.
* Note that this writer uses the default serializer of Jackson. You may want to create a custom writer
* to serialize your custom types.
*/
ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();

CloseableIterator<InputRow> read() throws IOException;

CloseableIterator<InputRowListPlusJson> sample() throws IOException;
CloseableIterator<InputRowListPlusRawValues> sample() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.data.input.impl.RegexInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;

Expand All @@ -42,7 +43,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "csv", value = CsvInputFormat.class),
@Type(name = "json", value = JsonInputFormat.class)
@Type(name = "json", value = JsonInputFormat.class),
@Type(name = "regex", value = RegexInputFormat.class)
})
public interface InputFormat
{
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input;

import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* A triple of a list of {@link InputRow}s, a {@link Map} of raw values, and a {@link ParseException}.
* The rawValues map contains the raw values before being parsed into InputRows. Note that a single map can be parsed
* into multiple InputRows, for example, with explodeSpec.
* The ParseException is the exception thrown when parsing bytes into either the rawValues map or the list of InputRows.
*
* In any case, one of triple must not be null.
*/
public class InputRowListPlusRawValues
{
@Nullable
private final List<InputRow> inputRows;

@Nullable
private final Map<String, Object> rawValues;

@Nullable
private final ParseException parseException;

public static InputRowListPlusRawValues of(@Nullable InputRow inputRow, Map<String, Object> rawColumns)
{
return of(inputRow == null ? null : Collections.singletonList(inputRow), rawColumns);
}

public static InputRowListPlusRawValues of(@Nullable List<InputRow> inputRows, Map<String, Object> rawColumns)
{
return new InputRowListPlusRawValues(inputRows, Preconditions.checkNotNull(rawColumns, "rawColumns"), null);
}

public static InputRowListPlusRawValues of(@Nullable Map<String, Object> rawColumns, ParseException parseException)
{
return new InputRowListPlusRawValues(
null,
rawColumns,
Preconditions.checkNotNull(parseException, "parseException")
);
}

private InputRowListPlusRawValues(
@Nullable List<InputRow> inputRows,
@Nullable Map<String, Object> rawValues,
@Nullable ParseException parseException
)
{
this.inputRows = inputRows;
this.rawValues = rawValues;
this.parseException = parseException;
}

@Nullable
public List<InputRow> getInputRows()
{
return inputRows;
}

@Nullable
public Map<String, Object> getRawValues()
{
return rawValues;
}

@Nullable
public ParseException getParseException()
{
return parseException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.data.input.impl.HttpInputSource;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.guice.annotations.UnstableApi;

Expand All @@ -48,7 +49,8 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@Type(name = "local", value = LocalInputSource.class),
@Type(name = "http", value = HttpInputSource.class)
@Type(name = "http", value = HttpInputSource.class),
@Type(name = "inline", value = InlineInputSource.class)
})
public interface InputSource
{
Expand Down Expand Up @@ -76,6 +78,6 @@ public interface InputSource
InputSourceReader reader(
InputRowSchema inputRowSchema,
@Nullable InputFormat inputFormat,
@Nullable File temporaryDirectory
File temporaryDirectory
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ public interface InputSourceReader
{
CloseableIterator<InputRow> read() throws IOException;

CloseableIterator<InputRowListPlusJson> sample() throws IOException;
CloseableIterator<InputRowListPlusRawValues> sample() throws IOException;
}
Loading

0 comments on commit ac6d703

Please sign in to comment.