Skip to content

Commit

Permalink
BAEL-1456 - How to implement task prioritization in Java (eugenp#3366)
Browse files Browse the repository at this point in the history
* priority based job execution in java

* minor fixes

* updated to use java 8 features
  • Loading branch information
rockoder authored and pivovarit committed Jan 30, 2018
1 parent bdae4fe commit 32e309a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.baeldung.concurrent.prioritytaskexecution;

public class Job implements Runnable {
private String jobName;
private JobPriority jobPriority;

public Job(String jobName, JobPriority jobPriority) {
this.jobName = jobName;
this.jobPriority = jobPriority != null ? jobPriority : JobPriority.MEDIUM;
}

public JobPriority getJobPriority() {
return jobPriority;
}

@Override
public void run() {
try {
System.out.println("Job:" + jobName + " Priority:" + jobPriority);
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.baeldung.concurrent.prioritytaskexecution;

public enum JobPriority {
HIGH,
MEDIUM,
LOW
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.baeldung.concurrent.prioritytaskexecution;

import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PriorityJobScheduler {

private ExecutorService priorityJobPoolExecutor;
private ExecutorService priorityJobScheduler;
private PriorityBlockingQueue<Runnable> priorityQueue;

public PriorityJobScheduler(Integer poolSize, Integer queueSize) {
priorityJobPoolExecutor = Executors.newFixedThreadPool(poolSize);
Comparator<? super Job> jobComparator = Comparator.comparing(Job::getJobPriority);
priorityQueue = new PriorityBlockingQueue<Runnable>(queueSize,
(Comparator<? super Runnable>) jobComparator);

priorityJobScheduler = Executors.newSingleThreadExecutor();
priorityJobScheduler.execute(()->{
while (true) {
try {
priorityJobPoolExecutor.execute(priorityQueue.take());
} catch (InterruptedException e) {
break;
}
}
});
}

public void scheduleJob(Job job) {
priorityQueue.add(job);
}

public int getQueuedTaskCount() {
return priorityQueue.size();
}

protected void close(ExecutorService scheduler) {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
}
}

public void closeScheduler() {
close(priorityJobPoolExecutor);
close(priorityJobScheduler);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.baeldung.concurrent.prioritytaskexecution;

import org.junit.Test;

public class PriorityJobSchedulerUnitTest {
private static int POOL_SIZE = 1;
private static int QUEUE_SIZE = 10;

@Test
public void whenMultiplePriorityJobsQueued_thenHighestPriorityJobIsPicked() {
Job job1 = new Job("Job1", JobPriority.LOW);
Job job2 = new Job("Job2", JobPriority.MEDIUM);
Job job3 = new Job("Job3", JobPriority.HIGH);
Job job4 = new Job("Job4", JobPriority.MEDIUM);
Job job5 = new Job("Job5", JobPriority.LOW);
Job job6 = new Job("Job6", JobPriority.HIGH);

PriorityJobScheduler pjs = new PriorityJobScheduler(POOL_SIZE, QUEUE_SIZE);

pjs.scheduleJob(job1);
pjs.scheduleJob(job2);
pjs.scheduleJob(job3);
pjs.scheduleJob(job4);
pjs.scheduleJob(job5);
pjs.scheduleJob(job6);

// ensure no tasks is pending before closing the scheduler
while (pjs.getQueuedTaskCount() != 0);

// delay to avoid job sleep (added for demo) being interrupted
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
}

pjs.closeScheduler();
}
}

0 comments on commit 32e309a

Please sign in to comment.