From a0c0727dda85841417326c319a1bc29fd22c88ab Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Tue, 25 Aug 2015 14:18:44 -0700 Subject: [PATCH] TEZ-2740. Create a reconfigureVertex alias for deprecated setVertexParallelism API (bikas) --- CHANGES.txt | 2 + .../dag/api/VertexManagerPluginContext.java | 37 +++++++++++++++++++ .../tez/dag/app/dag/impl/VertexManager.java | 13 +++++++ 3 files changed, 52 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index d2c39e9256..ff7571394d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,8 @@ INCOMPATIBLE CHANGES TEZ-2468. Change the minimum Java version to Java 7. ALL CHANGES: + TEZ-2740. Create a reconfigureVertex alias for deprecated + setVertexParallelism API TEZ-2690. Add critical path analyser TEZ-2734. Add a test to verify the filename generated by OnDiskMerge. TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java index 883387b6aa..242bceea14 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java @@ -167,6 +167,43 @@ public void setVertexParallelism(int parallelism, @Nullable Map sourceEdgeManagers, @Nullable Map rootInputSpecUpdate); + /** + * API to reconfigure a {@link Vertex} that is reading root inputs based on + * the data read from the root inputs. Root inputs are external data sources + * that provide the initial data for the DAG and are added to the + * {@link Vertex} using the + * {@link Vertex#addDataSource(String, DataSourceDescriptor)} API. Typically, + * the parallelism of such vertices is determined at runtime by gathering + * information about the data source. This API may be used to set the + * parallelism of the vertex at runtime based on the data sources, as well as + * changing the specification for those inputs. In addition, changing + * parallelism is often accompanied by changing the {@link EdgeProperty} of + * the source {@link Edge} because event routing between source and + * destination tasks may need to be updated to account for the new task + * parallelism. This method can be called to update the parallelism multiple + * times until any of the tasks of the vertex have been scheduled (by invoking + * {@link #scheduleTasks(List)}. If needed, the original source edge + * properties may be obtained via {@link #getInputVertexEdgeProperties()} + * + * @param parallelism + * New number of tasks in the vertex + * @param locationHint + * the placement policy for tasks specified at + * {@link VertexLocationHint}s + * @param sourceEdgeProperties + * Map with Key=name of {@link Edge} to be updated and Value= + * {@link EdgeProperty}. The name of the Edge will be the + * corresponding source vertex name. + * @param rootInputSpecUpdate + * The key of the map is the name of the data source and the value is + * the updated {@link InputSpecUpdate} for that data source. If none + * specified, a default value is used. See {@link InputSpecUpdate} + * for details. + */ + public void reconfigureVertex(int parallelism, + @Nullable VertexLocationHint locationHint, + @Nullable Map sourceEdgeProperties, + @Nullable Map rootInputSpecUpdate); /** * API to reconfigure a {@link Vertex} by changing its task parallelism. Task diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java index 247b92f490..bb512a9a03 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java @@ -172,6 +172,19 @@ public synchronized void setVertexParallelism(int parallelism, VertexLocationHin } } + @Override + public synchronized void reconfigureVertex(int parallelism, VertexLocationHint vertexLocationHint, + Map sourceEdgeProperties, + Map rootInputSpecUpdate) { + checkAndThrowIfDone(); + try { + managedVertex.setParallelism(parallelism, vertexLocationHint, sourceEdgeProperties, + rootInputSpecUpdate, true); + } catch (AMUserCodeException e) { + throw new TezUncheckedException(e); + } + } + @Override public synchronized void reconfigureVertex(int parallelism, @Nullable VertexLocationHint locationHint,