diff --git a/CHANGES.txt b/CHANGES.txt index 0949339569..215cb08fad 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3267. Publish queue name to ATS as part of dag summary. TEZ-3609. Improve ATSv15 performance for DAG entities read calls. TEZ-3244. Allow overlap of input and output memory when they are not concurrent TEZ-3581. Add different logger to enable suppressing logs for specific lines. diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index c56582ca6b..03c9fa1876 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -55,6 +55,7 @@ public class ATSConstants { public static final String DAG_PLAN = "dagPlan"; public static final String DAG_NAME = "dagName"; public static final String DAG_STATE = "dagState"; + public static final String DAG_SUBMITTED_QUEUE_NAME = "submittedQueueName"; public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion"; public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason"; public static final String VERTEX_NAME = "vertexName"; diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index eaaf18b3a1..7f27064cda 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -2578,7 +2578,7 @@ private void startDAG(DAGPlan dagPlan, Map additionalAMRe // for an app later final DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(), submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources, - newDAG.getUserName(), newDAG.getConf(), containerLogs); + newDAG.getUserName(), newDAG.getConf(), containerLogs, getSubmittedQueueName()); boolean dagLoggingEnabled = newDAG.getConf().getBoolean( TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED_DEFAULT); @@ -2671,6 +2671,15 @@ public Object run() throws Exception { }); } + private String getSubmittedQueueName() { + // TODO: Replace this with constant once the yarn patch is backported. (JIRA: TEZ-3279) + String submittedQueueName = System.getenv("YARN_RESOURCEMANAGER_APPLICATION_QUEUE"); + if (submittedQueueName == null) { + submittedQueueName = amConf.get(TezConfiguration.TEZ_QUEUE_NAME); + } + return submittedQueueName; + } + @SuppressWarnings("unchecked") private void sendEvent(Event event) { dispatcher.getEventHandler().handle(event); diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java index 07d7c07ed7..1b1fdf38a9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java @@ -57,6 +57,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent { private boolean historyLoggingEnabled = true; private Configuration conf; private String containerLogs; + private String queueName; public DAGSubmittedEvent() { } @@ -64,7 +65,7 @@ public DAGSubmittedEvent() { public DAGSubmittedEvent(TezDAGID dagID, long submitTime, DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId, Map cumulativeAdditionalLocalResources, - String user, Configuration conf, String containerLogs) { + String user, Configuration conf, String containerLogs, String queueName) { this.dagID = dagID; this.dagName = dagPlan.getName(); this.submitTime = submitTime; @@ -74,6 +75,7 @@ public DAGSubmittedEvent(TezDAGID dagID, long submitTime, this.user = user; this.conf = conf; this.containerLogs = containerLogs; + this.queueName = queueName; } @Override @@ -97,6 +99,9 @@ public DAGSubmittedProto toProto() { .setApplicationAttemptId(applicationAttemptId.toString()) .setDagPlan(dagPlan) .setSubmitTime(submitTime); + if (queueName != null) { + builder.setQueueName(queueName); + } if (cumulativeAdditionalLocalResources != null && !cumulativeAdditionalLocalResources.isEmpty()) { builder.setCumulativeAdditionalAmResources(DagTypeConverters .convertFromLocalResources(cumulativeAdditionalLocalResources)); @@ -111,6 +116,9 @@ public void fromProto(DAGSubmittedProto proto) { this.submitTime = proto.getSubmitTime(); this.applicationAttemptId = ConverterUtils.toApplicationAttemptId( proto.getApplicationAttemptId()); + if (proto.hasQueueName()) { + this.queueName = proto.getQueueName(); + } if (proto.hasCumulativeAdditionalAmResources()) { this.cumulativeAdditionalLocalResources = DagTypeConverters.convertFromPlanLocalResources(proto .getCumulativeAdditionalAmResources()); @@ -134,7 +142,8 @@ public void fromProtoStream(InputStream inputStream) throws IOException { @Override public String toString() { return "dagID=" + dagID - + ", submitTime=" + submitTime; + + ", submitTime=" + submitTime + + ", queueName=" + queueName; } @Override @@ -203,4 +212,8 @@ public boolean isHistoryLoggingEnabled() { public String getContainerLogs() { return containerLogs; } + + public String getQueueName() { + return queueName; + } } diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index a767fbf7b6..69c40e4780 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -515,6 +515,9 @@ private static JSONObject convertDAGSubmittedEvent(DAGSubmittedEvent event) thro otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE, event.getDAGPlan().getCallerContext().getCallerType()); } + if (event.getQueueName() != null) { + otherInfo.put(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto index ff3707d193..76714697a4 100644 --- a/tez-dag/src/main/proto/HistoryEvents.proto +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -54,6 +54,7 @@ message DAGSubmittedProto { optional int64 submit_time = 3; optional string application_attempt_id = 4; optional PlanLocalResourcesProto cumulative_additional_am_resources = 5; + optional string queue_name = 6; } message DAGInitializedProto { diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java index f4edf9e79b..6673b39e5b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java @@ -169,7 +169,7 @@ public void testSkipAllOtherEvents_1() throws IOException { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); // only for testing, DAGCommitStartedEvent is not supposed to happen at this time. @@ -215,7 +215,7 @@ public void testSkipAllOtherEvents_2() throws IOException { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); rService.handle(new DAGHistoryEvent(dagID, new DAGInitializedEvent(dagID, 1L, "user", dagPlan.getName(), null))); rService.handle(new DAGHistoryEvent(dagID, @@ -264,7 +264,7 @@ public void testLastCorruptedRecoveryRecord() throws IOException { rService.start(); rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // wait until DAGSubmittedEvent is handled in the RecoveryEventHandling thread rService.await(); rService.outputStreamMap.get(dagID).writeUTF("INVALID_DATA"); @@ -310,7 +310,7 @@ public void testLastCorruptedSummaryRecord() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // write an corrupted SummaryEvent rService.summaryStream.writeChars("INVALID_DATA"); rService.stop(); @@ -344,7 +344,7 @@ public void testRecoverableSummary_DAGInCommitting() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new DAGCommitStartedEvent(dagID, 0L))); @@ -376,7 +376,7 @@ public void testRecoverableSummary_DAGFinishCommitting() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new DAGCommitStartedEvent(dagID, 0L))); @@ -412,7 +412,7 @@ public void testRecoverableSummary_VertexInCommitting() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new VertexCommitStartedEvent(TezVertexID.getInstance(dagID, 0), 0L))); @@ -445,7 +445,7 @@ public void testRecoverableSummary_VertexFinishCommitting() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -482,7 +482,7 @@ public void testRecoverableSummary_VertexGroupInCommitting() throws IOException // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. rService.handle(new DAGHistoryEvent(dagID, new VertexGroupCommitStartedEvent(dagID, "group_1", @@ -516,7 +516,7 @@ public void testRecoverableSummary_VertexGroupFinishCommitting() throws IOExcept // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID v0 = TezVertexID.getInstance(dagID, 0); TezVertexID v1 = TezVertexID.getInstance(dagID, 1); @@ -565,7 +565,7 @@ public void testRecoverableNonSummary1() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -601,7 +601,7 @@ public void testRecoverableNonSummary2() throws IOException { // write a DAGSubmittedEvent first to initialize summaryStream rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); // It should be fine to skip other events, just for testing. TezVertexID vertexId = TezVertexID.getInstance(dagID, 0); rService.handle(new DAGHistoryEvent(dagID, @@ -640,7 +640,7 @@ public void testRecoveryData() throws IOException { // DAG DAGSubmittedEvent -> DAGInitializedEvent -> DAGStartedEvent rService.handle(new DAGHistoryEvent(dagID, new DAGSubmittedEvent(dagID, 1L, dagPlan, ApplicationAttemptId.newInstance(appId, 1), - null, "user", new Configuration(), null))); + null, "user", new Configuration(), null, null))); DAGInitializedEvent dagInitedEvent = new DAGInitializedEvent(dagID, 100L, "user", "dagName", null); DAGStartedEvent dagStartedEvent = new DAGStartedEvent(dagID, 0L, "user", "dagName"); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java index 4c0fe3f987..5a71a426ee 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/TestHistoryEventHandler.java @@ -219,7 +219,7 @@ private List makeHistoryEvents(TezDAGID dagId, Configuration in new AMStartedEvent(attemptId, time, user))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, - conf, null))); + conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 3d29a5d75a..47d8389e34 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -171,12 +171,13 @@ private void testAMStartedEvent() throws Exception { logEvents(event, deserializedEvent); } + private final String QUEUE_NAME = "TEST_QUEUE_NAME"; private void testDAGSubmittedEvent() throws Exception { DAGSubmittedEvent event = new DAGSubmittedEvent(TezDAGID.getInstance( ApplicationId.newInstance(0, 1), 1), 1001l, DAGPlan.newBuilder().setName("foo").build(), ApplicationAttemptId.newInstance( - ApplicationId.newInstance(0, 1), 1), null, "", null, null); + ApplicationId.newInstance(0, 1), 1), null, "", null, null, QUEUE_NAME); DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent) testProtoConversion(event); Assert.assertEquals(event.getApplicationAttemptId(), @@ -189,6 +190,7 @@ private void testDAGSubmittedEvent() throws Exception { deserializedEvent.getSubmitTime()); Assert.assertEquals(event.getDAGPlan(), deserializedEvent.getDAGPlan()); + Assert.assertEquals(event.getQueueName(), deserializedEvent.getQueueName()); logEvents(event, deserializedEvent); } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 9477118225..1bbecd3e4e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -124,7 +124,7 @@ public void testHandlerExists() throws JSONException { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user, null, null); + null, user, null, null, "Q_" + eventType.name()); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); @@ -200,7 +200,15 @@ public void testHandlerExists() throws JSONException { if (event == null || !event.isHistoryEvent()) { continue; } - HistoryEventJsonConversion.convertToJson(event); + JSONObject json = HistoryEventJsonConversion.convertToJson(event); + if (eventType == HistoryEventType.DAG_SUBMITTED) { + try { + Assert.assertEquals("Q_" + eventType.name(), json.getJSONObject(ATSConstants.OTHER_INFO) + .getString(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); + } catch (JSONException ex) { + Assert.fail("Exception: " + ex.getMessage() + " for type: " + eventType); + } + } } } diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java index 3dec1d7624..790e2d8059 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/recovery/TestRecoveryService.java @@ -333,7 +333,7 @@ public void testRecoveryFlushOnSummaryEvent() throws Exception { DAGPlan dagPlan = DAGPlan.newBuilder().setName("test_dag").build(); // This writes to recovery immediately. recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent( - dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null))); + dagId, startTime, dagPlan, appAttemptId, null, "nobody", conf, null, "default"))); waitForDrain(-1); verify(summaryFos, times(1)).hflush(); verify(dagFos, times(1)).hflush(); diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java index 6b3ebd7411..8e5c95c36f 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java +++ b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java @@ -401,7 +401,7 @@ public void testDagLoggingDisabled() throws Exception { DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1, dagPlan, appAttemptId, null, - "usr", tezConf, null); + "usr", tezConf, null, null); submittedEvent.setHistoryLoggingEnabled(false); DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); @@ -446,7 +446,7 @@ public void testDagLoggingEnabled() throws Exception { DAGPlan dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(tezDAGID, 1, dagPlan, appAttemptId, null, - "usr", tezConf, null); + "usr", tezConf, null, null); submittedEvent.setHistoryLoggingEnabled(true); DAGHistoryEvent event = new DAGHistoryEvent(tezDAGID, submittedEvent); historyLoggingService.handle(new DAGHistoryEvent(tezDAGID, submittedEvent)); diff --git a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java index 9111195fb7..cbded35556 100644 --- a/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history-with-fs/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSV15HistoryLoggingService.java @@ -450,7 +450,7 @@ private List makeHistoryEvents(TezDAGID dagId, new AMStartedEvent(attemptId, time, user))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, DAGPlan.getDefaultInstance(), attemptId, null, user, - conf, null))); + conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 96239c3ae1..8d0c547a6c 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -423,6 +423,9 @@ private static TimelineEntity convertDAGSubmittedEvent(DAGSubmittedEvent event) atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE, event.getDAGPlan().getCallerContext().getCallerType()); } + if (event.getQueueName() != null) { + atsEntity.addOtherInfo(ATSConstants.DAG_SUBMITTED_QUEUE_NAME, event.getQueueName()); + } return atsEntity; } diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java index da57eb28ef..a641cdab96 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java @@ -444,7 +444,7 @@ private List makeHistoryEvents(TezDAGID dagId, Configuration conf = new Configuration(service.getConfig()); historyEvents.add(new DAGHistoryEvent(null, new AMStartedEvent(attemptId, time, "user"))); historyEvents.add(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, time, - DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null))); + DAGPlan.getDefaultInstance(), attemptId, null, "user", conf, null, "default"))); TezVertexID vertexID = TezVertexID.getInstance(dagId, 1); historyEvents.add(new DAGHistoryEvent(dagId, new VertexStartedEvent(vertexID, time, time))); TezTaskID tezTaskID = TezTaskID.getInstance(vertexID, 1); diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 62fb33586b..bb189d34ce 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -145,7 +145,7 @@ public void testHandlerExists() throws JSONException { break; case DAG_SUBMITTED: event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId, - null, user, null, containerLogs); + null, user, null, containerLogs, null); break; case DAG_INITIALIZED: event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null); @@ -473,8 +473,9 @@ public void testConvertDAGStartedEvent() { public void testConvertDAGSubmittedEvent() { long submitTime = random.nextLong(); + final String queueName = "TEST_DAG_SUBMITTED"; DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan, - applicationAttemptId, null, user, null, containerLogs); + applicationAttemptId, null, user, null, containerLogs, queueName); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType()); @@ -509,7 +510,7 @@ public void testConvertDAGSubmittedEvent() { Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertEquals(8, timelineEntity.getOtherInfo().size()); + Assert.assertEquals(9, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); @@ -530,7 +531,8 @@ public void testConvertDAGSubmittedEvent() { Assert.assertEquals( timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), dagPlan.getCallerContext().getCallerType()); - + Assert.assertEquals( + queueName, timelineEntity.getOtherInfo().get(ATSConstants.DAG_SUBMITTED_QUEUE_NAME)); }