[FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no longer a valid reference

Remove the word "Pact" from class and file names in Apache Flink.
Pact was the name used in the Stratosphere days for the concept of distributed data sets (similar to Flink's DataSet). It dates from the time when Pact and Nephele were still separate concepts.

As part of the 0.10.0 release cleanup effort, let's remove the Pact names to avoid confusion.

The PR also contains small cleanups (sorry):
1. Small refactor of DataSinkTask and DataSourceTask to follow the Java 7 diamond convention when creating new collections, and removal of the redundant LOG.isDebugEnabled checks.
2. Simple cleanup to update MapValue and TypeInformation to the Java 7 diamond convention when creating new collections.
3. Combine several catch blocks that share the same handling into a single Java 7 multi-catch clause (a short illustrative sketch of these idioms follows below).
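
For reference, here is a minimal, self-contained sketch of the three Java 7 idioms applied by these cleanups. This is illustrative code only, not part of this PR or the Flink code base; the class name Java7ConventionsExample and the values are made up, and the note on dropping the LOG.isDebugEnabled guard assumes the simple SLF4J parameterized-logging case.

    import java.util.HashMap;
    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Java7ConventionsExample {

        private static final Logger LOG = LoggerFactory.getLogger(Java7ConventionsExample.class);

        public static void main(String[] args) {
            // Diamond operator: the compiler infers <String, Integer> from the
            // declaration, replacing the pre-Java-7 "new HashMap<String, Integer>()".
            Map<String, Integer> counts = new HashMap<>();
            counts.put("records", 42);

            // Multi-catch: one handler for several exception types that previously
            // required separate catch blocks with identical bodies.
            try {
                Object list = Class.forName("java.util.ArrayList").newInstance();
                LOG.debug("Instantiated {}", list.getClass().getName());
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new RuntimeException(e);
            }

            // Parameterized logging: SLF4J defers building the message string until
            // DEBUG is actually enabled, so wrapping a simple statement like this in
            // "if (LOG.isDebugEnabled()) { ... }" is redundant.
            LOG.debug("Current counts: {}", counts);
        }
    }

The multi-catch form matches the MapValue.read() change in the diff below, where two catch blocks with identical bodies are collapsed into one.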

Apologies for the extra changes in this PR, but I separated them into different commits for easier review.

Author: hsaputra <[email protected]>

Closes apache#1218 from hsaputra/remove_pact_name and squashes the following commits:

b3c55b4 [hsaputra] Rename RegularTask to BatchTask per review.
e278fac [hsaputra] Address review comments from chiwanpark (good catch).
9f92f33 [hsaputra] Remove Pact from the file names of the flink-runtime and flink-clients modules.
dbb2175 [hsaputra] Simple cleanup to update MapValue with Java7 generic for new collection. Remove unused imports in CollectionsDataTypeTest.
df2f553 [hsaputra] Use Java7 style of type resolution for new collection.
6403d44 [hsaputra] Remove the word Pact from the Javadoc for ChainedDriver.
0c562f4 [hsaputra] Small refactor on DataSinkTask and DataSourceTask classes to keep up with modern Java practice.
hsaputra committed Oct 6, 2015
1 parent e494c27 commit b08669a
Showing 69 changed files with 320 additions and 359 deletions.
@@ -33,12 +33,12 @@
 import org.apache.flink.client.program.PackagedProgram;


-public class PactJobJSONServlet extends HttpServlet {
+public class JobJSONServlet extends HttpServlet {

 	/** Serial UID for serialization interoperability. */
 	private static final long serialVersionUID = 558077298726449201L;

-	private static final Logger LOG = LoggerFactory.getLogger(PactJobJSONServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobJSONServlet.class);

 	// ------------------------------------------------------------------------

@@ -50,7 +50,7 @@ public class PactJobJSONServlet extends HttpServlet {

 	private final File jobStoreDirectory; // the directory in which the jobs are stored

-	public PactJobJSONServlet(File jobStoreDirectory) {
+	public JobJSONServlet(File jobStoreDirectory) {
 		this.jobStoreDirectory = jobStoreDirectory;
 	}
@@ -144,7 +144,7 @@ public WebInterfaceServer(String configDir, Configuration config, int port) thro
 		CliFrontend cli = new CliFrontend(configDir);
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan");
+		servletContext.addServlet(new ServletHolder(new JobJSONServlet(uploadDir)), "/pactPlan");
 		servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
 		servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
 		servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(cli, uploadDir, planDumpDir)), "/runJob");
@@ -119,7 +119,7 @@ public abstract class TypeInformation<T> implements Serializable {
 	 */
 	public List<TypeInformation<?>> getGenericParameters() {
 		// Return an empty list as the default implementation
-		return new LinkedList<TypeInformation<?>>();
+		return new LinkedList<>();
 	}

 	/**
18 changes: 8 additions & 10 deletions flink-core/src/main/java/org/apache/flink/types/MapValue.java
@@ -31,7 +31,7 @@

 /**
  * Generic map base type for PACT programs that implements the Value and Map interfaces.
- * PactMap encapsulates a Java HashMap object.
+ * The {@link MapValue} encapsulates a Java {@link HashMap} object.
  *
  * @see org.apache.flink.types.Value
  * @see java.util.Map
@@ -56,10 +56,10 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
 	 * Initializes the encapsulated map with an empty HashMap.
 	 */
 	public MapValue() {
-		this.keyClass = ReflectionUtil.<K> getTemplateType1(this.getClass());
-		this.valueClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
+		this.keyClass = ReflectionUtil.getTemplateType1(this.getClass());
+		this.valueClass = ReflectionUtil.getTemplateType2(this.getClass());

-		this.map = new HashMap<K, V>();
+		this.map = new HashMap<>();
 	}

 	/**
@@ -68,10 +68,10 @@ public MapValue() {
 	 * @param map Map holding all entries with which the new encapsulated map is filled.
 	 */
 	public MapValue(Map<K, V> map) {
-		this.keyClass = ReflectionUtil.<K> getTemplateType1(this.getClass());
-		this.valueClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
+		this.keyClass = ReflectionUtil.getTemplateType1(this.getClass());
+		this.valueClass = ReflectionUtil.getTemplateType2(this.getClass());

-		this.map = new HashMap<K, V>(map);
+		this.map = new HashMap<>(map);
 	}

 	@Override
@@ -87,9 +87,7 @@ public void read(final DataInputView in) throws IOException {
 				val.read(in);
 				this.map.put(key, val);
 			}
-		} catch (final InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (final IllegalAccessException e) {
+		} catch (final InstantiationException | IllegalAccessException e) {
 			throw new RuntimeException(e);
 		}
 	}
@@ -32,12 +32,6 @@

 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.Pair;
-import org.apache.flink.types.StringValue;
 import org.junit.Before;
 import org.junit.Test;

@@ -55,28 +55,28 @@
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.NoOpDriver;
-import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -829,7 +829,7 @@ private JobVertex createSingleInputVertex(SingleInputPlanNode node) throws Compi
 		} else {
 			// create task vertex
 			vertex = new JobVertex(taskName);
-			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);

 			config = new TaskConfig(vertex.getConfiguration());
 			config.setDriver(ds.getDriverClass());
@@ -854,7 +854,7 @@ private JobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerE
 		final DriverStrategy ds = node.getDriverStrategy();
 		final JobVertex vertex = new JobVertex(taskName);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);

 		// set user code
 		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
@@ -951,15 +951,15 @@ private JobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
 			}

 			// reset the vertex type to iteration head
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")");
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -1019,15 +1019,15 @@ private JobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
 			}

 			// reset the vertex type to iteration head
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
 			// instantiate the head vertex and give it a no-op driver as the driver strategy.
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobVertex("IterationHead("+iteration.getNodeName()+")");
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -1310,7 +1310,7 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
 		// No following termination criterion
 		if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {

-			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+			rootOfStepFunctionVertex.setInvokableClass(IterationTailTask.class);

 			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 		}
@@ -1337,7 +1337,7 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
 			tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
 		}

-		rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+		rootOfTerminationCriterionVertex.setInvokableClass(IterationTailTask.class);
 		// Hack
 		tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
 		tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
@@ -1457,7 +1457,7 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
 		worksetTailConfig.setIsWorksetUpdate();

 		if (hasWorksetTail) {
-			nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+			nextWorksetVertex.setInvokableClass(IterationTailTask.class);

 			worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
 		}
@@ -1481,7 +1481,7 @@ private void finalizeWorksetIteration(IterationDescriptor descr) {
 		solutionDeltaConfig.setIsSolutionSetUpdate();

 		if (hasSolutionSetTail) {
-			solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+			solutionDeltaVertex.setInvokableClass(IterationTailTask.class);

 			solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());

2 changes: 1 addition & 1 deletion flink-runtime-web/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
 	     throw error to test failing scenarios. Logging those would overflow the log. -->
 	<!---->
 	<logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
-	<logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+	<logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
 	<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
 	<logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
 	<logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>
@@ -24,7 +24,7 @@

 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;

 public class BroadcastVariableManager {

@@ -33,7 +33,7 @@ public class BroadcastVariableManager {

 	// --------------------------------------------------------------------------------------------

-	public <T> BroadcastVariableMaterialization<T, ?> materializeBroadcastVariable(String name, int superstep, RegularPactTask<?, ?> holder,
+	public <T> BroadcastVariableMaterialization<T, ?> materializeBroadcastVariable(String name, int superstep, BatchTask<?, ?> holder,
 			MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException
 	{
 		final BroadcastVariableKey key = new BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
@@ -77,12 +77,12 @@ public class BroadcastVariableManager {
 	}


-	public void releaseReference(String name, int superstep, RegularPactTask<?, ?> referenceHolder) {
+	public void releaseReference(String name, int superstep, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableKey key = new BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, superstep);
 		releaseReference(key, referenceHolder);
 	}

-	public void releaseReference(BroadcastVariableKey key, RegularPactTask<?, ?> referenceHolder) {
+	public void releaseReference(BroadcastVariableKey key, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableMaterialization<?, ?> mat = variables.get(key);

 		// release this reference
@@ -93,7 +93,7 @@ public void releaseReference(BroadcastVariableKey key, RegularPactTask<?, ?> ref
 	}


-	public void releaseAllReferencesFromTask(RegularPactTask<?, ?> referenceHolder) {
+	public void releaseAllReferencesFromTask(BatchTask<?, ?> referenceHolder) {
 		// go through all registered variables
 		for (Map.Entry<BroadcastVariableKey, BroadcastVariableMaterialization<?, ?>> entry : variables.entrySet()) {
 			BroadcastVariableMaterialization<?, ?> mat = entry.getValue();
@@ -28,7 +28,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public class BroadcastVariableMaterialization<T, C> {
 	private static final Logger LOG = LoggerFactory.getLogger(BroadcastVariableMaterialization.class);


-	private final Set<RegularPactTask<?, ?>> references = new HashSet<RegularPactTask<?,?>>();
+	private final Set<BatchTask<?, ?>> references = new HashSet<BatchTask<?,?>>();

 	private final Object materializationMonitor = new Object();

@@ -65,7 +65,7 @@ public BroadcastVariableMaterialization(BroadcastVariableKey key) {

 	// --------------------------------------------------------------------------------------------

-	public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, RegularPactTask<?, ?> referenceHolder)
+	public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, BatchTask<?, ?> referenceHolder)
 			throws MaterializationExpiredException, IOException
 	{
 		Preconditions.checkNotNull(reader);
@@ -156,15 +156,15 @@ public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?
 		}
 	}

-	public boolean decrementReference(RegularPactTask<?, ?> referenceHolder) {
+	public boolean decrementReference(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, true);
 	}

-	public boolean decrementReferenceIfHeld(RegularPactTask<?, ?> referenceHolder) {
+	public boolean decrementReferenceIfHeld(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, false);
 	}

-	private boolean decrementReferenceInternal(RegularPactTask<?, ?> referenceHolder, boolean errorIfNoReference) {
+	private boolean decrementReferenceInternal(BatchTask<?, ?> referenceHolder, boolean errorIfNoReference) {
 		synchronized (references) {
 			if (disposed || references.isEmpty()) {
 				if (errorIfNoReference) {
@@ -18,12 +18,15 @@

 package org.apache.flink.runtime.iterative.concurrent;

+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 import java.util.concurrent.CountDownLatch;

 /**
  * Resettable barrier to synchronize the
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} and
- * the {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask} in case of
+ * {@link IterationHeadTask} and
+ * the {@link IterationTailTask} in case of
  * iterations that contain a separate solution set tail.
  */
 public class SolutionSetUpdateBarrier {
@@ -18,10 +18,13 @@

 package org.apache.flink.runtime.iterative.concurrent;

+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 /**
  * Broker to hand over {@link SolutionSetUpdateBarrier} from
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} to
- * {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask}.
+ * {@link IterationHeadTask} to
+ * {@link IterationTailTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
