Skip to content

Commit

Permalink
KYLIN-3617 Use job's cache in job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
nichunen authored and shaofengshi committed Oct 15, 2018
1 parent 50f1758 commit 52307ba
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 16 deletions.
23 changes: 23 additions & 0 deletions core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.JsonSerializer;
Expand Down Expand Up @@ -241,6 +242,10 @@ public List<ExecutableOutputPO> getJobOutputs(long timeStart, long timeEndExclus
}
}

/**
 * Looks up a job output in the in-memory digest cache.
 *
 * @param uuid job id
 * @return the cached output digest, or null if none is cached for this id
 */
public ExecutableOutputPO getJobOutputDigest(String uuid) {
    // NOTE(review): this read takes no lock, while reloadAll() rebuilds the map under
    // executableOutputDigestMapLock's write lock — safe only if the map is a concurrent
    // map; confirm against the field declaration.
    return executableOutputDigestMap.get(uuid);
}

public List<ExecutableOutputPO> getJobOutputDigests(long timeStart, long timeEndExclusive) {
List<ExecutableOutputPO> jobOutputDigests = Lists.newArrayList();
for (ExecutableOutputPO po : executableOutputDigestMap.values()) {
Expand Down Expand Up @@ -268,6 +273,10 @@ public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) throws
}
}

/**
 * Looks up a job in the in-memory digest cache.
 *
 * @param uuid job id
 * @return the cached job digest, or null if none is cached for this id
 */
public ExecutablePO getJobDigest(String uuid) {
    // NOTE(review): unguarded read; reloadAll() mutates this map under
    // executableDigestMapLock's write lock — assumes a concurrent map, TODO confirm.
    return executableDigestMap.get(uuid);
}

public List<ExecutablePO> getJobDigests(long timeStart, long timeEndExclusive) {
List<ExecutablePO> jobDigests = Lists.newArrayList();
for (ExecutablePO po : executableDigestMap.values()) {
Expand All @@ -277,6 +286,11 @@ public List<ExecutablePO> getJobDigests(long timeStart, long timeEndExclusive) {
return jobDigests;
}

/**
 * Lists the ids of all jobs currently held in the in-memory digest cache.
 *
 * @return a fresh, mutable list containing the cached job ids
 */
public List<String> getJobIdsInCache() {
    return Lists.newArrayList(executableDigestMap.keySet());
}

public List<String> getJobIds() throws PersistentException {
try {
NavigableSet<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
Expand Down Expand Up @@ -391,4 +405,13 @@ public void deleteJobOutput(String uuid) throws PersistentException {
throw new PersistentException(e);
}
}

/**
 * Forces both in-memory digest caches to be rebuilt from the underlying store.
 * <p>
 * Each cache is refreshed under its own write lock, one after the other, so a
 * concurrent reader may briefly observe a freshly reloaded job-digest map
 * alongside a not-yet-reloaded output-digest map.
 *
 * @throws IOException if reloading either cache from the store fails
 */
public void reloadAll() throws IOException {
    // Rebuild the job digest cache first, under its write lock.
    try (AutoReadWriteLock.AutoLock lock = executableDigestMapLock.lockForWrite()) {
        executableDigestCrud.reloadAll();
    }
    // Then rebuild the output digest cache under its own write lock.
    try (AutoReadWriteLock.AutoLock lock = executableOutputDigestMapLock.lockForWrite()) {
        executableOutputDigestCrud.reloadAll();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ public AbstractExecutable getJob(String uuid) {
}
}

/**
 * Fetches a job from the dao's in-memory digest cache and converts it into an
 * {@code AbstractExecutable}.
 *
 * @param uuid job id
 * @return the parsed executable; presumably null when the id is not cached —
 *         depends on how parseTo handles a null PO, TODO confirm
 */
public AbstractExecutable getJobDigest(String uuid) {
    return parseTo(executableDao.getJobDigest(uuid));
}

public Output getOutput(String uuid) {
try {
final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
Expand All @@ -166,6 +170,12 @@ public Output getOutput(String uuid) {
}
}

/**
 * Returns the output for the given job id, read from the dao's in-memory digest
 * cache (cheap enough for the scheduler's fetcher loop) rather than the store.
 *
 * @param uuid job id
 * @return the parsed output, never null
 * @throws IllegalArgumentException if no output is cached for the given id
 */
public Output getOutputDigest(String uuid) {
    final ExecutableOutputPO jobOutput = executableDao.getJobOutputDigest(uuid);
    // Use the template overload so the failure message is only built when the
    // check actually fails, instead of concatenating on every call.
    Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:%s", uuid);
    return parseOutput(jobOutput);
}

private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
final DefaultOutput result = new DefaultOutput();
result.setExtra(jobOutput.getInfo());
Expand Down Expand Up @@ -286,6 +296,10 @@ public void updateAllRunningJobsToError() {
}
}

/**
 * Lists the ids of all jobs currently present in the dao's in-memory digest
 * cache, avoiding a round trip to the resource store.
 *
 * @return a fresh list of cached job ids
 */
public List<String> getAllJobIdsInCache() {
    return executableDao.getJobIdsInCache();
}

public void resumeAllRunningJobs() {
try {
final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
Expand Down Expand Up @@ -439,6 +453,10 @@ public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String,
}
}

/**
 * Forces the dao's in-memory digest caches to be rebuilt from the store.
 *
 * @throws IOException if reloading the caches fails
 */
public void reloadAll() throws IOException {
    executableDao.reloadAll();
}

public void forceKillJob(String jobId) {
try {
final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ synchronized public void run() {
}

int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
for (final String id : executableManager.getAllJobIds()) {
for (final String id : executableManager.getAllJobIdsInCache()) {
if (isJobPoolFull()) {
return;
}
Expand All @@ -60,16 +60,16 @@ synchronized public void run() {
continue;
}

final Output output = executableManager.getOutput(id);
if ((output.getState() != ExecutableState.READY)) {
final Output outputDigest = executableManager.getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
// logger.debug("Job id:" + id + " not runnable");
if (output.getState() == ExecutableState.SUCCEED) {
if (outputDigest.getState() == ExecutableState.SUCCEED) {
nSUCCEED++;
} else if (output.getState() == ExecutableState.ERROR) {
} else if (outputDigest.getState() == ExecutableState.ERROR) {
nError++;
} else if (output.getState() == ExecutableState.DISCARDED) {
} else if (outputDigest.getState() == ExecutableState.DISCARDED) {
nDiscarded++;
} else if (output.getState() == ExecutableState.STOPPED) {
} else if (outputDigest.getState() == ExecutableState.STOPPED) {
nStopped++;
} else {
if (fetchFailed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ public static synchronized void destroyInstance() {
// ============================================================================

private JobLock jobLock;
private ExecutableManager executableManager;
private FetcherRunner fetcher;
private ScheduledExecutorService fetcherPool;
private ExecutorService jobPool;
Expand All @@ -95,6 +94,10 @@ public DefaultScheduler() {
}
}

/**
 * Returns the {@code ExecutableManager} bound to this scheduler's current
 * {@code KylinConfig}.
 * <p>
 * Resolved freshly on every call (instead of being cached in a field) so the
 * scheduler always sees the manager instance for the current config.
 */
public ExecutableManager getExecutableManager() {
    return ExecutableManager.getInstance(jobEngineConfig.getConfig());
}

public FetcherRunner getFetcherRunner() {
return fetcher;
}
Expand Down Expand Up @@ -159,7 +162,6 @@ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) thr
throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
}

executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
//load all executable, set them to a consistent status
fetcherPool = Executors.newScheduledThreadPool(1);
int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
Expand All @@ -168,6 +170,7 @@ public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) thr
context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());

logger.info("Staring resume all running jobs.");
ExecutableManager executableManager = getExecutableManager();
executableManager.resumeAllRunningJobs();
logger.info("Finishing resume all running jobs.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,23 @@ synchronized public void run() {
}

int nRunning = 0, nReady = 0, nStopped = 0, nOthers = 0, nError = 0, nDiscarded = 0, nSUCCEED = 0;
for (final String id : executableManager.getAllJobIds()) {
for (final String id : executableManager.getAllJobIdsInCache()) {
if (runningJobs.containsKey(id)) {
// logger.debug("Job id:" + id + " is already running");
nRunning++;
continue;
}

final Output output = executableManager.getOutput(id);
if ((output.getState() != ExecutableState.READY)) {
final Output outputDigest = executableManager.getOutputDigest(id);
if ((outputDigest.getState() != ExecutableState.READY)) {
// logger.debug("Job id:" + id + " not runnable");
if (output.getState() == ExecutableState.SUCCEED) {
if (outputDigest.getState() == ExecutableState.SUCCEED) {
nSUCCEED++;
} else if (output.getState() == ExecutableState.ERROR) {
} else if (outputDigest.getState() == ExecutableState.ERROR) {
nError++;
} else if (output.getState() == ExecutableState.DISCARDED) {
} else if (outputDigest.getState() == ExecutableState.DISCARDED) {
nDiscarded++;
} else if (output.getState() == ExecutableState.STOPPED) {
} else if (outputDigest.getState() == ExecutableState.STOPPED) {
nStopped++;
} else {
if (fetchFailed) {
Expand Down

0 comments on commit 52307ba

Please sign in to comment.