Skip to content

Commit

Permalink
#3749 Replace Stroom simple cron scheduler with standard compliant Qu…
Browse files Browse the repository at this point in the history
…artz cron scheduler
  • Loading branch information
stroomdev66 committed Feb 9, 2024
1 parent b678f74 commit e6a6deb
Show file tree
Hide file tree
Showing 45 changed files with 268 additions and 633 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ ext.libs = [
okhttp_logging_interceptor : "com.squareup.okhttp3:logging-interceptor:${versions.okhttp}",
poi : "org.apache.poi:poi:${versions.poi}",
poi_ooxml : "org.apache.poi:poi-ooxml:${versions.poi}",
quartz : "org.quartz-scheduler:quartz:2.5.0-rc1",
restygwt : "org.fusesource.restygwt:restygwt:2.2.7",
saxon_he : "net.sf.saxon:Saxon-HE:9.9.1-8",
simple_java_mail : 'org.simplejavamail:simple-java-mail:8.3.1',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class AnalyticsModule extends AbstractModule {

@Override
Expand All @@ -41,19 +39,19 @@ protected void configure() {
.bindJobTo(TableBuilderAnalyticExecutorRunnable.class, builder -> builder
.name("Analytic Executor: Table Builder")
.description("Run table building analytics periodically")
.schedule(PERIODIC, "10m")
.periodicSchedule("10m")
.enabled(false)
.advanced(true))
// .bindJobTo(StreamingAnalyticExecutorRunnable.class, builder -> builder
// .name("Analytic Executor: Streaming")
// .description("Run streaming analytics periodically")
// .schedule(PERIODIC, "1m")
// .periodicSchedule("1m")
// .enabled(false)
// .advanced(true))
.bindJobTo(ScheduledAnalyticExecutorRunnable.class, builder -> builder
.name("Analytic Executor: Scheduled Query")
.description("Run scheduled index query analytics periodically")
.schedule(PERIODIC, "10m")
.periodicSchedule("10m")
.enabled(false)
.advanced(true));
GuiceUtil.buildMultiBinder(binder(), HasResultStoreInfo.class).addBinding(AnalyticDataStores.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package stroom.cache.impl;

import stroom.cache.api.CacheManager;
import stroom.job.api.Schedule;
import stroom.job.api.ScheduledJobsBinder;
import stroom.util.RunnableWrapper;
import stroom.util.guice.GuiceUtil;
Expand All @@ -42,7 +41,7 @@ protected void configure() {
.name("Evict expired elements")
.description("Evicts expired cache entries")
.managed(false)
.schedule(Schedule.ScheduleType.PERIODIC, "1m"));
.periodicSchedule("1m"));

HasSystemInfoBinder.create(binder())
.bind(CacheManagerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class ClusterLockModule extends AbstractModule {

@Override
Expand All @@ -33,12 +31,12 @@ protected void configure() {
.description("Every 10 minutes try and unlock/remove any locks that " +
"we hold that have not been refreshed by their owner for 10 minutes.")
.managed(false)
.schedule(PERIODIC, "10m"))
.periodicSchedule("10m"))
.bindJobTo(KeepAlive.class, builder -> builder
.name("Keep alive")
.description("Keeps a locks alive")
.managed(false)
.schedule(PERIODIC, "1m"));
.periodicSchedule("1m"));
}

private static class UnlockOldLocks extends RunnableWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import io.dropwizard.lifecycle.Managed;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class GlobalConfigModule extends AbstractModule {

@Override
Expand All @@ -38,7 +36,7 @@ protected void configure() {
.bindJobTo(PropertyCacheReload.class, builder -> builder
.name("Property Cache Reload")
.description("Reload properties in the cluster")
.schedule(PERIODIC, "1m"));
.periodicSchedule("1m"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package stroom.query.client.presenter;

import stroom.dispatch.client.Rest;
import stroom.dispatch.client.RestFactory;
import stroom.docref.DocRef;
import stroom.docref.StringMatch;
Expand All @@ -10,11 +9,9 @@
import stroom.query.shared.CompletionsRequest;
import stroom.query.shared.QueryResource;
import stroom.util.shared.PageRequest;
import stroom.util.shared.ResultPage;

import com.google.gwt.core.client.GWT;
import com.google.gwt.safehtml.shared.SafeHtml;
import com.google.gwt.safehtml.shared.SafeHtmlUtils;
import edu.ycp.cs.dh.acegwt.client.ace.AceCompletion;
import edu.ycp.cs.dh.acegwt.client.ace.AceCompletionCallback;
import edu.ycp.cs.dh.acegwt.client.ace.AceCompletionProvider;
Expand Down Expand Up @@ -59,8 +56,9 @@ public void getProposals(final AceEditor editor,
pos.getColumn(),
new StringMatch(MatchType.STARTS_WITH, false, prefix),
showAll);
final Rest<ResultPage<CompletionValue>> rest = restFactory.create();
rest
restFactory
.builder()
.forResultPageOf(CompletionValue.class)
.onSuccess(result -> {
final List<AceCompletion> aceCompletions = result
.getValues()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.CRON;

public class StoredQueryModule extends AbstractModule {

@Override
Expand All @@ -23,7 +21,7 @@ protected void configure() {
.bindJobTo(QueryHistoryClean.class, builder -> builder
.name("Query History Clean")
.description("Job to clean up old query history items")
.schedule(CRON, "0 0 *")
.cronSchedule("0 0 0 * * ?")
.advanced(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.google.inject.Provides;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.CRON;

public class DataRetentionModule extends AbstractModule {

@Override
Expand All @@ -41,7 +39,7 @@ protected void configure() {
.name(DataRetentionPolicyExecutor.JOB_NAME)
.description("Delete data that exceeds the retention period " +
"specified by data retention policy")
.schedule(CRON, "0 0 *"));
.cronSchedule("0 0 0 * * ?"));
}

@SuppressWarnings("unused") // called by guice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.CRON;

public class FsDataStoreJobsModule extends AbstractModule {

@Override
Expand All @@ -35,17 +33,17 @@ protected void configure() {
.name(PhysicalDeleteExecutor.TASK_NAME)
.description("Physically delete meta data and associated files that have been logically " +
"deleted based on age of delete (stroom.data.store.deletePurgeAge)")
.schedule(CRON, "0 0 *")
.cronSchedule("0 0 0 * * ?")
.advanced(false))
.bindJobTo(OrphanFileFinder.class, builder -> builder
.name(FsOrphanFileFinderExecutor.TASK_NAME)
.description("Job to find files that do not exist in the meta store")
.schedule(CRON, "0 0 *")
.cronSchedule("0 0 0 * * ?")
.enabled(false))
.bindJobTo(OrphanMetaFinder.class, builder -> builder
.name(FsOrphanMetaFinderExecutor.TASK_NAME)
.description("Job to find items in the meta store that have no associated data")
.schedule(CRON, "0 0 *")
.cronSchedule("0 0 0 * * ?")
.enabled(false));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class FsVolumeJobsModule extends AbstractModule {
@Override
protected void configure() {
Expand All @@ -17,7 +15,7 @@ protected void configure() {
.bindJobTo(FileVolumeStatus.class, builder -> builder
.name("File System Volume Status")
.description("Update the usage status of file system volumes")
.schedule(PERIODIC, "5m"));
.periodicSchedule("5m"));
}

private static class FileVolumeStatus extends RunnableWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class DataRetentionJobModule extends AbstractModule {
// .bindJobTo(DataRetention.class, builder -> builder
// .name("Feed Based Data Retention")
// .description("Delete data that exceeds the retention period specified by feed")
// .schedule(CRON, "0 0 *"));
// .schedule(CRON, "0 0 0 * * ?"));
// }
//
// private static class DataRetention extends RunnableWrapper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.CRON;
import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class IndexModule extends AbstractModule {

@Override
Expand Down Expand Up @@ -83,24 +80,24 @@ protected void configure() {
.bindJobTo(IndexShardDelete.class, builder -> builder
.name("Index Shard Delete")
.description("Job to delete index shards from disk that have been marked as deleted")
.schedule(CRON, "0 0 *"))
.cronSchedule("0 0 0 * * ?"))
.bindJobTo(IndexShardRetention.class, builder -> builder
.name("Index Shard Retention")
.description("Job to set index shards to have a status of deleted that have past their " +
"retention period")
.schedule(PERIODIC, "10m"))
.periodicSchedule("10m"))
.bindJobTo(IndexWriterCacheSweep.class, builder -> builder
.name("Index Writer Cache Sweep")
.description("Job to remove old index shard writers from the cache")
.schedule(PERIODIC, "10m"))
.periodicSchedule("10m"))
.bindJobTo(IndexWriterFlush.class, builder -> builder
.name("Index Writer Flush")
.description("Job to flush index shard data to disk")
.schedule(PERIODIC, "10m"))
.periodicSchedule("10m"))
.bindJobTo(VolumeStatus.class, builder -> builder
.name("Index Volume Status")
.description("Update the usage status of volumes owned by the node")
.schedule(PERIODIC, "5m"));
.periodicSchedule("5m"));

LifecycleBinder.create(binder())
.bindStartupTaskTo(IndexShardWriterCacheStartup.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package stroom.job.api;

import stroom.job.api.Schedule.ScheduleType;

import java.util.Objects;

public class ScheduledJob {
Expand Down Expand Up @@ -104,14 +106,13 @@ public Builder managed(boolean managed) {
* See {@link stroom.util.scheduler.SimpleCronScheduler} or
* {@link stroom.util.scheduler.FrequencyScheduler} for schedule string format.
*/
public Builder schedule(Schedule.ScheduleType scheduleType, String schedule) {
this.schedule = new Schedule(scheduleType, schedule);
public Builder cronSchedule(String schedule) {
this.schedule = new Schedule(ScheduleType.CRON, schedule);
return this;
}

public Builder schedule(final Schedule schedule) {
Objects.requireNonNull(schedule);
this.schedule = schedule;
public Builder periodicSchedule(String schedule) {
this.schedule = new Schedule(ScheduleType.PERIODIC, schedule);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-- ------------------------------------------------------------------------
-- Copyright 2020 Crown Copyright
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
-- ------------------------------------------------------------------------

-- Stop NOTE level warnings about objects (not)? existing
SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0;

update job_node set schedule = concat('0 ', schedule, ' * ?') where job_type = 1 and regexp_like(schedule, '^[^ ]+ [^ ]+ [^ ]+$');

SET SQL_NOTES=@OLD_SQL_NOTES;

-- vim: set tabstop=4 shiftwidth=4 expandtab:
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import stroom.security.api.SecurityContext;
import stroom.security.shared.PermissionNames;
import stroom.util.AuditUtil;
import stroom.util.scheduler.QuartzCronScheduler;
import stroom.util.scheduler.Scheduler;
import stroom.util.scheduler.SimpleCron;
import stroom.util.shared.ModelStringUtil;

import jakarta.inject.Inject;
Expand Down Expand Up @@ -219,7 +219,7 @@ private void ensureSchedule(final JobNode jobNode) {
if (JobType.CRON.equals(jobNode.getJobType())) {
if (jobNode.getSchedule() != null) {
// This will throw a runtime exception if the expression is invalid.
SimpleCron.compile(jobNode.getSchedule());
new QuartzCronScheduler(jobNode.getSchedule());
}
}
if (JobType.FREQUENCY.equals(jobNode.getJobType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import stroom.task.api.ExecutorProvider;
import stroom.util.concurrent.AsyncReference;
import stroom.util.scheduler.FrequencyScheduler;
import stroom.util.scheduler.QuartzCronScheduler;
import stroom.util.scheduler.Scheduler;
import stroom.util.scheduler.SimpleCron;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -121,8 +121,7 @@ static class JobNodeTrackersImpl implements JobNodeTrackers {
} else {
try {
if (JobType.CRON.equals(jobNode.getJobType())) {
schedulerMap.put(jobNode,
SimpleCron.compile(jobNode.getSchedule()).createScheduler());
schedulerMap.put(jobNode, new QuartzCronScheduler(jobNode.getSchedule()));
} else if (JobType.FREQUENCY.equals(jobNode.getJobType())) {
schedulerMap.put(jobNode, new FrequencyScheduler(jobNode.getSchedule()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import com.google.inject.AbstractModule;
import jakarta.inject.Inject;

import static stroom.job.api.Schedule.ScheduleType.PERIODIC;

public class JobSystemModule extends AbstractModule {

@Override
Expand Down Expand Up @@ -57,7 +55,7 @@ protected void configure() {
.description("Every 10 seconds the Stroom lifecycle service will try and " +
"fetch new tasks for execution.")
.managed(false)
.schedule(PERIODIC, "10s"));
.periodicSchedule("10s"));

// Make sure the last thing to start and the first thing to stop is the scheduled task executor.
LifecycleBinder.create(binder())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import stroom.job.shared.JobNode.JobType;
import stroom.job.shared.ScheduledTimes;
import stroom.util.date.DateUtil;
import stroom.util.scheduler.SimpleCron;
import stroom.util.scheduler.QuartzCronScheduler;
import stroom.util.shared.ModelStringUtil;

class ScheduleService {
Expand All @@ -40,9 +40,9 @@ ScheduledTimes getScheduledTimes(final JobType jobType,
ScheduledTimes scheduledTimes = null;

if (JobType.CRON.equals(jobType)) {
final SimpleCron cron = SimpleCron.compile(expression);
final QuartzCronScheduler cron = new QuartzCronScheduler(expression);
Long time = System.currentTimeMillis();
time = cron.getNextTime(time);
time = cron.getNextExecute(time);
scheduledTimes = getScheduledTimes(lastExecutedTime, time);

} else if (JobType.FREQUENCY.equals(jobType)) {
Expand Down
Loading

0 comments on commit e6a6deb

Please sign in to comment.