Skip to content

Commit

Permalink
TEZ-2740. Create a reconfigureVertex alias for deprecated setVertexPa…
Browse files Browse the repository at this point in the history
…rallelism API (bikas)
  • Loading branch information
Bikas Saha committed Aug 25, 2015
1 parent 24e17a4 commit a0c0727
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.

ALL CHANGES:
TEZ-2740. Create a reconfigureVertex alias for deprecated
setVertexParallelism API
TEZ-2690. Add critical path analyser
TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,43 @@ public void setVertexParallelism(int parallelism,
@Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeManagers,
@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);

/**
* API to reconfigure a {@link Vertex} that is reading root inputs based on
* the data read from the root inputs. Root inputs are external data sources
* that provide the initial data for the DAG and are added to the
* {@link Vertex} using the
* {@link Vertex#addDataSource(String, DataSourceDescriptor)} API. Typically,
* the parallelism of such vertices is determined at runtime by gathering
* information about the data source. This API may be used to set the
* parallelism of the vertex at runtime based on the data sources, as well as
* changing the specification for those inputs. In addition, changing
* parallelism is often accompanied by changing the {@link EdgeProperty} of
* the source {@link Edge} because event routing between source and
* destination tasks may need to be updated to account for the new task
* parallelism. This method can be called to update the parallelism multiple
* times until any of the tasks of the vertex have been scheduled (by invoking
* {@link #scheduleTasks(List)}. If needed, the original source edge
* properties may be obtained via {@link #getInputVertexEdgeProperties()}
*
* @param parallelism
* New number of tasks in the vertex
* @param locationHint
* the placement policy for tasks specified at
* {@link VertexLocationHint}s
* @param sourceEdgeProperties
* Map with Key=name of {@link Edge} to be updated and Value=
* {@link EdgeProperty}. The name of the Edge will be the
* corresponding source vertex name.
* @param rootInputSpecUpdate
* The key of the map is the name of the data source and the value is
* the updated {@link InputSpecUpdate} for that data source. If none
* specified, a default value is used. See {@link InputSpecUpdate}
* for details.
*/
public void reconfigureVertex(int parallelism,
@Nullable VertexLocationHint locationHint,
@Nullable Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties,
@Nullable Map<String, InputSpecUpdate> rootInputSpecUpdate);

/**
* API to reconfigure a {@link Vertex} by changing its task parallelism. Task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,19 @@ public synchronized void setVertexParallelism(int parallelism, VertexLocationHin
}
}

@Override
public synchronized void reconfigureVertex(int parallelism, VertexLocationHint vertexLocationHint,
Map<String, EdgeManagerPluginDescriptor> sourceEdgeProperties,
Map<String, InputSpecUpdate> rootInputSpecUpdate) {
checkAndThrowIfDone();
try {
managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties,
rootInputSpecUpdate, true);
} catch (AMUserCodeException e) {
throw new TezUncheckedException(e);
}
}

@Override
public synchronized void reconfigureVertex(int parallelism,
@Nullable VertexLocationHint locationHint,
Expand Down

0 comments on commit a0c0727

Please sign in to comment.