Skip to content

Commit

Permalink
Added JobFlow & JobGroup control via UI
Browse files Browse the repository at this point in the history
  • Loading branch information
smrjans committed Sep 28, 2014
1 parent d07ba46 commit 99e5529
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 111 deletions.
9 changes: 1 addition & 8 deletions src/main/java/com/talentica/job4j/api/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@
import com.talentica.job4j.model.JobStatus;


public interface Job<I,O>{

public boolean start();
public boolean stop();
public boolean pause();
public boolean resume();
public boolean abort();
public boolean schedule();
public interface Job<I,O> extends JobControl{

public O process(I input);
public Future<O> submit(I input);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/com/talentica/job4j/api/JobControl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.talentica.job4j.api;

public interface JobControl{
public boolean start();
public boolean stop();
public boolean pause();
public boolean resume();
public boolean abort();
}
7 changes: 6 additions & 1 deletion src/main/java/com/talentica/job4j/api/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import java.util.List;

import com.talentica.job4j.model.JobFlow;
import com.talentica.job4j.model.JobGroup;

public interface JobManager {

public List<JobFlow> getJobFlowList();
public List<JobGroup> getJobGroupList();
public List<Job> getJobList();
public boolean processAction(String name, String action);
public boolean processAction(String type, String name, String action);
}
7 changes: 7 additions & 0 deletions src/main/java/com/talentica/job4j/constant/JobTypeEnum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.talentica.job4j.constant;

public enum JobTypeEnum {
flow,
group,
job;
}
42 changes: 6 additions & 36 deletions src/main/java/com/talentica/job4j/impl/AbstractJob.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.talentica.job4j.impl;

import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -68,13 +69,17 @@ public boolean start() {
this.jobThread = new Thread(this, name);
jobThread.start();
}

jobStatus.setStartTime(new Date());
jobStatus.setStopTime(null);
return true;
}

public boolean stop() {
logger.info("Job "+ this.getClass().getSimpleName() +" stopped...");
abstractInputProducer.setFinished(true);
postStop();
jobStatus.setStopTime(new Date());
postStop();
return true;
}

Expand All @@ -93,41 +98,6 @@ public O process(I input) {
}


/*public JobDetail getJobDetail() {
jobDetail.setName(name);
jobDetail.setDescription(description);
jobDetail.setTimeZone(timeZone);
jobDetail.setStartCronSchedule(startCronSchedule);
jobDetail.setStopCronSchedule(stopCronSchedule);
jobDetail.setContinous(isContinous);
jobDetail.setMaxThreadCount(maxThreadCount);
jobDetail.setThreadSleepTime(threadSleepTime);
jobDetail.setMaxIdleTime(maxIdleTime);
jobDetail.setMailingList(mailingList);
jobDetail.setEmailEnabled(isEmailEnabled);
jobDetail.setRecoveryType(recoveryType);
return jobDetail;
}
public void setJobDetail(JobDetail jobDetail) {
this.name = jobDetail.getName();
this.description = jobDetail.getDescription();
this.timeZone = jobDetail.getTimeZone();
this.startCronSchedule = jobDetail.getStartCronSchedule();
this.stopCronSchedule = jobDetail.getStopCronSchedule();
this.isContinous = jobDetail.isContinous();
this.maxThreadCount = jobDetail.getMaxThreadCount();
this.threadSleepTime = jobDetail.getThreadSleepTime();
this.maxIdleTime = jobDetail.getMaxIdleTime();
this.mailingList = jobDetail.getMailingList();
this.isEmailEnabled = jobDetail.isEmailEnabled();
this.recoveryType = jobDetail.getRecoveryType();
}*/

public String getName() {
return name;
}
Expand Down
83 changes: 64 additions & 19 deletions src/main/java/com/talentica/job4j/impl/JobManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import com.talentica.job4j.api.Job;
import com.talentica.job4j.api.JobManager;
import com.talentica.job4j.constant.JobTypeEnum;
import com.talentica.job4j.model.JobFlow;
import com.talentica.job4j.model.JobGroup;

Expand All @@ -29,7 +31,7 @@ public class JobManagerImpl implements JobManager{

private Map<Job, List<JobGroup>> jobGroupByJobMap = new HashMap<Job, List<JobGroup>>();
private Map<JobGroup, List<JobFlow>> jobFlowByJobGroupMap = new HashMap<JobGroup, List<JobFlow>>();

@PostConstruct
public void init() {
for(Job job : jobList){
Expand All @@ -44,7 +46,7 @@ public void init() {
}
}
}

for(JobGroup jobGroup : jobGroupList){
List<JobFlow> jobFlows = jobFlowByJobGroupMap.get(jobGroup);
if(jobFlows==null){
Expand All @@ -57,10 +59,10 @@ public void init() {
}
}
}

logger.debug("jobGroupByJobMap >> "+jobGroupByJobMap);
logger.debug("jobFlowByJobGroupMap >> "+jobFlowByJobGroupMap);

/*if(jobFlowList!=null && jobFlowList.size()>0){
for(JobFlow jobFlow : jobFlowList){
jobFlow.schedule();
Expand All @@ -76,20 +78,61 @@ public void init() {
}*/
}

public boolean processAction(String name, String action){
public boolean processAction(String type, String name, String action){
boolean status=false;
for(Job job : jobList){
if(job.getName().equalsIgnoreCase(name)){
logger.info("Invoking "+action+" action on job >> "+name);
if(action.contains("start")){
status = job.start();
} else if(action.contains("pause")){
status = job.pause();
} else if(action.contains("resume")){
status = job.resume();
} else if(action.contains("stop")){
status = job.stop();
}
if(type==null){
type = "job";
}
if(name!=null && action!=null){
switch (JobTypeEnum.valueOf(type)) {
case flow:
for(JobFlow jobFlow : jobFlowList){
if(jobFlow.getName().equalsIgnoreCase(name)){
logger.info("Invoking "+action+" action on jobFlow >> "+name);
if(action.contains("start")){
status = jobFlow.start();
} else if(action.contains("pause")){
status = jobFlow.pause();
} else if(action.contains("resume")){
status = jobFlow.resume();
} else if(action.contains("stop")){
status = jobFlow.stop();
}
}
}
break;
case group:
for(JobGroup jobGroup : jobGroupList){
if(jobGroup.getName().equalsIgnoreCase(name)){
logger.info("Invoking "+action+" action on jobGroup >> "+name);
if(action.contains("start")){
status = jobGroup.start();
} else if(action.contains("pause")){
status = jobGroup.pause();
} else if(action.contains("resume")){
status = jobGroup.resume();
} else if(action.contains("stop")){
status = jobGroup.stop();
}
}
}
break;
case job:
for(Job job : jobList){
if(job.getName().equalsIgnoreCase(name)){
logger.info("Invoking "+action+" action on job >> "+name);
if(action.contains("start")){
status = job.start();
} else if(action.contains("pause")){
status = job.pause();
} else if(action.contains("resume")){
status = job.resume();
} else if(action.contains("stop")){
status = job.stop();
}
}
}
break;
}
}
return status;
Expand All @@ -98,9 +141,11 @@ public boolean processAction(String name, String action){

public List<Job> getJobList() {
return jobList;
}

}
public List<JobGroup> getJobGroupList() {
return jobGroupList;
}
public List<JobFlow> getJobFlowList() {
return jobFlowList;
}
}
25 changes: 0 additions & 25 deletions src/main/java/com/talentica/job4j/impl/queue/QueueJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,6 @@ public JobStatus getJobStatus() {
return jobStatus;
}

public String getStatus() {
String status = "ready";
if(threadPoolExecutor!=null){
jobStatus.setSubmittedTaskCount(threadPoolExecutor.getTaskCount());
jobStatus.setActiveTaskCount(threadPoolExecutor.getActiveCount());
jobStatus.setCompletedTaskCount(threadPoolExecutor.getCompletedTaskCount());
jobStatus.setCurrentThreadCount(threadPoolExecutor.getPoolSize());

if(threadPoolExecutor.isTerminated()){
status = JobStatusEnum.COMPLETED.name();
} else if(threadPoolExecutor.isTerminating()){
status = JobStatusEnum.STOPPING.name();
} else if(threadPoolExecutor.isShutdown()){
status = JobStatusEnum.STOPPING.name();
} else if(threadPoolExecutor.isPaused()){
status = JobStatusEnum.PAUSED.name();
} else if(threadPoolExecutor.getActiveCount() > 0){
status = JobStatusEnum.RUNNING.name();
}
}
jobStatus.setStatus(status);
return status;

}

public BlockingQueue<I> getInputQueue() {
return inputQueue;
}
Expand Down
48 changes: 46 additions & 2 deletions src/main/java/com/talentica/job4j/model/JobFlow.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package com.talentica.job4j.model;

import java.util.Date;
import java.util.List;

import javax.annotation.PostConstruct;

import com.talentica.job4j.api.JobControl;
import com.talentica.job4j.util.CronUtil;

public class JobFlow extends JobSchedule{

public class JobFlow extends JobSchedule implements JobControl{
private String name;
private JobStatus jobStatus = new JobStatus();
private List<JobGroup> jobGroupList;

@PostConstruct
public void init() {
if(name==null){
name="";
for(JobGroup jobGroup : jobGroupList){
name+=jobGroup.getName();
}
name+="Flow";
}
if(startCronSchedule==null){
startCronSchedule = jobGroupList.get(0).getStartCronSchedule();
stopCronSchedule = jobGroupList.get(0).getStopCronSchedule();
Expand All @@ -28,6 +38,16 @@ public void init() {
}
}

public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public JobStatus getJobStatus() {
return jobStatus;
}

public List<JobGroup> getJobGroupList() {
return jobGroupList;
}
Expand All @@ -39,12 +59,36 @@ public boolean start() {
for(JobGroup jobGroup : jobGroupList){
jobGroup.start();
}
jobStatus.setStartTime(new Date());
jobStatus.setStopTime(null);
return true;
}
public boolean stop() {
for(JobGroup jobGroup : jobGroupList){
jobGroup.stop();
}
jobStatus.setStopTime(new Date());
return true;
}

public boolean pause() {
for(JobGroup jobGroup : jobGroupList){
jobGroup.pause();
}
return true;
}

public boolean resume() {
for(JobGroup jobGroup : jobGroupList){
jobGroup.resume();
}
return true;
}

public boolean abort() {
for(JobGroup jobGroup : jobGroupList){
jobGroup.abort();
}
return true;
}

Expand Down
Loading

0 comments on commit 99e5529

Please sign in to comment.