[ZEPPELIN-5604] Interpreter scheduler may not be shutdown properly
### What is this PR for?

This issue leaves the thread used by the interpreter scheduler unreleased, so over time the thread pool may be exhausted. The root cause is that an uncaught exception can be thrown while closing an interpreter, in which case the scheduler shutdown is never invoked. This PR moves the scheduler shutdown into a `finally` block so that it is always called.
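
To illustrate the failure mode described above, here is a minimal, self-contained sketch. It is not Zeppelin code: `Registry`, `closeWithoutFinally`, and `closeWithFinally` are hypothetical names, and `IllegalStateException` merely stands in for the single exception type that the `catch` clause handles. The point is only that cleanup placed after a `try`/`catch` is skipped when an unhandled exception escapes, while cleanup placed in `finally` still runs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SchedulerCleanupDemo {

  /** Hypothetical stand-in for a scheduler registry; not Zeppelin's SchedulerFactory. */
  static final class Registry {
    private final Set<String> names = ConcurrentHashMap.newKeySet();

    void add(String name) { names.add(name); }

    void remove(String name) { names.remove(name); }

    boolean contains(String name) { return names.contains(name); }
  }

  /** Old shape: cleanup follows the try/catch, so an unhandled exception skips it. */
  static void closeWithoutFinally(Runnable close, Registry registry, String name) {
    try {
      close.run();
    } catch (IllegalStateException e) {
      // Only this exception type is handled; anything else propagates to the caller.
    }
    registry.remove(name); // never reached if close.run() throws something else
  }

  /** New shape: cleanup sits in finally, so it runs however the try block exits. */
  static void closeWithFinally(Runnable close, Registry registry, String name) {
    try {
      close.run();
    } catch (IllegalStateException e) {
      // Same handling as above.
    } finally {
      registry.remove(name); // runs on normal exit, handled and unhandled exceptions
    }
  }

  public static void main(String[] args) {
    // A close action that fails with an exception the catch clause does not match.
    Runnable failingClose = () -> {
      throw new UnsupportedOperationException("close failed");
    };

    Registry oldShape = new Registry();
    oldShape.add("scheduler-1");
    try {
      closeWithoutFinally(failingClose, oldShape, "scheduler-1");
    } catch (RuntimeException ignored) {
      // The exception escapes; the registry entry has leaked.
    }
    System.out.println("without finally, still registered: " + oldShape.contains("scheduler-1"));

    Registry newShape = new Registry();
    newShape.add("scheduler-1");
    try {
      closeWithFinally(failingClose, newShape, "scheduler-1");
    } catch (RuntimeException ignored) {
      // The exception still escapes, but cleanup has already run.
    }
    System.out.println("with finally, still registered: " + newShape.contains("scheduler-1"));
  }
}
```

Running `main` prints `true` for the first case and `false` for the second: the exception propagates either way, but only the `finally` variant removes the entry before it does.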

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5604

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <[email protected]>

Closes apache#4307 from zjffdu/ZEPPELIN-5604 and squashes the following commits:

02d9a61 [Jeff Zhang] [ZEPPELIN-5604] Interpreter scheduler may not be shutdown properly
zjffdu committed Apr 1, 2022
1 parent 148129e commit f85d415
Showing 1 changed file with 14 additions and 15 deletions.
@@ -144,29 +144,28 @@ private void close(Collection<Interpreter> interpreters) {

   private void closeInterpreter(Interpreter interpreter) {
     Scheduler scheduler = interpreter.getScheduler();

-    if (Boolean.parseBoolean(
-            interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) {
-      for (final Job job : scheduler.getAllJobs()) {
-        if (!job.isTerminated()) {
-          job.abort();
-          job.setStatus(Job.Status.ABORT);
-          LOGGER.info("Job {} aborted ", job.getJobName());
+    try {
+      if (Boolean.parseBoolean(
+              interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) {
+        for (final Job job : scheduler.getAllJobs()) {
+          if (!job.isTerminated()) {
+            job.abort();
+            job.setStatus(Job.Status.ABORT);
+            LOGGER.info("Job {} aborted ", job.getJobName());
+          }
         }
+      } else {
+        LOGGER.info("Keep job running while closing interpreter: {}", interpreter.getClassName());
       }
-    } else {
-      LOGGER.info("Keep job running while closing interpreter: {}", interpreter.getClassName());
-    }

-    try {
       LOGGER.info("Trying to close interpreter {}", interpreter.getClassName());
       interpreter.close();
     } catch (InterpreterException e) {
       LOGGER.warn("Fail to close interpreter {}", interpreter.getClassName(), e);
+    } finally {
+      //TODO(zjffdu) move the close of schedule to Interpreter
+      SchedulerFactory.singleton().removeScheduler(scheduler.getName());
     }
-
-    //TODO(zjffdu) move the close of schedule to Interpreter
-    SchedulerFactory.singleton().removeScheduler(scheduler.getName());
   }

   public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {