Skip to content

Commit

Permalink
Revoke JobConfigurationChangedProcessEngine code (apache#29736)
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy authored Jan 16, 2024
1 parent 0eef118 commit 4ad497a
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;

import java.util.Collection;

/**
* Job configuration changed process engine.
*/
Expand All @@ -47,8 +49,10 @@ public final class JobConfigurationChangedProcessEngine {
public <T extends PipelineJobConfiguration> void process(final Type eventType, final JobConfiguration jobConfig, final JobConfigurationChangedProcessor<T> processor) {
String jobId = jobConfig.getJobName();
if (jobConfig.isDisabled()) {
// Get sharding items before stop job, because sharding items will be cleared after stop job.
Collection<Integer> shardingItems = PipelineJobRegistry.getShardingItems(jobId);
PipelineJobRegistry.stop(jobId);
disableJob(jobId);
disableJob(jobId, shardingItems);
return;
}
switch (eventType) {
Expand All @@ -70,9 +74,9 @@ public <T extends PipelineJobConfiguration> void process(final Type eventType, f
}
}

private void disableJob(final String jobId) {
private void disableJob(final String jobId, final Collection<Integer> shardingItems) {
PipelineDistributedBarrier distributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
for (Integer each : PipelineJobRegistry.getShardingItems(jobId)) {
for (Integer each : shardingItems) {
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId), each);
}
}
Expand Down

0 comments on commit 4ad497a

Please sign in to comment.