Skip to content

Commit

Permalink
Issue datacleaner#224: Added API changes needed for Output Data Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
kaspersorensen committed May 8, 2015
1 parent 496f400 commit 7d7843c
Show file tree
Hide file tree
Showing 13 changed files with 564 additions and 15 deletions.
68 changes: 68 additions & 0 deletions api/src/main/java/org/datacleaner/api/HasOutputDataStreams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* DataCleaner (community edition)
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.api;

import org.apache.metamodel.query.Query;

/**
* Interface for components that produce data streams as an output of their
* work.
*
* Each output data stream has a {@link OutputDataStream} that describe the
* metadata and structure of the output data stream.
*
* For each output data stream that is relevant (consumed by one or more
* components) the
* {@link #initializeOutputDataStream(OutputDataStream, Query, OutputRowCollector)}
* method is invoked at initialization time of this component.
*/
public interface HasOutputDataStreams {

/**
* Gets the {@link OutputDataStream}s that this component can produce.
*
* @return
*/
public OutputDataStream[] getOutputDataStreams();

/**
* Method invoked for each {@link OutputDataStream} that is consumed. The
* method is invoked after validation time (see {@link Validate} ) and
* before initialization time (see {@link Initialize}) of the component. The
* method passes on an {@link OutputRowCollector} which makes it possible
* for this component to post records into the output data stream.
*
* If a particular {@link OutputDataStream} is NOT consumed by any following
* components then this method will not be called.
*
* @param outputDataStream
* @param query
* the query posted towards the {@link OutputDataStream}. In most
* cases this will be a plain "SELECT * FROM table" query, but if
* {@link OutputDataStream#getPerformanceCharacteristics()}
* indicates that query optimization is possible, then the query
* may be adapted.
* @param outputRowCollector
* an {@link OutputRowCollector} which the component should use
* to post records into the output stream.
*/
public void initializeOutputDataStream(OutputDataStream outputDataStream, Query query,
OutputRowCollector outputRowCollector);
}
61 changes: 61 additions & 0 deletions api/src/main/java/org/datacleaner/api/OutputDataStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* DataCleaner (community edition)
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.api;

import java.io.Serializable;

import org.apache.metamodel.query.Query;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.HasName;
import org.datacleaner.connection.PerformanceCharacteristics;

/**
* Interface that describes the metadata of an output data stream.
*
* See {@link HasOutputDataStreams} for details on how the metadata relates to
* actual data.
*/
public interface OutputDataStream extends HasName, Serializable {

/**
* Gets the name of the output data stream, as presented to the user and
* referenced to in analysis job files etc.
*/
@Override
public String getName();

/**
* Gets the logical {@link Table} objects that represent the format of the
* data that will be made available by the {@link HasOutputDataStreams}
*
* @return
*/
public Table getTable();

/**
* Gets performance characteristics of the output data stream. This may
* influence the {@link Query} posted to consume the data. See
* {@link HasOutputDataStreams#initializeOutputDataStream(OutputDataStream, Query, OutputRowCollector)}
* for details on usage.
*
* @return
*/
public PerformanceCharacteristics getPerformanceCharacteristics();
}
26 changes: 18 additions & 8 deletions api/src/main/java/org/datacleaner/api/OutputRowCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import javax.inject.Inject;

import org.apache.metamodel.data.Row;

/**
* An {@link OutputRowCollector} is a consumer of output rows from
* {@link Transformer}s.
Expand All @@ -40,12 +42,20 @@
*/
public interface OutputRowCollector {

/**
* Puts transformed values into an output row.
*
* @param values
* an array of output values, equivalent to the return type of
* {@link Transformer#transform(org.datacleaner.data.InputRow)}
*/
public void putValues(Object... values);
/**
* Puts transformed values into the output stream.
*
* @param values
* an array of output values, equivalent to the return type of
* {@link Transformer#transform(org.datacleaner.data.InputRow)}
*/
public void putValues(Object... values);

/**
* Puts a row (containing values) into the output stream.
*
* @param row
* a row containing values to put into the output stream.
*/
public void putRow(Row row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
*/
package org.datacleaner.connection;

import java.io.Serializable;

/**
* Represents the performance characteristics of a {@link Datastore}.
* Performance characteristics can be used to optimize the execution plan of a
* job or interaction with the datastore.
*/
public interface PerformanceCharacteristics {
public interface PerformanceCharacteristics extends Serializable {

/**
* If the datastore has it's own (native) query engine, it is often best to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
*/
public final class PerformanceCharacteristicsImpl implements PerformanceCharacteristics {

private static final long serialVersionUID = 1L;

private final boolean _queryOptimizationPreferred;
private final boolean _naturalRecordOrderConsistent;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* DataCleaner (community edition)
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.job.output;

import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.Table;
import org.datacleaner.api.OutputDataStream;

/**
* Builder object for easily creating {@link OutputDataStream} objects.
*/
public interface OutputDataStreamBuilder {

public OutputDataStream toOutputDataStream();

public OutputDataStreamBuilder likeTable(Table table);

public OutputDataStreamBuilder withColumn(String name, ColumnType columnType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* DataCleaner (community edition)
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.job.output;

import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Table;
import org.datacleaner.api.OutputDataStream;

final class OutputDataStreamBuilderImpl implements OutputDataStreamBuilder {

private final String _name;
private final MutableTable _table;

public OutputDataStreamBuilderImpl(String name) {
_name = name;
_table = new MutableTable(name);
}

@Override
public OutputDataStream toOutputDataStream() {
if (_table.getColumnCount() == 0) {
throw new IllegalStateException("No columns defined in OutputDataStream '" + _name + "'");
}
return new PushOutputDataStream(_name, _table);
}

@Override
public OutputDataStreamBuilder likeTable(Table table) {
final Column[] existingColumns = _table.getColumns();
for (Column column : existingColumns) {
_table.removeColumn(column);
}
final Column[] newColumns = table.getColumns();
for (Column column : newColumns) {
withColumn(column.getName(), column.getType());
}
return this;
}

@Override
public OutputDataStreamBuilder withColumn(String name, ColumnType type) {
final int columnNumber = _table.getColumnCount() + 1;
final MutableColumn column = new MutableColumn(name, type, _table, columnNumber, true);
_table.addColumn(column);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* DataCleaner (community edition)
* Copyright (C) 2014 Neopost - Customer Information Management
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.job.output;

import org.apache.metamodel.query.Query;
import org.datacleaner.api.HasOutputDataStreams;
import org.datacleaner.api.OutputDataStream;

/**
* Convenience/utility methods related to handling and building
* {@link OutputDataStream} objects.
*/
public class OutputDataStreams {

private OutputDataStreams() {
// prevent instantiation
}

/**
* Creates an {@link OutputDataStreamBuilder} for push-based dispatching of
* records (meaning that the {@link HasOutputDataStreams} component will
* push records to the output data stream without supporting {@link Query}
* optimization).
*
* @param name
* @return
*/
public static OutputDataStreamBuilder pushDataStream(String name) {
return new OutputDataStreamBuilderImpl(name);
}
}
Loading

0 comments on commit 7d7843c

Please sign in to comment.