Skip to content

Commit

Permalink
TEZ-3267. Publish queue name to ATS as part of dag summary. Contribut…
Browse files Browse the repository at this point in the history
…ed by Harish Jaiprakash.
  • Loading branch information
sidseth committed Feb 9, 2017
1 parent 11815a7 commit 16b93de
Show file tree
Hide file tree
Showing 16 changed files with 72 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

ALL CHANGES:

TEZ-3267. Publish queue name to ATS as part of dag summary.
TEZ-3609. Improve ATSv15 performance for DAG entities read calls.
TEZ-3244. Allow overlap of input and output memory when they are not concurrent
TEZ-3581. Add different logger to enable suppressing logs for specific lines.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ATSConstants {
public static final String DAG_PLAN = "dagPlan";
public static final String DAG_NAME = "dagName";
public static final String DAG_STATE = "dagState";
public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName";
public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion";
public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason";
public static final String VERTEX_NAME = "vertexName";
Expand Down
11 changes: 10 additions & 1 deletion tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -2578,7 +2578,7 @@ private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMRe
// for an app later
final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
newDAG.getUserName(), newDAG.getConf(), containerLogs);
newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName());
boolean dagLoggingEnabled = newDAG.getConf().getBoolean(
TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED,
TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT);
Expand Down Expand Up @@ -2671,6 +2671,15 @@ public Object run() throws Exception {
});
}

private String getSubmittedQueueName() {
// TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279)
String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE");
if (submittedQueueName == null) {
submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME);
}
return submittedQueueName;
}

@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
dispatcher.getEventHandler().handle(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,15 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
private boolean historyLoggingEnabled = true;
private Configuration conf;
private String containerLogs;
private String queueName;

public DAGSubmittedEvent() {
}

public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
Map<String, LocalResource> cumulativeAdditionalLocalResources,
String user, Configuration conf, String containerLogs) {
String user, Configuration conf, String containerLogs, String queueName) {
this.dagID = dagID;
this.dagName = dagPlan.getName();
this.submitTime = submitTime;
Expand All @@ -74,6 +75,7 @@ public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
this.user = user;
this.conf = conf;
this.containerLogs = containerLogs;
this.queueName = queueName;
}

@Override
Expand All @@ -97,6 +99,9 @@ public DAGSubmittedProto toProto() {
.setApplicationAttemptId(applicationAttemptId.toString())
.setDagPlan(dagPlan)
.setSubmitTime(submitTime);
if (queueName != null) {
builder.setQueueName(queueName);
}
if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) {
builder.setCumulativeAdditionalAmResources(DagTypeConverters
.convertFromLocalResources(cumulativeAdditionalLocalResources));
Expand All @@ -111,6 +116,9 @@ public void fromProto(DAGSubmittedProto proto) {
this.submitTime = proto.getSubmitTime();
this.applicationAttemptId = ConverterUtils.toApplicationAttemptId(
proto.getApplicationAttemptId());
if (proto.hasQueueName()) {
this.queueName = proto.getQueueName();
}
if (proto.hasCumulativeAdditionalAmResources()) {
this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto
.getCumulativeAdditionalAmResources());
Expand All @@ -134,7 +142,8 @@ public void fromProtoStream(InputStream inputStream) throws IOException {
@Override
public String toString() {
return "dagID=" + dagID
+ ", submitTime=" + submitTime;
+ ", submitTime=" + submitTime
+ ", queueName=" + queueName;
}

@Override
Expand Down Expand Up @@ -203,4 +212,8 @@ public boolean isHistoryLoggingEnabled() {
public String getContainerLogs() {
return containerLogs;
}

public String getQueueName() {
return queueName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,9 @@ private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) thro
otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE,
event.getDAGPlan().getCallerContext().getCallerType());
}
if (event.getQueueName() != null) {
otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
}
jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);

return jsonObject;
Expand Down
1 change: 1 addition & 0 deletions tez-dag/src/main/proto/HistoryEvents.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ message DAGSubmittedProto {
optional int64 submit_time = 3;
optional string application_attempt_id = 4;
optional PlanLocalResourcesProto cumulative_additional_am_resources = 5;
optional string queue_name = 6;
}

message DAGInitializedProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testSkipAllOtherEvents_1() throws IOException {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
rService.handle(new DAGHistoryEvent(dagID,
new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
// only for testing, DAGCommitStartedEvent is not supposed to happen at this time.
Expand Down Expand Up @@ -215,7 +215,7 @@ public void testSkipAllOtherEvents_2() throws IOException {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
rService.handle(new DAGHistoryEvent(dagID,
new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null)));
rService.handle(new DAGHistoryEvent(dagID,
Expand Down Expand Up @@ -264,7 +264,7 @@ public void testLastCorruptedRecoveryRecord() throws IOException {
rService.start();
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread
rService.await();
rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA");
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testLastCorruptedSummaryRecord() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// write an corrupted SummaryEvent
rService.summaryStream.writeChars("INVALID_DATA");
rService.stop();
Expand Down Expand Up @@ -344,7 +344,7 @@ public void testRecoverableSummary_DAGInCommitting() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new DAGCommitStartedEvent(dagID, 0L)));
Expand Down Expand Up @@ -376,7 +376,7 @@ public void testRecoverableSummary_DAGFinishCommitting() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new DAGCommitStartedEvent(dagID, 0L)));
Expand Down Expand Up @@ -412,7 +412,7 @@ public void testRecoverableSummary_VertexInCommitting() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L)));
Expand Down Expand Up @@ -445,7 +445,7 @@ public void testRecoverableSummary_VertexFinishCommitting() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testRecoverableSummary_VertexGroupInCommitting() throws IOException
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
rService.handle(new DAGHistoryEvent(dagID,
new VertexGroupCommitStartedEvent(dagID, "group_1",
Expand Down Expand Up @@ -516,7 +516,7 @@ public void testRecoverableSummary_VertexGroupFinishCommitting() throws IOExcept
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID v0 = TezVertexID.getInstance(dagID, 0);
TezVertexID v1 = TezVertexID.getInstance(dagID, 1);
Expand Down Expand Up @@ -565,7 +565,7 @@ public void testRecoverableNonSummary1() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
Expand Down Expand Up @@ -601,7 +601,7 @@ public void testRecoverableNonSummary2() throws IOException {
// write a DAGSubmittedEvent first to initialize summaryStream
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
// It should be fine to skip other events, just for testing.
TezVertexID vertexId = TezVertexID.getInstance(dagID, 0);
rService.handle(new DAGHistoryEvent(dagID,
Expand Down Expand Up @@ -640,7 +640,7 @@ public void testRecoveryData() throws IOException {
// DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent
rService.handle(new DAGHistoryEvent(dagID,
new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1),
null, "user", new Configuration(), null)));
null, "user", new Configuration(), null, null)));
DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L,
"user", "dagName", null);
DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId, Configuration in
new AMStartedEvent(attemptId, time, user)));
historyEvents.add(new DAGHistoryEvent(dagId,
new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
conf, null)));
conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new VertexStartedEvent(vertexID, time, time)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,13 @@ private void testAMStartedEvent() throws Exception {
logEvents(event, deserializedEvent);
}

private final String QUEUE_NAME = "TEST_QUEUE_NAME";
private void testDAGSubmittedEvent() throws Exception {
DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance(
ApplicationId.newInstance(0, 1), 1), 1001l,
DAGPlan.newBuilder().setName("foo").build(),
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(0, 1), 1), null, "", null, null);
ApplicationId.newInstance(0, 1), 1), null, "", null, null, QUEUE_NAME);
DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getApplicationAttemptId(),
Expand All @@ -189,6 +190,7 @@ private void testDAGSubmittedEvent() throws Exception {
deserializedEvent.getSubmitTime());
Assert.assertEquals(event.getDAGPlan(),
deserializedEvent.getDAGPlan());
Assert.assertEquals(event.getQueueName(), deserializedEvent.getQueueName());
logEvents(event, deserializedEvent);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void testHandlerExists() throws JSONException {
break;
case DAG_SUBMITTED:
event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
null, user, null, null);
null, user, null, null, "Q_" + eventType.name());
break;
case DAG_INITIALIZED:
event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
Expand Down Expand Up @@ -200,7 +200,15 @@ public void testHandlerExists() throws JSONException {
if (event == null || !event.isHistoryEvent()) {
continue;
}
HistoryEventJsonConversion.convertToJson(event);
JSONObject json = HistoryEventJsonConversion.convertToJson(event);
if (eventType == HistoryEventType.DAG_SUBMITTED) {
try {
Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO)
.getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME));
} catch (JSONException ex) {
Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void testRecoveryFlushOnSummaryEvent() throws Exception {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build();
// This writes to recovery immediately.
recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(
dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null)));
dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null, "default")));
waitForDrain(-1);
verify(summaryFos, times(1)).hflush();
verify(dagFos, times(1)).hflush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void testDagLoggingDisabled() throws Exception {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
1, dagPlan, appAttemptId, null,
"usr", tezConf, null);
"usr", tezConf, null, null);
submittedEvent.setHistoryLoggingEnabled(false);
DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
Expand Down Expand Up @@ -446,7 +446,7 @@ public void testDagLoggingEnabled() throws Exception {
DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build();
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID,
1, dagPlan, appAttemptId, null,
"usr", tezConf, null);
"usr", tezConf, null, null);
submittedEvent.setHistoryLoggingEnabled(true);
DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent);
historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
new AMStartedEvent(attemptId, time, user)));
historyEvents.add(new DAGHistoryEvent(dagId,
new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user,
conf, null)));
conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId,
new VertexStartedEvent(vertexID, time, time)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event)
atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE,
event.getDAGPlan().getCallerContext().getCallerType());
}
if (event.getQueueName() != null) {
atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName());
}

return atsEntity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ private List<DAGHistoryEvent> makeHistoryEvents(TezDAGID dagId,
Configuration conf = new Configuration(service.getConfig());
historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user")));
historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time,
DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null)));
DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default")));
TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time)));
TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1);
Expand Down
Loading

0 comments on commit 16b93de

Please sign in to comment.