Skip to content

Commit

Permalink
Do not run TickerScheduleTriggerEngine watches if the schedule trigge…
Browse files Browse the repository at this point in the history
…r engine is paused (elastic#110061)
  • Loading branch information
masseyke authored Jul 8, 2024
1 parent 320b88a commit 52b2a41
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/110061.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 110061
summary: Avoiding running watch jobs in TickerScheduleTriggerEngine if it is paused
area: Watcher
type: bug
issues:
- 105933
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.settings.Setting.positiveTimeSetting;

Expand All @@ -50,6 +51,7 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
private final TimeValue tickInterval;
private final Map<String, ActiveSchedule> schedules = new ConcurrentHashMap<>();
private final Ticker ticker;
private final AtomicBoolean isRunning = new AtomicBoolean(false);

public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleRegistry, Clock clock) {
super(scheduleRegistry, clock);
Expand All @@ -60,7 +62,8 @@ public TickerScheduleTriggerEngine(Settings settings, ScheduleRegistry scheduleR
@Override
public synchronized void start(Collection<Watch> jobs) {
long startTime = clock.millis();
logger.info("Watcher starting watches at {}", WatcherDateTimeUtils.dateTimeFormatter.formatMillis(startTime));
isRunning.set(true);
logger.info("Starting watcher engine at {}", WatcherDateTimeUtils.dateTimeFormatter.formatMillis(startTime));
Map<String, ActiveSchedule> startingSchedules = Maps.newMapWithExpectedSize(jobs.size());
for (Watch job : jobs) {
if (job.trigger() instanceof ScheduleTrigger trigger) {
Expand All @@ -81,17 +84,22 @@ public synchronized void start(Collection<Watch> jobs) {

@Override
public void stop() {
logger.info("Stopping watcher engine");
isRunning.set(false);
schedules.clear();
ticker.close();
}

@Override
public synchronized void pauseExecution() {
public void pauseExecution() {
logger.info("Pausing watcher engine");
isRunning.set(false);
schedules.clear();
}

@Override
public void add(Watch watch) {
logger.trace("Adding watch [{}] to engine (engine is running: {})", watch.id(), isRunning.get());
assert watch.trigger() instanceof ScheduleTrigger;
ScheduleTrigger trigger = (ScheduleTrigger) watch.trigger();
ActiveSchedule currentSchedule = schedules.get(watch.id());
Expand All @@ -106,13 +114,25 @@ public void add(Watch watch) {

@Override
public boolean remove(String jobId) {
logger.debug("Removing watch [{}] from engine (engine is running: {})", jobId, isRunning.get());
return schedules.remove(jobId) != null;
}

void checkJobs() {
if (isRunning.get() == false) {
logger.debug(
"Watcher not running because the engine is paused. Currently scheduled watches being skipped: {}",
schedules.size()
);
return;
}
long triggeredTime = clock.millis();
List<TriggerEvent> events = new ArrayList<>();
for (ActiveSchedule schedule : schedules.values()) {
if (isRunning.get() == false) {
logger.debug("Watcher paused while running [{}]", schedule.name);
break;
}
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
ZonedDateTime triggeredDateTime = utcDateTimeAtEpochMillis(triggeredTime);
Expand Down

0 comments on commit 52b2a41

Please sign in to comment.