Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts #324

Merged
merged 8 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
TEZ-4527: Add generic and pluggable hooks for DAGs and task attempts
  • Loading branch information
okumin committed Dec 21, 2024
commit d92fac8994673736c3ef5d5c6dc0fcf7f1f3cf97
21 changes: 19 additions & 2 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -2297,12 +2297,14 @@ static Set<String> getPropertySet() {
public static final String TEZ_MRREADER_CONFIG_UPDATE_PROPERTIES = "tez.mrreader.config.update.properties";

/**
* Frequency at which thread dump should be captured. Supports TimeUnits.
* Frequency at which thread dump should be captured. Supports TimeUnits. This is effective only
* when org.apache.tez.dag.app.ThreadDumpDAGHook is configured to tez.am.hooks or
* org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook is configured to tez.task.attempt.hooks.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_THREAD_DUMP_INTERVAL = "tez.thread.dump.interval";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "0ms";
public static final String TEZ_THREAD_DUMP_INTERVAL_DEFAULT = "100ms";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we introduce pluggable hooks, I think we can change the default value. We may remove NOOP_TEZ_THREAD_DUMP_HELPER, too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense
I think, for clarity's sake, all the hook-related configs can go into a namespace that implies they're hooks:

tez.hook.thread.dump.interval

also:

TEZ_HOOK_THREAD_DUMP_INTERVAL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can remove the NoopTezThreadDumpHelper

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Limits the amount of data that can be written to LocalFileSystem by a Task.
Expand All @@ -2312,4 +2314,19 @@ static Set<String> getPropertySet() {
public static final String TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES = "tez.task.local-fs.write-limit.bytes";
public static final long TEZ_TASK_LOCAL_FS_WRITE_LIMIT_BYTES_DEFAULT = -1;

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezDAGHook.
* e.g. org.apache.tez.dag.app.ThreadDumpDAGHook
*
* When the property is not set, the list is treated as empty and no DAG hooks are created.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_AM_HOOKS = TEZ_AM_PREFIX + "hooks";

/**
* Comma-separated list of hook classes implementing org.apache.tez.runtime.hook.TezTaskAttemptHook.
* e.g. org.apache.tez.runtime.task.ThreadDumpTaskAttemptHook
*
* When the property is not set, the list is treated as empty and no task attempt hooks are
* created.
*/
@ConfigurationScope(Scope.DAG)
@ConfigurationProperty
public static final String TEZ_TASK_ATTEMPT_HOOKS = TEZ_TASK_PREFIX + "attempt.hooks";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;

/**
* A hook which is instantiated and triggered before and after a DAG is executed.
*
* Implementations are listed by class name in the tez.am.hooks configuration property and are
* created reflectively from that name.
* NOTE(review): reflective creation presumably requires a public no-argument constructor;
* confirm against ReflectionUtils.createClazzInstance.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezDAGHook {
/**
* Invoked before the DAG starts.
*
* @param id the DAG id
* @param conf the conf
*/
void start(TezDAGID id, Configuration conf);

/**
* Invoked after the DAG finishes.
*
* NOTE(review): the application master appears to stop hooks both when a DAG finishes and when
* the service itself stops, so implementations should presumably tolerate repeated calls;
* confirm against DAGAppMaster.
*/
void stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.tez.runtime.hook;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;

/**
* A hook which is instantiated and triggered before and after a task attempt is executed.
*
* Implementations are listed by class name in the tez.task.attempt.hooks configuration property
* and are created reflectively from that name.
* NOTE(review): reflective creation presumably requires a public no-argument constructor;
* confirm against ReflectionUtils.createClazzInstance.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface TezTaskAttemptHook {
/**
* Invoked before the task attempt starts.
*
* @param id the task attempt id
* @param conf the conf
*/
void start(TezTaskAttemptID id, Configuration conf);

/**
* Invoked after the task attempt finishes.
*
* NOTE(review): the caller appears to invoke this from a finally block, i.e. also when the
* task attempt fails, so implementations should presumably avoid throwing; confirm against
* TezChild.
*/
void stop();
}
21 changes: 16 additions & 5 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.SessionNotRunning;
Expand Down Expand Up @@ -187,7 +188,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
Expand Down Expand Up @@ -343,7 +344,7 @@ public class DAGAppMaster extends AbstractService {
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;
private TezDAGHook[] hooks = {};

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Expand Down Expand Up @@ -770,7 +771,9 @@ protected synchronized void handle(DAGAppMasterEvent event) {
"DAGAppMaster Internal Error occurred");
break;
case DAG_FINISHED:
tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}
DAGAppMasterEventDAGFinished finishEvt =
(DAGAppMasterEventDAGFinished) event;
String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
Expand Down Expand Up @@ -2227,7 +2230,9 @@ public Void run() throws Exception {
}

// Check if the thread dump service is up in any case, if yes attempt a shutdown
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the thread-dump-helper-related comment and replace it with a more generic one saying that we're about to stop the hooks if they are still running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done
943012a

tezThreadDumpHelper.stop();
for (TezDAGHook hook : hooks) {
hook.stop();
}

super.serviceStop();
}
Expand Down Expand Up @@ -2599,7 +2604,13 @@ private void countHeldContainers(DAG newDAG) {
private void startDAGExecution(DAG dag, final Map<String, LocalResource> additionalAmResources)
throws TezException {
currentDAG = dag;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(dag.getConf()).start(dag.getID().toString());
final Configuration conf = dag.getConf();
final String[] hookClasses = conf.getStrings(TezConfiguration.TEZ_AM_HOOKS, new String[0]);
final TezDAGHook[] hooks = new TezDAGHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(dag.getID(), conf);
}

// Try localizing the actual resources.
List<URL> additionalUrlsForClasspath;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.tez.dag.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezDAGHook;

/**
 * A DAG hook which dumps thread information periodically.
 *
 * <p>The work is delegated to a {@link TezThreadDumpHelper} obtained from the configuration
 * passed to {@link #start(TezDAGID, Configuration)}; the DAG id is handed to the helper as a
 * string.
 */
public class ThreadDumpDAGHook implements TezDAGHook {
  /** Helper of the running DAG; {@code null} before a successful start and after stop. */
  private TezThreadDumpHelper helper;

  /**
   * Obtains a thread dump helper for the given configuration and starts it for the DAG.
   *
   * @param id the DAG id, passed to the helper in its string form
   * @param conf the conf used to obtain the helper
   */
  @Override
  public void start(TezDAGID id, Configuration conf) {
    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
  }

  /**
   * Stops the helper started by {@link #start(TezDAGID, Configuration)}.
   *
   * <p>This is a no-op when no helper is active, so calling it before a successful start, or
   * calling it repeatedly, does not throw.
   */
  @Override
  public void stop() {
    final TezThreadDumpHelper current = helper;
    if (current == null) {
      // start() was never called, failed, or this hook has already been stopped.
      return;
    }
    // Clear the field first so that a repeated stop() does not touch the same helper again.
    helper = null;
    current.stop();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezLocalResource;
Expand All @@ -69,10 +70,10 @@
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.util.LoggingUtils;

Expand Down Expand Up @@ -120,7 +121,6 @@ public class TezChild {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final String user;
private final boolean updateSysCounters;
private TezThreadDumpHelper tezThreadDumpHelper = TezThreadDumpHelper.NOOP_TEZ_THREAD_DUMP_HELPER;

private Multimap<String, String> startedInputsMap = HashMultimap.create();
private final boolean ownUmbilical;
Expand Down Expand Up @@ -295,7 +295,13 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
hadoopShim, sharedExecutor);

boolean shouldDie;
tezThreadDumpHelper = TezThreadDumpHelper.getInstance(taskConf).start(attemptId.toString());
final String[] hookClasses = taskConf
.getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
for (int i = 0; i < hooks.length; i++) {
hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
hooks[i].start(attemptId, taskConf);
}
try {
TaskRunner2Result result = taskRunner.run();
LOG.info("TaskRunner2Result: {}", result);
Expand All @@ -314,7 +320,9 @@ public ContainerExecutionResult run() throws IOException, InterruptedException,
e, "TaskExecutionFailure: " + e.getMessage());
}
} finally {
tezThreadDumpHelper.stop();
for (TezTaskAttemptHook hook : hooks) {
hook.stop();
}
FileSystem.closeAllForUGI(childUGI);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.tez.runtime.task;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.TezThreadDumpHelper;
import org.apache.tez.runtime.hook.TezTaskAttemptHook;

/**
 * A task attempt hook which dumps thread information periodically.
 *
 * <p>The work is delegated to a {@link TezThreadDumpHelper} obtained from the configuration
 * passed to {@link #start(TezTaskAttemptID, Configuration)}; the task attempt id is handed to
 * the helper as a string.
 */
public class ThreadDumpTaskAttemptHook implements TezTaskAttemptHook {
  /** Helper of the running attempt; {@code null} before a successful start and after stop. */
  private TezThreadDumpHelper helper;

  /**
   * Obtains a thread dump helper for the given configuration and starts it for the attempt.
   *
   * @param id the task attempt id, passed to the helper in its string form
   * @param conf the conf used to obtain the helper
   */
  @Override
  public void start(TezTaskAttemptID id, Configuration conf) {
    helper = TezThreadDumpHelper.getInstance(conf).start(id.toString());
  }

  /**
   * Stops the helper started by {@link #start(TezTaskAttemptID, Configuration)}.
   *
   * <p>This is a no-op when no helper is active, so calling it before a successful start, or
   * calling it repeatedly, does not throw.
   */
  @Override
  public void stop() {
    final TezThreadDumpHelper current = helper;
    if (current == null) {
      // start() was never called, failed, or this hook has already been stopped.
      return;
    }
    // Clear the field first so that a repeated stop() does not touch the same helper again.
    helper = null;
    current.stop();
  }
}