Skip to content

Commit

Permalink
fix: read-threads option caused class cast exception
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Sep 5, 2024
1 parent 9af6e14 commit ba064ac
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ protected boolean shouldShowProgress() {
protected abstract Job job() throws Exception;

private <I, O> TaskletStep step(Step<I, O> step) {
log.info("Creating {}", step);
SimpleStepBuilder<I, O> builder = simpleStep(step);
if (stepArgs.getRetryPolicy() == RetryPolicy.NEVER && stepArgs.getSkipPolicy() == SkipPolicy.NEVER) {
log.info("");
return builder.build();
}
FaultTolerantStepBuilder<I, O> ftStep = JobUtils.faultTolerant(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

import org.springframework.util.unit.DataSize;

import com.redis.spring.batch.Range;

import picocli.CommandLine;
import picocli.CommandLine.IExecutionStrategy;
import picocli.CommandLine.RunLast;
Expand Down Expand Up @@ -50,7 +48,6 @@ protected CommandLine commandLine() {
commandLine.setUnmatchedOptionsAllowedAsOptionParameters(false);
commandLine.setExecutionExceptionHandler(new PrintExceptionMessageHandler());
commandLine.registerConverter(DataSize.class, DataSize::parse);
commandLine.registerConverter(Range.class, Range::parse);
commandLine.registerConverter(Expression.class, Expression::parse);
commandLine.registerConverter(TemplateExpression.class, Expression::parseTemplate);
commandLine.setExecutionStrategy(LoggingMixin.executionStrategy(executionStrategy()));
Expand Down
12 changes: 12 additions & 0 deletions core/riot-core/src/main/java/com/redis/riot/core/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,4 +195,16 @@ public Collection<Class<? extends Throwable>> getRetry() {
public Collection<Class<? extends Throwable>> getSkip() {
return skip;
}

@Override
public String toString() {
return "Step [name=" + name + ", reader=" + reader + ", writer=" + writer + ", taskName=" + taskName
+ ", statusMessageSupplier=" + statusMessageSupplier + ", maxItemCountSupplier=" + maxItemCountSupplier
+ ", processor=" + processor + ", executionListeners=" + executionListeners + ", readListeners="
+ readListeners + ", writeListeners=" + writeListeners + ", live=" + live + ", flushInterval="
+ flushInterval + ", idleTimeout=" + idleTimeout + ", skip=" + skip + ", noSkip=" + noSkip + ", retry="
+ retry + ", noRetry=" + noRetry + "]";
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,13 @@ protected <K, V, T, O> Step<KeyValue<K, T>, O> step(String name, RedisItemReader
ItemWriter<O> writer) {
Step<KeyValue<K, T>, O> step = new Step<>(name, reader, writer);
if (reader.getMode() != ReaderMode.LIVEONLY) {
log.info("Configuring step with scan size estimator");
step.maxItemCountSupplier(reader.scanSizeEstimator());
}
if (reader.getMode() != ReaderMode.SCAN) {
checkNotifyConfig(reader.getClient());
log.info("Configuring step {} with live true, flushInterval {}, idleTimeout {}", name,
reader.getFlushInterval(), reader.getIdleTimeout());
step.live(true);
step.flushInterval(reader.getFlushInterval());
step.idleTimeout(reader.getIdleTimeout());
Expand All @@ -86,10 +89,12 @@ private void checkNotifyConfig(AbstractRedisClient client) {
try {
valueMap = conn.sync().configGet(NOTIFY_CONFIG);
} catch (RedisException e) {
log.info("Could not check keyspace notification config", e);
return;
}
}
String actual = valueMap.getOrDefault(NOTIFY_CONFIG, "");
log.info("Retrieved config {}: {}", NOTIFY_CONFIG, actual);
Set<Character> expected = characterSet(NOTIFY_CONFIG_VALUE);
Assert.isTrue(characterSet(actual).containsAll(expected),
String.format("Keyspace notifications not property configured: expected '%s' but was '%s'.",
Expand Down
6 changes: 3 additions & 3 deletions plugins/riot/src/main/java/com/redis/riot/GenerateArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import java.time.Instant;
import java.util.List;

import com.redis.spring.batch.Range;
import com.redis.spring.batch.item.redis.common.DataType;
import com.redis.spring.batch.item.redis.common.Range;
import com.redis.spring.batch.item.redis.gen.CollectionOptions;
import com.redis.spring.batch.item.redis.gen.GeneratorItemReader;
import com.redis.spring.batch.item.redis.gen.MapOptions;
Expand All @@ -28,10 +28,10 @@ public class GenerateArgs {
@Option(names = "--keyspace", description = "Keyspace prefix for generated data structures (default: ${DEFAULT-VALUE}).", paramLabel = "<str>")
private String keyspace = GeneratorItemReader.DEFAULT_KEYSPACE;

@Option(names = "--keys", description = "Range of keys to generate in the form '<start>:<end>' (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
@Option(names = "--key-range", description = "Range of keys to generate in the form '<start>-<end>' (default: ${DEFAULT-VALUE}).", paramLabel = "<int>")
private Range keyRange = GeneratorItemReader.DEFAULT_KEY_RANGE;

@Option(arity = "1..*", names = "--types", description = "Types of data structures to generate: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<type>")
@Option(arity = "1..*", names = "--type", description = "Types of data structures to generate: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<type>")
private List<DataType> types = GeneratorItemReader.defaultTypes();

@Option(names = "--index", description = "Name of index to create that matches JSON or hash type.", paramLabel = "<name>")
Expand Down
4 changes: 2 additions & 2 deletions plugins/riot/src/main/java/com/redis/riot/KeyFilterArgs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

import org.springframework.util.CollectionUtils;

import com.redis.spring.batch.Range;
import com.redis.spring.batch.item.redis.common.BatchUtils;
import com.redis.spring.batch.item.redis.common.GlobPredicate;
import com.redis.spring.batch.item.redis.common.Range;

import io.lettuce.core.cluster.SlotHash;
import io.lettuce.core.codec.RedisCodec;
Expand All @@ -24,7 +24,7 @@ public class KeyFilterArgs {
@Option(names = "--key-exclude", arity = "1..*", description = "Glob pattern to match keys for exclusion. E.g. 'mykey:*' will exclude keys starting with 'mykey:'.", paramLabel = "<exp>")
private List<String> excludes;

@Option(names = "--key-slots", arity = "1..*", description = "Ranges of key slots to consider for processing. For example '0:8000' will only consider keys that fall within the range 0 to 8000.", paramLabel = "<range>")
@Option(names = "--key-slot", arity = "1..*", description = "Ranges of key slots to consider for processing. For example '0:8000' will only consider keys that fall within the range 0 to 8000.", paramLabel = "<range>")
private List<Range> slots;

public <K> Optional<Predicate<K>> predicate(RedisCodec<K, ?> codec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public class KeyValueProcessorArgs {
@Option(names = "--ttl-proc", description = "SpEL expression to transform key expiration times.", paramLabel = "<exp>")
private Expression ttlExpression;

@Option(names = "--ttls", description = "Propagate key expiration times. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
@Option(names = "--ttl", description = "Propagate key expiration times. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
private boolean propagateTtl = true;

@Option(names = "--stream-ids", description = "Propagate stream message IDs. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
@Option(names = "--stream-id", description = "Propagate stream message IDs. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
private boolean propagateIds = true;

@Option(names = "--stream-prune", description = "Drop empty streams.")
Expand Down
32 changes: 32 additions & 0 deletions plugins/riot/src/main/java/com/redis/riot/RangeConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.redis.riot;

import org.springframework.util.StringUtils;

import com.redis.spring.batch.item.redis.common.Range;

import picocli.CommandLine.ITypeConverter;

public class RangeConverter implements ITypeConverter<Range> {

public static final String SEPARATOR = "-";

@Override
public Range convert(String value) {
int pos = value.indexOf(SEPARATOR);
if (pos == -1) {
int intValue = Integer.parseInt(value);
return new Range(intValue, intValue);
}
int min = Integer.parseInt(value.substring(0, pos).trim());
int max = max(value.substring(pos + 1).trim());
return new Range(min, max);
}

private static int max(String value) {
if (StringUtils.hasLength(value)) {
return Integer.parseInt(value);
}
return Integer.MAX_VALUE;
}

}
17 changes: 17 additions & 0 deletions plugins/riot/src/main/java/com/redis/riot/RedisURIConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.redis.riot;

import io.lettuce.core.RedisURI;
import picocli.CommandLine.ITypeConverter;

public class RedisURIConverter implements ITypeConverter<RedisURI> {

@Override
public RedisURI convert(String value) {
try {
return RedisURI.create(value);
} catch (IllegalArgumentException e) {
return RedisURI.create(RedisURI.URI_SCHEME_REDIS + "://" + value);
}
}

}
2 changes: 1 addition & 1 deletion plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> proces

private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() {
if (isIgnoreStreamMessageId()) {
Assert.isTrue(isStruct(), "--no-stream-ids can only be used with --struct");
Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct");
}
StandardEvaluationContext evaluationContext = evaluationContext();
log.info("Creating processor with {}", processorArgs);
Expand Down
12 changes: 3 additions & 9 deletions plugins/riot/src/main/java/com/redis/riot/Riot.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.redis.riot.core.MainCommand;
import com.redis.riot.operation.OperationCommand;
import com.redis.spring.batch.item.redis.common.Range;

import io.lettuce.core.RedisURI;
import picocli.AutoComplete.GenerateCompletion;
Expand All @@ -25,8 +26,9 @@ public static void main(String[] args) {
@Override
protected CommandLine commandLine() {
CommandLine commandLine = super.commandLine();
commandLine.registerConverter(RedisURI.class, Riot::parseRedisURI);
commandLine.registerConverter(RedisURI.class, new RedisURIConverter());
commandLine.registerConverter(Region.class, Region::of);
commandLine.registerConverter(Range.class, new RangeConverter());
return commandLine;
}

Expand All @@ -35,14 +37,6 @@ protected IExecutionStrategy executionStrategy() {
return Riot::executionStrategy;
}

public static RedisURI parseRedisURI(String string) {
try {
return RedisURI.create(string);
} catch (IllegalArgumentException e) {
return RedisURI.create(RedisURI.URI_SCHEME_REDIS + "://" + string);
}
}

public static int executionStrategy(ParseResult parseResult) {
for (ParseResult subcommand : parseResult.subcommands()) {
Object command = subcommand.commandSpec().userObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class MemberOperationArgs {
@Option(names = "--member-space", description = "Keyspace prefix for member IDs.", paramLabel = "<str>")
private String memberSpace;

@Option(arity = "1..*", names = "--members", description = "Member field names for collections.", paramLabel = "<fields>")
@Option(arity = "1..*", names = "--member", description = "Member field names for collections.", paramLabel = "<fields>")
private List<String> memberFields;

public String getMemberSpace() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class TsAddCommand extends AbstractOperationCommand {
@Option(names = "--on-duplicate", description = "Duplicate policy: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE}).", paramLabel = "<name>")
private DuplicatePolicy duplicatePolicy = DEFAULT_DUPLICATE_POLICY;

@Option(arity = "1..*", names = "--labels", description = "Labels in the form label1=field1 label2=field2...", paramLabel = "SPEL")
@Option(arity = "1..*", names = "--label", description = "Labels in the form label1=field1 label2=field2...", paramLabel = "SPEL")
private Map<String, String> labels = new LinkedHashMap<>();

@Override
Expand Down

0 comments on commit ba064ac

Please sign in to comment.