Skip to content

Commit

Permalink
Issue datacleaner#224: Added engine and job-builder support for Outpu…
Browse files Browse the repository at this point in the history
…tDataStreams
  • Loading branch information
kaspersorensen committed May 8, 2015
1 parent f7ea335 commit 6846892
Show file tree
Hide file tree
Showing 55 changed files with 1,665 additions and 891 deletions.
6 changes: 3 additions & 3 deletions api/src/main/java/org/datacleaner/job/ComponentJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
import java.io.Serializable;
import java.util.Map;

import org.datacleaner.descriptors.ComponentDescriptor;
import org.apache.metamodel.util.HasName;
import org.datacleaner.descriptors.ComponentDescriptor;

/**
* Super-interface for all job entries in an Analysis. A {@link ComponentJob}
* represents a component's configuration in a {@link AnalysisJob}.
*/
public interface ComponentJob extends HasName, InputColumnSinkJob, HasComponentRequirement, HasComponentConfiguration,
Serializable {
public interface ComponentJob extends HasName, InputColumnSinkJob, OutputDataStreamJobSource, HasComponentRequirement,
HasComponentConfiguration, Serializable {

/**
* Gets the descriptor of this component type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,19 @@
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.lifecycle;
package org.datacleaner.job;

import org.datacleaner.descriptors.ComponentDescriptor;
import java.io.Serializable;

import org.datacleaner.api.OutputDataStream;

/**
* Represents a callback method that will execute a step in the lifecycle of a
* component. A step might be to call any initializing methods, inject
* properties or to close the component.
*
*
*
* @param <C>
* the component type
* @param <D>
* the descriptor type
* Represents an entry to run an {@link AnalysisJob} for a particular
* {@link OutputDataStream}.
*/
public interface LifeCycleCallback<C, D extends ComponentDescriptor<?>> {
public interface OutputDataStreamJob extends Serializable {

public OutputDataStream getOutputDataStream();

public void onEvent(C component, D descriptor);
public AnalysisJob getJob();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,12 @@
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.datacleaner.job.tasks;

import org.datacleaner.lifecycle.LifeCycleHelper;
package org.datacleaner.job;

/**
* Task that invokes initializing methods for reference data where this is
* nescesary.
*
*
* Represents a source of {@link OutputDataStreamJob}s.
*/
public class InitializeReferenceDataTask implements Task {

private final LifeCycleHelper _lifeCycleHelper;

public InitializeReferenceDataTask(LifeCycleHelper lifeCycleHelper) {
_lifeCycleHelper = lifeCycleHelper;
}

@Override
public void execute() throws Exception {
_lifeCycleHelper.initializeReferenceData();
}
public interface OutputDataStreamJobSource {

public OutputDataStreamJob[] getOutputDataStreamJobs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@

import org.datacleaner.api.AnalyzerResult;
import org.datacleaner.api.InputRow;
import org.datacleaner.configuration.AnalyzerBeansConfiguration;
import org.datacleaner.configuration.AnalyzerBeansConfigurationImpl;
import org.datacleaner.configuration.DataCleanerConfigurationImpl;
import org.datacleaner.connection.Datastore;
import org.datacleaner.connection.DatastoreCatalog;
import org.datacleaner.connection.DatastoreCatalogImpl;
Expand All @@ -46,7 +45,7 @@ public class DataStructuresIntegrationTest extends TestCase {
public void testBuildAndExtractFromStructures() throws Throwable {
Datastore datastore = TestHelper.createSampleDatabaseDatastore("orderdb");
DatastoreCatalog datastoreCatalog = new DatastoreCatalogImpl(datastore);
AnalyzerBeansConfiguration configuration = new AnalyzerBeansConfigurationImpl().replace(datastoreCatalog);
DataCleanerConfigurationImpl configuration = new DataCleanerConfigurationImpl().withDatastoreCatalog(datastoreCatalog);

AnalysisJobBuilder ajb = new AnalysisJobBuilder(configuration);
ajb.setDatastore("orderdb");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@
import java.util.Arrays;
import java.util.List;

import junit.framework.TestCase;

import org.datacleaner.api.InputColumn;
import org.datacleaner.api.OutputRowCollector;
import org.datacleaner.data.MockInputColumn;
import org.datacleaner.data.MockInputRow;
import org.datacleaner.job.AbstractOutputRowCollector;

import junit.framework.TestCase;

public class JavaScriptAdvancedTransformerTest extends TestCase {

public void testCompileScript() throws Exception {
Expand All @@ -47,7 +47,7 @@ public void putValues(Object... arg0) {

JavaScriptAdvancedTransformer transformer = new JavaScriptAdvancedTransformer();
transformer.rowCollector = collector;
transformer.columns = new InputColumn[] {col1, col2};
transformer.columns = new InputColumn[] { col1, col2 };
transformer.init();

assertNull(transformer.transform(new MockInputRow().put(col1, "foo").put(col2, "bar")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,34 @@
import javax.inject.Inject;
import javax.inject.Named;

import org.apache.metamodel.query.Query;
import org.apache.metamodel.schema.ColumnType;
import org.datacleaner.api.Analyzer;
import org.datacleaner.api.ColumnProperty;
import org.datacleaner.api.Concurrent;
import org.datacleaner.api.Configured;
import org.datacleaner.api.Description;
import org.datacleaner.api.HasOutputDataStreams;
import org.datacleaner.api.InputColumn;
import org.datacleaner.api.InputRow;
import org.datacleaner.api.OutputDataStream;
import org.datacleaner.api.OutputRowCollector;
import org.datacleaner.api.Provided;
import org.datacleaner.job.output.OutputDataStreams;
import org.datacleaner.storage.CollectionFactory;
import org.datacleaner.storage.CollectionFactoryImpl;
import org.datacleaner.storage.InMemoryRowAnnotationFactory;
import org.datacleaner.storage.InMemoryStorageProvider;
import org.datacleaner.storage.RowAnnotationFactory;
import org.datacleaner.util.LabelUtils;
import org.datacleaner.util.NullTolerableComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("Value distribution")
@Description("Gets the distributions of values that occur in a dataset.\nOften used as an initial way to see if a lot of repeated values are to be expected, if nulls occur and if a few un-repeated values add exceptions to the typical usage-pattern.")
@Concurrent(true)
public class ValueDistributionAnalyzer implements Analyzer<ValueDistributionAnalyzerResult> {
public class ValueDistributionAnalyzer implements Analyzer<ValueDistributionAnalyzerResult>, HasOutputDataStreams {

public static final String PROPERTY_COLUMN = "Column";
public static final String PROPERTY_GROUP_COLUMN = "Group column";
Expand Down Expand Up @@ -93,6 +100,9 @@ public class ValueDistributionAnalyzer implements Analyzer<ValueDistributionAnal
RowAnnotationFactory _annotationFactory;

private final Map<String, ValueDistributionGroup> _valueDistributionGroups;
private final OutputDataStream outputDataStream = OutputDataStreams.pushDataStream("Distribution")
.withColumn("Value", ColumnType.STRING).withColumn(LabelUtils.COUNT_LABEL, ColumnType.INTEGER)
.toOutputDataStream();

/**
* Constructor used for testing and ad-hoc purposes
Expand Down Expand Up @@ -136,6 +146,17 @@ public ValueDistributionAnalyzer() {
NullTolerableComparator.get(String.class));
}

@Override
public OutputDataStream[] getOutputDataStreams() {
return new OutputDataStream[] { outputDataStream };
}

@Override
public void initializeOutputDataStream(OutputDataStream outputDataStream, Query query,
OutputRowCollector outputRowCollector) {
//TODO
}

@Override
public void run(InputRow row, int distinctCount) {
final Object value = row.getValue(_column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ public void run(UpdateCallback callback) {
{
final DatastoreCatalog datastoreCatalog = new DatastoreCatalogImpl(datastoreIn);

final DataCleanerConfiguration configuration = new DataCleanerConfigurationImpl().withDatastoreCatalog(
datastoreCatalog).withEnvironment(
new DataCleanerEnvironmentImpl().withTaskRunner(new MultiThreadedTaskRunner(4)));
DataCleanerConfiguration configuration = new DataCleanerConfigurationImpl().withEnvironment(
new DataCleanerEnvironmentImpl().withTaskRunner(new MultiThreadedTaskRunner(4)))
.withDatastoreCatalog(datastoreCatalog);

final AnalysisJobBuilder ajb = new AnalysisJobBuilder(configuration);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ public void testCreateProcessOrderedConsumerListWithMergedOutcomes() throws Exce
tjb1.setName("tjb1");

// 3: merge either the null or the trimmed value
TransformerComponentBuilder<CoalesceMultipleFieldsTransformer> coalesce = ajb.addTransformer(CoalesceMultipleFieldsTransformer.class);
TransformerComponentBuilder<CoalesceMultipleFieldsTransformer> coalesce = ajb
.addTransformer(CoalesceMultipleFieldsTransformer.class);
CoalesceUnit unit1 = new CoalesceUnit(tjb1.getOutputColumns().get(0));
CoalesceUnit unit2 = new CoalesceUnit(inputColumn);
coalesce.getComponentInstance().configureUsingCoalesceUnits(unit1, unit2);

MutableInputColumn<?> mergedColumn1 = coalesce.getOutputColumns().get(0);

// 4: add another filter (depends on merged output)
Expand All @@ -108,13 +109,12 @@ public void testCreateProcessOrderedConsumerListWithMergedOutcomes() throws Exce
assertEquals("ImmutableFilterJob[name=fjb1,filter=Mock filter]", consumers.get(0).getComponentJob().toString());
assertEquals("ImmutableTransformerJob[name=tjb1,transformer=Transformer mock]", consumers.get(1)
.getComponentJob().toString());
assertEquals(
"ImmutableTransformerJob[name=null,transformer=Fuse / Coalesce fields]",
consumers.get(2).getComponentJob().toString());
assertEquals("ImmutableTransformerJob[name=null,transformer=Fuse / Coalesce fields]", consumers.get(2)
.getComponentJob().toString());
assertEquals("ImmutableFilterJob[name=fjb2,filter=Mock filter]", consumers.get(3).getComponentJob().toString());
assertEquals("ImmutableAnalyzerJob[name=null,analyzer=String analyzer]", consumers.get(4).getComponentJob()
.toString());

ajb.close();
}

Expand Down Expand Up @@ -167,7 +167,7 @@ public void testCreateProcessOrderedConsumerListWithFilterDependencies() throws
assertEquals("ImmutableTransformerJob[name=tjb2,transformer=Transformer mock]", consumers.get(2)
.getComponentJob().toString());
assertEquals("ImmutableFilterJob[name=fjb2,filter=Mock filter]", consumers.get(3).getComponentJob().toString());

ajb.close();
}

Expand All @@ -179,8 +179,8 @@ public void testCreateProcessOrderedConsumerListChainedTransformers() throws Exc
ajb.getSourceColumns().get(0));
TransformerComponentBuilder<TransformerMock> tjb2 = ajb.addTransformer(TransformerMock.class).addInputColumn(
tjb1.getOutputColumns().get(0));
TransformerComponentBuilder<ConvertToStringTransformer> tjb3 = ajb.addTransformer(ConvertToStringTransformer.class)
.addInputColumn(tjb2.getOutputColumns().get(0));
TransformerComponentBuilder<ConvertToStringTransformer> tjb3 = ajb.addTransformer(
ConvertToStringTransformer.class).addInputColumn(tjb2.getOutputColumns().get(0));

ajb.addAnalyzer(StringAnalyzer.class).addInputColumn(ajb.getSourceColumns().get(0));
ajb.addAnalyzer(StringAnalyzer.class).addInputColumn(tjb3.getOutputColumns().get(0));
Expand Down Expand Up @@ -225,28 +225,29 @@ public void testCreateProcessOrderedConsumerListChainedTransformers() throws Exc

assertTrue(analyzerJob1found);
assertEquals(4, jobDependenciesFound);

ajb.close();
}

private List<RowProcessingConsumer> getConsumers(AnalysisJob analysisJob) {
List<RowProcessingConsumer> consumers = new ArrayList<RowProcessingConsumer>();
RowProcessingPublishers publishers = new RowProcessingPublishers(analysisJob,
null, null, null, null);
RowProcessingPublishers publishers = new RowProcessingPublishers(analysisJob, null, null, null, null);
RowProcessingPublisher publisher = publishers.getRowProcessingPublisher(analysisJob.getSourceColumns().get(0)
.getPhysicalColumn().getTable());

for (AnalyzerJob analyzerJob : analysisJob.getAnalyzerJobs()) {
RowProcessingConsumer consumer = new AnalyzerConsumer(analyzerJob.getDescriptor().newInstance(),
analyzerJob, analyzerJob.getInput(), publishers);
analyzerJob, analyzerJob.getInput(), publisher);
consumers.add(consumer);
}
for (TransformerJob transformerJob : analysisJob.getTransformerJobs()) {
RowProcessingConsumer consumer = new TransformerConsumer(transformerJob.getDescriptor().newInstance(),
transformerJob, transformerJob.getInput(), publishers);
transformerJob, transformerJob.getInput(), publisher);
consumers.add(consumer);
}
for (FilterJob filterJob : analysisJob.getFilterJobs()) {
FilterConsumer consumer = new FilterConsumer(filterJob.getDescriptor().newInstance(), filterJob,
filterJob.getInput(), publishers);
filterJob.getInput(), publisher);
consumers.add(consumer);
}

Expand Down
Loading

0 comments on commit 6846892

Please sign in to comment.