Skip to content

Commit

Permalink
Fixes datacleaner#583 by introducing an identifiable "RowProcessingSt…
Browse files Browse the repository at this point in the history
…ream" to hold

Table and AnalysisJob
  • Loading branch information
kaspersorensen committed Aug 27, 2015
1 parent 6751e8d commit 58556b2
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 148 deletions.
4 changes: 2 additions & 2 deletions api/src/main/java/org/datacleaner/api/OutputDataStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public interface OutputDataStream extends HasName, Serializable {
public String getName();

/**
* Gets the logical {@link Table} objects that represent the format of the
* data that will be made available by the {@link HasOutputDataStreams}
* Gets the logical (or physical) {@link Table} objects that represent the
* format of the data that will be made available by the {@link DataStream}
*
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public AnalysisJob getAnalysisJob() {

@Override
public ComponentMetrics getComponentMetrics(ComponentJob componentJob) {
Table table = getRowProcessingTable(componentJob);
RowProcessingMetrics rowProcessingMetrics = getRowProcessingMetrics(table);
final Table table = getRowProcessingTable(componentJob);
final RowProcessingMetrics rowProcessingMetrics = getRowProcessingMetrics(table);
return new ComponentMetricsImpl(rowProcessingMetrics, componentJob);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,14 @@ private List<RowProcessingConsumer> extractConsumers(AnalysisJob analysisJob,

final RowProcessingPublisher publisher;
if (rowConsumeConfiguration.table != null) {
publisher = rowProcessingPublishers.getRowProcessingPublisher(rowConsumeConfiguration.table);
if (publisher == null) {
@SuppressWarnings("deprecation")
RowProcessingPublisher tablePublisher = rowProcessingPublishers
.getRowProcessingPublisher(rowConsumeConfiguration.table);
if (tablePublisher == null) {
throw new IllegalArgumentException("Job does not consume records from table: "
+ rowConsumeConfiguration.table);
}
publisher = tablePublisher;
} else {
final Collection<RowProcessingPublisher> publisherCollection = rowProcessingPublishers
.getRowProcessingPublishers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.datacleaner.job.concurrent.ForkTaskListener;
import org.datacleaner.job.concurrent.JoinTaskListener;
import org.datacleaner.job.concurrent.RunNextTaskTaskListener;
import org.datacleaner.job.concurrent.SingleThreadedTaskRunner;
import org.datacleaner.job.concurrent.TaskListener;
import org.datacleaner.job.concurrent.TaskRunnable;
import org.datacleaner.job.concurrent.TaskRunner;
Expand All @@ -82,27 +81,25 @@ public final class RowProcessingPublisher {

private final RowProcessingPublisher _parentPublisher;
private final RowProcessingPublishers _publishers;
private final RowProcessingStream _stream;
private final TaskRunner _taskRunner;
private final Table _table;
private final AnalysisJob _analysisJob;
private final Set<Column> _physicalColumns = new LinkedHashSet<Column>();
private final List<RowProcessingConsumer> _consumers = new ArrayList<RowProcessingConsumer>();
private final LazyRef<RowProcessingQueryOptimizer> _queryOptimizerRef;
private final AtomicBoolean _successful = new AtomicBoolean(true);
private final Map<RowProcessingConsumer, ReferenceDataActivationManager> _referenceDataActivationManagers;
private final SourceColumnFinder _sourceColumnFinder;

/**
* Constructor to use for creating a {@link RowProcessingPublisher} which
* feeds data from a source datastore.
*
* @param publishers
* @param analysisJob
* @param table
* @param stream
* @param taskRunner
*/
public RowProcessingPublisher(RowProcessingPublishers publishers, AnalysisJob analysisJob, Table table,
TaskRunner taskRunner) {
this(publishers, null, analysisJob, table, taskRunner);
public RowProcessingPublisher(RowProcessingPublishers publishers, RowProcessingStream stream, TaskRunner taskRunner) {
this(publishers, null, stream, taskRunner);
}

/**
Expand All @@ -113,41 +110,38 @@ public RowProcessingPublisher(RowProcessingPublishers publishers, AnalysisJob an
* {@link RowProcessingPublisher} itself.
*
* @param parentPublisher
* @param analysisJob
* @param table
* @param stream
*/
public RowProcessingPublisher(RowProcessingPublisher parentPublisher, AnalysisJob analysisJob, Table table) {
this(parentPublisher._publishers, parentPublisher, analysisJob, table, new SingleThreadedTaskRunner());
public RowProcessingPublisher(RowProcessingPublisher parentPublisher, RowProcessingStream stream) {
this(parentPublisher._publishers, parentPublisher, stream, parentPublisher._taskRunner);
}

private RowProcessingPublisher(RowProcessingPublishers publishers, RowProcessingPublisher parentPublisher,
AnalysisJob analysisJob, Table table, TaskRunner taskRunner) {
RowProcessingStream stream, TaskRunner taskRunner) {
if (publishers == null) {
throw new IllegalArgumentException("RowProcessingPublishers cannot be null");
}
if (analysisJob == null) {
throw new IllegalArgumentException("AnalysisJob cannot be null");
if (stream == null) {
throw new IllegalArgumentException("RowProcessingStream cannot be null");
}
if (table == null) {
throw new IllegalArgumentException("Table cannot be null");
}
_analysisJob = analysisJob;
_parentPublisher = parentPublisher;
_publishers = publishers;
_table = table;
_stream = stream;
_taskRunner = taskRunner;
_sourceColumnFinder = new SourceColumnFinder();
_sourceColumnFinder.addSources(stream.getAnalysisJob());

_queryOptimizerRef = createQueryOptimizerRef();
_referenceDataActivationManagers = new IdentityHashMap<>();

final boolean aggressiveOptimizeSelectClause = SystemProperties.getBoolean(
SystemProperties.QUERY_SELECTCLAUSE_OPTIMIZE, false);
if (!aggressiveOptimizeSelectClause) {
final Collection<InputColumn<?>> sourceColumns = analysisJob.getSourceColumns();
final Collection<InputColumn<?>> sourceColumns = stream.getAnalysisJob().getSourceColumns();
final List<Column> columns = new ArrayList<Column>();
for (InputColumn<?> sourceColumn : sourceColumns) {
final Column column = sourceColumn.getPhysicalColumn();
if (column != null && table.equals(column.getTable())) {
if (column != null && _stream.getTable().equals(column.getTable())) {
columns.add(column);
}
}
Expand All @@ -168,8 +162,12 @@ public RowProcessingMetrics getRowProcessingMetrics() {
return metrics;
}

/**
 * Gets the {@link RowProcessingStream} that this publisher consumes and
 * processes. The stream carries both the {@link Table} and the
 * {@link AnalysisJob} (see {@link #getTable()} and {@link #getAnalysisJob()},
 * which delegate to it).
 *
 * @return the stream of records being processed, never null (enforced by the
 *         constructor)
 */
public RowProcessingStream getStream() {
    return _stream;
}

public Table getTable() {
return _table;
return _stream.getTable();
}

/**
Expand All @@ -183,13 +181,13 @@ public Table getTable() {
* values will be retrieved.
*/
public void addPrimaryKeysIfSourced() {
final Column[] primaryKeyColumns = _table.getPrimaryKeys();
final Column[] primaryKeyColumns = getTable().getPrimaryKeys();
if (primaryKeyColumns == null || primaryKeyColumns.length == 0) {
logger.info("No primary keys defined for table {}, not pre-selecting primary keys", _table.getName());
logger.info("No primary keys defined for table {}, not pre-selecting primary keys", getTable().getName());
return;
}

final Collection<InputColumn<?>> sourceInputColumns = _analysisJob.getSourceColumns();
final Collection<InputColumn<?>> sourceInputColumns = getAnalysisJob().getSourceColumns();
final List<Column> sourceColumns = CollectionUtils.map(sourceInputColumns, new Func<InputColumn<?>, Column>() {
@Override
public Column eval(InputColumn<?> inputColumn) {
Expand All @@ -211,12 +209,12 @@ private LazyRef<RowProcessingQueryOptimizer> createQueryOptimizerRef() {
return new LazyRef<RowProcessingQueryOptimizer>() {
@Override
protected RowProcessingQueryOptimizer fetch() {
final Datastore datastore = _analysisJob.getDatastore();
final Datastore datastore = getAnalysisJob().getDatastore();
try (final DatastoreConnection con = datastore.openConnection()) {
final DataContext dataContext = con.getDataContext();

final Column[] columnArray = _physicalColumns.toArray(new Column[_physicalColumns.size()]);
final Query baseQuery = dataContext.query().from(_table).select(columnArray).toQuery();
final Query baseQuery = dataContext.query().from(getTable()).select(columnArray).toQuery();

logger.debug("Base query for row processing: {}", baseQuery);

Expand Down Expand Up @@ -260,9 +258,9 @@ public void onAllConsumersRegistered() {

public void addPhysicalColumns(Column... columns) {
for (Column column : columns) {
if (!_table.equals(column.getTable())) {
if (!getTable().equals(column.getTable())) {
throw new IllegalArgumentException("Column does not pertain to the correct table. Expected table: "
+ _table + ", actual table: " + column.getTable());
+ getTable() + ", actual table: " + column.getTable());
}
_physicalColumns.add(column);
}
Expand Down Expand Up @@ -305,7 +303,7 @@ public void processRows(RowProcessingMetrics rowProcessingMetrics) {
return;
}

analysisListener.rowProcessingSuccess(_analysisJob, rowProcessingMetrics);
analysisListener.rowProcessingSuccess(getAnalysisJob(), rowProcessingMetrics);
}

private boolean awaitProcessing(AnalysisListener listener) {
Expand All @@ -317,7 +315,7 @@ private boolean awaitProcessing(AnalysisListener listener) {
activeOutputDataStream.await();
} catch (InterruptedException e) {
logger.error("Unexpected error awaiting output data stream", e);
listener.errorUnknown(_analysisJob, e);
listener.errorUnknown(getAnalysisJob(), e);
return false;
}
}
Expand All @@ -338,10 +336,10 @@ private boolean processRowsFromQuery(AnalysisListener analysisListener, RowProce

final ConsumeRowHandler consumeRowHandler = createConsumeRowHandler();

final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(_analysisJob, analysisListener,
final RowConsumerTaskListener taskListener = new RowConsumerTaskListener(getAnalysisJob(), analysisListener,
_taskRunner);

final Datastore datastore = _analysisJob.getDatastore();
final Datastore datastore = getAnalysisJob().getDatastore();

try (final DatastoreConnection con = datastore.openConnection()) {
final DataContext dataContext = con.getDataContext();
Expand Down Expand Up @@ -405,7 +403,7 @@ protected ConsumeRowHandler createConsumeRowHandler() {
final RowProcessingMetrics rowProcessingMetrics = getRowProcessingMetrics();
final ComponentMetrics metrics = rowProcessingMetrics.getAnalysisJobMetrics().getComponentMetrics(
componentJob);
analysisListener.componentBegin(_analysisJob, componentJob, metrics);
analysisListener.componentBegin(getAnalysisJob(), componentJob, metrics);

if (consumer instanceof TransformerConsumer) {
((TransformerConsumer) consumer).setRowIdGenerator(idGenerator);
Expand Down Expand Up @@ -513,8 +511,8 @@ public void runRowProcessing(Queue<JobAndResult> resultQueue, TaskListener finis
final TaskListener initFinishedListener = new RunNextTaskTaskListener(_taskRunner, runTask,
runCompletionListener);

final TaskListener consumerInitFinishedListener = new RunNextTaskTaskListener(_taskRunner, new FireRowProcessingBeginTask(this, rowProcessingMetrics), initFinishedListener);

final TaskListener consumerInitFinishedListener = new RunNextTaskTaskListener(_taskRunner,
new FireRowProcessingBeginTask(this, rowProcessingMetrics), initFinishedListener);

// kick off the initialization
initializeConsumers(consumerInitFinishedListener);
Expand Down Expand Up @@ -558,15 +556,17 @@ private Task createCollectResultTask(RowProcessingConsumer consumer, Queue<JobAn
if (component instanceof HasAnalyzerResult) {
final HasAnalyzerResult<?> hasAnalyzerResult = (HasAnalyzerResult<?>) component;
final AnalysisListener analysisListener = _publishers.getAnalysisListener();
return new CollectResultsTask(hasAnalyzerResult, _analysisJob, consumer.getComponentJob(), resultQueue,
return new CollectResultsTask(hasAnalyzerResult, getAnalysisJob(), consumer.getComponentJob(), resultQueue,
analysisListener);
}
return null;
}

private TaskRunnable createCloseTask(RowProcessingConsumer consumer, TaskListener closeTaskListener) {
final LifeCycleHelper lifeCycleHelper = getConsumerSpecificLifeCycleHelper(consumer);
return new TaskRunnable(null, new CloseTaskListener(lifeCycleHelper, consumer, _successful, closeTaskListener));
final CloseTaskListener taskListener = new CloseTaskListener(lifeCycleHelper, consumer, _successful,
closeTaskListener, _publishers.getAnalysisListener(), getAnalysisJob());
return new TaskRunnable(null, taskListener);
}

private TaskRunnable createInitTask(RowProcessingConsumer consumer, TaskListener listener) {
Expand All @@ -582,7 +582,7 @@ public LifeCycleHelper getConsumerSpecificLifeCycleHelper(RowProcessingConsumer
final ReferenceDataActivationManager referenceDataActivationManager = getConsumerSpecificReferenceDataActivationManager(
consumer, outerLifeCycleHelper);
final ContextAwareInjectionManager injectionManager = new ContextAwareInjectionManager(outerInjectionManager,
_analysisJob, consumer.getComponentJob(), _publishers.getAnalysisListener());
getAnalysisJob(), consumer.getComponentJob(), _publishers.getAnalysisListener());

final LifeCycleHelper lifeCycleHelper = new LifeCycleHelper(injectionManager, referenceDataActivationManager,
includeNonDistributedTasks);
Expand All @@ -601,7 +601,8 @@ private ReferenceDataActivationManager getConsumerSpecificReferenceDataActivatio

@Override
public String toString() {
return "RowProcessingPublisher[table=" + _table.getQualifiedLabel() + ", consumers=" + _consumers.size() + "]";
return "RowProcessingPublisher[table=" + getTable().getQualifiedLabel() + ", consumers=" + _consumers.size()
+ "]";
}

public AnalyzerJob[] getAnalyzerJobs() {
Expand All @@ -616,21 +617,26 @@ public AnalyzerJob[] getAnalyzerJobs() {
}

public AnalysisJob getAnalysisJob() {
return _analysisJob;
return _stream.getAnalysisJob();
}

public Datastore getDatastore() {
return _analysisJob.getDatastore();
return getAnalysisJob().getDatastore();
}

/**
 * Determines whether a particular {@link ComponentJob} belongs to the
 * {@link AnalysisJob} handled by this publisher.
 *
 * @param componentJob
 *            the component job to look for
 * @return true if the component job is among all the components of this
 *         publisher's analysis job
 */
public boolean containsComponentJob(ComponentJob componentJob) {
    final Collection<ComponentJob> components = RowProcessingPublishers.getAllComponents(getAnalysisJob());
    return components.contains(componentJob);
}

/**
 * Gets the {@link AnalysisListener} of the execution, as held by the owning
 * {@link RowProcessingPublishers} instance.
 *
 * @return the listener that receives execution lifecycle events
 */
public AnalysisListener getAnalysisListener() {
    return _publishers.getAnalysisListener();
}

public SourceColumnFinder getSourceColumnFinder() {
return _publishers.getSourceColumnFinder();
return _sourceColumnFinder;
}

public ComponentJob[] getResultProducers() {
final List<ComponentJob> resultProducers = new ArrayList<ComponentJob>();
for (RowProcessingConsumer consumer : _consumers) {
Expand Down
Loading

0 comments on commit 58556b2

Please sign in to comment.