Skip to content

Commit

Permalink
TEZ-3855. Allow vertex manager to send event to processor (zhiyuany)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhiyuan Yang committed Nov 11, 2017
1 parent a246527 commit b96f79f
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -268,6 +270,17 @@ public void reconfigureVertex(@Nullable Map<String, InputSpecUpdate> rootInputSp
* task to which events need to be sent.
*/
public void addRootInputEvents(String inputName, Collection<InputDataInformationEvent> events);

/**
* Allows a VertexManagerPlugin to send events of custom payload to processor
* of a specific task of managed vertex
*
* It's up to user to make sure taskId is valid
*
* @param events events to be sent
* @param taskId id of a task of managed vertex
*/
public void sendEventToProcessor(Collection<CustomProcessorEvent> events, int taskId);

@Deprecated
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.runtime.api.events;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.runtime.api.Event;

import java.nio.ByteBuffer;

public class CustomProcessorEvent extends Event {
private ByteBuffer payload;

/**
* Version number to indicate what app attempt generated this Event
*/
private int version;

private CustomProcessorEvent(ByteBuffer payload) {
this(payload, -1);
}

private CustomProcessorEvent(ByteBuffer payload, int version) {
this.payload = payload;
this.version = version;
}

public static CustomProcessorEvent create(ByteBuffer payload) {
return new CustomProcessorEvent(payload);
}

@Private
public static CustomProcessorEvent create(ByteBuffer payload, int version) {
return new CustomProcessorEvent(payload, version);
}

public ByteBuffer getPayload() {
return payload;
}

@Private
public void setVersion(int version) {
this.version = version;
}

public int getVersion() {
return version;
}
}
5 changes: 5 additions & 0 deletions tez-api/src/main/proto/Events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,8 @@ message RootInputInitializerEventProto {
optional string target_input_name = 2;
optional bytes user_payload = 3;
}

message CustomProcessorEventProto {
optional bytes user_payload = 1;
required int32 version = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
Expand Down Expand Up @@ -3884,6 +3885,17 @@ private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean isPendingEv
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
case CUSTOM_PROCESSOR_EVENT:
{
// set version as app attempt id
((CustomProcessorEvent) tezEvent.getEvent()).setVersion(
appContext.getApplicationAttemptId().getAttemptId());
// route event to task
EventMetaData destinationMeta = tezEvent.getDestinationInfo();
Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID());
targetTask.registerTezEvent(tezEvent);
}
break;
case INPUT_FAILED_EVENT:
case DATA_MOVEMENT_EVENT:
case COMPOSITE_DATA_MOVEMENT_EVENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.tez.dag.app.dag.event.DAGEventInternalError;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -272,6 +275,29 @@ public TezEvent apply(InputDataInformationEvent riEvent) {
// Recovery handling is taken care of by the Vertex.
}

@Override
public void sendEventToProcessor(Collection<CustomProcessorEvent> events, int taskId) {
checkAndThrowIfDone();
Preconditions.checkArgument(taskId >= 0 && taskId < managedVertex.getTotalTasks(),
"Invalid taskId " + taskId + "; " + "There are " + managedVertex.getTotalTasks()
+ " tasks in total.");

if (events != null && events.size() > 0) {
List<TezEvent> tezEvents = new ArrayList<>();
for (CustomProcessorEvent event : events) {
TezEvent tezEvent = new TezEvent(event, null);
// use dummy task attempt id since this is not an task attempt specific event and task
// attempt id won't be used anyway
EventMetaData destinationMeta = new EventMetaData(EventProducerConsumerType.PROCESSOR,
managedVertex.getName(), managedVertex.getName(),
TezTaskAttemptID.getInstance(managedVertex.getTask(taskId).getTaskId(), -1));
tezEvent.setDestinationInfo(destinationMeta);
tezEvents.add(tezEvent);
}
appContext.getEventHandler().handle(
new VertexEventRouteEvent(managedVertex.getVertexId(), tezEvents));
}
}

@Override
public synchronized void setVertexLocationHint(VertexLocationHint locationHint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

package org.apache.tez.dag.app.dag.impl;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -56,8 +59,10 @@
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.VertexEventInputDataInformation;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
Expand Down Expand Up @@ -214,10 +219,9 @@ public void testOnRootVertexInitialized2() throws Exception {
@Test(timeout = 5000)
public void testVMPluginCtxGetInputVertexGroup() throws Exception {
VertexManager vm =
new VertexManager(
VertexManagerPluginDescriptor.create(CustomVertexManager.class
.getName()), UserGroupInformation.getCurrentUser(),
mockVertex, mockAppContext, mock(StateChangeNotifier.class));
new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()),
UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext,
mock(StateChangeNotifier.class));

assertTrue(vm.pluginContext.getInputVertexGroups().isEmpty());

Expand All @@ -232,6 +236,51 @@ public void testVMPluginCtxGetInputVertexGroup() throws Exception {
assertTrue(groups.get(group).contains(v2));
}

@Test(timeout = 5000)
public void testSendCustomProcessorEvent() throws Exception {
VertexManager vm =
new VertexManager(VertexManagerPluginDescriptor.create(CustomVertexManager.class.getName()),
UserGroupInformation.getCurrentUser(), mockVertex, mockAppContext,
mock(StateChangeNotifier.class));
ArgumentCaptor<VertexEventRouteEvent> requestCaptor =
ArgumentCaptor.forClass(VertexEventRouteEvent.class);

when(mockVertex.getTotalTasks()).thenReturn(2);

List<CustomProcessorEvent> events = new ArrayList<>();
// task id too small, should fail
try {
vm.pluginContext.sendEventToProcessor(events, -1);
fail("Should fail for invalid task id");
} catch (IllegalArgumentException exception) {
assertTrue(exception.getMessage().contains("Invalid taskId"));
}
// task id too large, should fail
try {
vm.pluginContext.sendEventToProcessor(events, 10);
fail("Should fail for invalid task id");
} catch (IllegalArgumentException exception) {
assertTrue(exception.getMessage().contains("Invalid taskId"));
}

// null event, do nothing
vm.pluginContext.sendEventToProcessor(null, 0);
verify(mockHandler, never()).handle(requestCaptor.capture());

// empty event
vm.pluginContext.sendEventToProcessor(events, 1);
verify(mockHandler, never()).handle(requestCaptor.capture());

//events.add();
byte[] payload = new byte[] {1,2,3};
events.add(CustomProcessorEvent.create(ByteBuffer.wrap(payload)));
vm.pluginContext.sendEventToProcessor(events, 1);
verify(mockHandler, times(1)).handle(requestCaptor.capture());
CustomProcessorEvent cpe =
(CustomProcessorEvent)(requestCaptor.getValue().getEvents().get(0).getEvent());
assertArrayEquals(payload, cpe.getPayload().array());
}

public static class CustomVertexManager extends VertexManagerPlugin {

private Map<String,List<Event>> cachedEventMap = new HashMap<String, List<Event>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.protobuf.ByteString;

import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.EventProtos;
Expand All @@ -31,6 +32,23 @@

public class ProtoConverters {

public static EventProtos.CustomProcessorEventProto convertCustomProcessorEventToProto(
CustomProcessorEvent event) {
EventProtos.CustomProcessorEventProto.Builder builder =
EventProtos.CustomProcessorEventProto.newBuilder();
if (event.getPayload() != null) {
builder.setUserPayload(ByteString.copyFrom(event.getPayload()));
}
builder.setVersion(event.getVersion());
return builder.build();
}

public static CustomProcessorEvent convertCustomProcessorEventFromProto(
EventProtos.CustomProcessorEventProto proto) {
return CustomProcessorEvent.create(proto.getUserPayload() != null ?
proto.getUserPayload().asReadOnlyByteBuffer() : null, proto.getVersion());
}

public static EventProtos.DataMovementEventProto convertDataMovementEventToProto(
DataMovementEvent event) {
EventProtos.DataMovementEventProto.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ public enum EventType {
COMPOSITE_DATA_MOVEMENT_EVENT,
ROOT_INPUT_INITIALIZER_EVENT,
COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT,
CUSTOM_PROCESSOR_EVENT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.CustomProcessorEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.EventProtos;
Expand Down Expand Up @@ -57,6 +58,8 @@
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;

import static org.apache.tez.runtime.api.events.EventProtos.*;

public class TezEvent implements Writable {

private EventType eventType;
Expand All @@ -82,6 +85,8 @@ public TezEvent(Event event, EventMetaData sourceInfo, long time) {
this.setSourceInfo(sourceInfo);
if (event instanceof DataMovementEvent) {
eventType = EventType.DATA_MOVEMENT_EVENT;
} else if (event instanceof CustomProcessorEvent) {
eventType = EventType.CUSTOM_PROCESSOR_EVENT;
} else if (event instanceof CompositeDataMovementEvent) {
eventType = EventType.COMPOSITE_DATA_MOVEMENT_EVENT;
} else if (event instanceof CompositeRoutedDataMovementEvent) {
Expand Down Expand Up @@ -157,6 +162,11 @@ private void serializeEvent(DataOutput out) throws IOException {
} else {
AbstractMessage message;
switch (eventType) {
case CUSTOM_PROCESSOR_EVENT:
message =
ProtoConverters.convertCustomProcessorEventToProto(
(CustomProcessorEvent) event);
break;
case DATA_MOVEMENT_EVENT:
message =
ProtoConverters.convertDataMovementEventToProto(
Expand Down Expand Up @@ -260,6 +270,11 @@ private void deserializeEvent(DataInput in) throws IOException {
}
input = CodedInputStream.newInstance(eventBytes, startOffset, eventBytesLen);
switch (eventType) {
case CUSTOM_PROCESSOR_EVENT:
CustomProcessorEventProto cpProto =
CustomProcessorEventProto.parseFrom(input);
event = ProtoConverters.convertCustomProcessorEventFromProto(cpProto);
break;
case DATA_MOVEMENT_EVENT:
DataMovementEventProto dmProto =
DataMovementEventProto.parseFrom(input);
Expand Down

0 comments on commit b96f79f

Please sign in to comment.