Skip to content

Commit 1387e2b

Browse files
author
Chandana Amarnath
committed
Fixed all the code changes after review
1 parent 1a75ab8 commit 1387e2b

File tree

9 files changed

+56
-45
lines changed

9 files changed

+56
-45
lines changed

queue-load-leveling/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@ for both the task and the service.
3131

3232
## Credits
3333

34-
* [Design Pattern: Queue-Based Load Leveling Pattern](https://msdn.microsoft.com/en-us/library/dn589783.aspx)
34+
* [Microsoft Cloud Design Patterns: Queue-Based Load Leveling Pattern](https://msdn.microsoft.com/en-us/library/dn589783.aspx)

queue-load-leveling/src/main/java/org/queue/load/leveling/App.java

+35-12
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323

2424
package org.queue.load.leveling;
2525

26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.TimeUnit;
29+
2630
import org.slf4j.Logger;
2731
import org.slf4j.LoggerFactory;
2832

@@ -58,37 +62,56 @@ public class App {
5862

5963
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);
6064

65+
//Executor shut down time limit.
66+
private static final int SHUTDOWN_TIME = 15;
67+
6168
/**
6269
* Program entry point
6370
*
6471
* @param args command line args
6572
*/
6673
public static void main(String[] args) {
74+
75+
// An Executor that provides methods to manage termination and methods that can
76+
// produce a Future for tracking progress of one or more asynchronous tasks.
77+
ExecutorService executor = null;
78+
6779
try {
6880
// Create a MessageQueue object.
6981
MessageQueue msgQueue = new MessageQueue();
7082

71-
LOGGER.info("All the TaskGenerators started.");
83+
LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads.");
7284

7385
// Create three TaskGenerator threads. Each of them will submit different number of jobs.
7486
Runnable taskRunnable1 = new TaskGenerator(msgQueue, 5);
7587
Runnable taskRunnable2 = new TaskGenerator(msgQueue, 1);
7688
Runnable taskRunnable3 = new TaskGenerator(msgQueue, 2);
7789

78-
Thread taskGenerator1 = new Thread(taskRunnable1, "Task_Generator_1");
79-
Thread taskGenerator2 = new Thread(taskRunnable2, "Task_Generator_2");
80-
Thread taskGenerator3 = new Thread(taskRunnable3, "Task_Generator_3");
90+
// Create e service which should process the submitted jobs.
91+
Runnable srvRunnable = new ServiceExecutor(msgQueue);
92+
93+
// Create a ThreadPool of 2 threads and
94+
// submit all Runnable task for execution to executor..
95+
executor = Executors.newFixedThreadPool(2);
96+
executor.submit(taskRunnable1);
97+
executor.submit(taskRunnable2);
98+
executor.submit(taskRunnable3);
8199

82-
taskGenerator1.start();
83-
taskGenerator2.start();
84-
taskGenerator3.start();
100+
// submitting serviceExecutor thread to the Executor service.
101+
executor.submit(srvRunnable);
85102

86-
LOGGER.info("Service Executor started.");
103+
// Initiates an orderly shutdown.
104+
LOGGER.info("Intiating shutdown. Executor will shutdown only after all the Threads are completed.");
105+
executor.shutdown();
87106

88-
// First create e service which will process all the jobs.
89-
Runnable srvRunnable = new ServiceExecutor(msgQueue);
90-
Thread srvExec = new Thread(srvRunnable, "Service_Executor_Thread");
91-
srvExec.start();
107+
// Wait for SHUTDOWN_TIME seconds for all the threads to complete
108+
// their tasks and then shut down the executor and then exit.
109+
if ( !executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS) ) {
110+
LOGGER.info("Executor was shut down and Exiting.");
111+
executor.shutdownNow();
112+
}
113+
} catch (InterruptedException ie) {
114+
LOGGER.error(ie.getMessage());
92115
} catch (Exception e) {
93116
LOGGER.error(e.getMessage());
94117
}

queue-load-leveling/src/main/java/org/queue/load/leveling/Message.java

+2-11
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,8 @@
2727
*
2828
*/
2929
public class Message {
30-
private String msg;
31-
32-
// Empty constructor.
33-
public Message() {
34-
}
35-
30+
private final String msg;
31+
3632
// Parameter constructor.
3733
public Message(String msg) {
3834
super();
@@ -44,11 +40,6 @@ public String getMsg() {
4440
return msg;
4541
}
4642

47-
// Set Method for attribute msg.
48-
public void setMsg(String msg) {
49-
this.msg = msg;
50-
}
51-
5243
@Override
5344
public String toString() {
5445
return msg;

queue-load-leveling/src/main/java/org/queue/load/leveling/MessageQueue.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class MessageQueue {
3838

3939
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);
4040

41-
private BlockingQueue<Message> blkQueue;
41+
private final BlockingQueue<Message> blkQueue;
4242

4343
// Default constructor when called creates Blocking Queue object.
4444
public MessageQueue() {

queue-load-leveling/src/main/java/org/queue/load/leveling/ServiceExecutor.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class ServiceExecutor implements Runnable {
3636

3737
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);
3838

39-
private MessageQueue msgQueue;
39+
private final MessageQueue msgQueue;
4040

4141
public ServiceExecutor(MessageQueue msgQueue) {
4242
this.msgQueue = msgQueue;
@@ -53,12 +53,12 @@ public void run() {
5353
if (null != msg) {
5454
LOGGER.info(msg.toString() + " is served.");
5555
} else {
56-
LOGGER.info("ServiceExecutor: All tasks are executed. Waiting.");
56+
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
5757
}
5858

5959
Thread.sleep(1000);
6060
}
61-
} catch (InterruptedException ie) {
61+
} catch (InterruptedException ie) {
6262
LOGGER.error(ie.getMessage());
6363
} catch (Exception e) {
6464
LOGGER.error(e.getMessage());

queue-load-leveling/src/main/java/org/queue/load/leveling/TaskGenerator.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public class TaskGenerator implements Task, Runnable {
3737
private static final Logger LOGGER = LoggerFactory.getLogger(App.class);
3838

3939
// MessageQueue reference using which we will submit our messages.
40-
private MessageQueue msgQueue;
40+
private final MessageQueue msgQueue;
4141

4242
// Total message count that a TaskGenerator will submit.
43-
private int msgCount;
43+
private final int msgCount;
4444

4545
// Parameterized constructor.
4646
public TaskGenerator(MessageQueue msgQueue, int msgCount) {
@@ -64,16 +64,18 @@ public void submit(Message msg) {
6464
* After every message submission TaskGenerator thread will sleep for 1 second.
6565
*/
6666
public void run() {
67+
68+
int count = this.msgCount;
69+
6770
try {
68-
while (this.msgCount > 0) {
69-
String statusMsg = "Message-" + this.msgCount + " submitted by " + Thread.currentThread().getName();
70-
Message newMessage = new Message(statusMsg);
71-
this.submit(newMessage);
71+
while (count > 0) {
72+
String statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
73+
this.submit(new Message(statusMsg));
7274

7375
LOGGER.info(statusMsg);
7476

7577
// reduce the message count.
76-
this.msgCount--;
78+
count--;
7779

7880
// Make the current thread to sleep after every Message submission.
7981
Thread.sleep(1000);

queue-load-leveling/src/test/java/org/queue/load/leveling/AppTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.junit.Test;
2828

2929
/**
30-
* Tests that Caching example runs without errors.
30+
* Application Test
3131
*/
3232
public class AppTest {
3333
@Test

queue-load-leveling/src/test/java/org/queue/load/leveling/MessageQueueTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
*/
2323
package org.queue.load.leveling;
2424

25-
import org.junit.Test;
2625
import static org.junit.Assert.assertEquals;
2726

27+
import org.junit.Test;
28+
2829
/**
2930
*
3031
* Test case for submitting and retrieving messages from Blocking Queue.
@@ -36,13 +37,12 @@ public class MessageQueueTest {
3637
public void messageQueueTest() {
3738

3839
MessageQueue msgQueue = new MessageQueue();
39-
Message msg = new Message("MessageQueue Test");
4040

4141
// submit message
42-
msgQueue.submitMsg(msg);
42+
msgQueue.submitMsg(new Message("MessageQueue Test"));
4343

4444
// retrieve message
45-
assertEquals(msg.getMsg(), msgQueue.retrieveMsg().getMsg());
45+
assertEquals(msgQueue.retrieveMsg().getMsg(), "MessageQueue Test");
4646
}
4747

4848
}

queue-load-leveling/src/test/java/org/queue/load/leveling/MessageTest.java

-5
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,5 @@ public void messageTest() {
3939
String testMsg = "Message Test";
4040
Message msg = new Message(testMsg);
4141
assertEquals(msg.getMsg(), testMsg);
42-
43-
// Default constructor and setter method test.
44-
Message simpleMsg = new Message();
45-
simpleMsg.setMsg(testMsg);
46-
assertEquals(simpleMsg.getMsg(), testMsg);
4742
}
4843
}

0 commit comments

Comments
 (0)