Skip to content

Commit

Permalink
Misc. Util cleanup
Browse files Browse the repository at this point in the history
Major changes:
1. Broke up 'Util' class into multiple classes: 'FileUtil', 'HttpUtil', 'CoordinatorStreamUtil'.
2. Consolidated some Util classes: MathUtil, ScalaJavaUtil
3. Removed redundant Util classes: ClassLoaderHelper, ScalaToJavaUtils
4. Renamed some Util classes for consistency: TimerUtils -> TimerUtil.
5. Inlined some util classes and methods at their only point of use: LexicographicComparator into RocksDBKeyValueStore, defaultSerdeFactoryFromSerdeName into SerdeManager, etc.

The rest of the changes update existing call sites to use the new classes and methods.

Testing: Local build and tests pass. Also tested with a locally deployed Samza job.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>, Jagadish Venkatraman <[email protected]>

Closes apache#455 from prateekm/util-cleanup
  • Loading branch information
prateekm committed Apr 18, 2018
1 parent bca978e commit 5d73ecd
Show file tree
Hide file tree
Showing 80 changed files with 860 additions and 778 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlobUtils;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.LeaseBlobManager;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.TableUtils;
Expand Down Expand Up @@ -301,7 +300,8 @@ private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, System
private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
JobConfig jobConfig = new JobConfig(config);
String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
SystemStreamPartitionGrouper grouper = Util.getObj(factoryString, SystemStreamPartitionGrouperFactory.class)
.getSystemStreamPartitionGrouper(jobConfig);
return grouper;
}

Expand Down Expand Up @@ -479,7 +479,7 @@ private String createProcessorId(Config config) {
return appConfig.getProcessorId();
} else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
ProcessorIdGenerator idGenerator =
ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
return idGenerator.generateProcessorId(config);
} else {
throw new ConfigException(String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ protected SamzaResource peekAllocatedResource(String host) {
*/
private CommandBuilder getCommandBuilder(String samzaContainerId) {
String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
CommandBuilder cmdBuilder = Util.getObj(cmdBuilderClassName, CommandBuilder.class);

cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.jobModelManager.server().getUrl());
return cmdBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.metrics.ContainerProcessManagerMetrics;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
Expand Down Expand Up @@ -460,15 +460,9 @@ private ResourceManagerFactory getContainerProcessManagerFactory(final ClusterMa
final ResourceManagerFactory factory;

try {
factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass);
} catch (InstantiationException e) {
log.error("Instantiation exception when creating ContainerManager", e);
throw new SamzaException(e);
} catch (IllegalAccessException e) {
log.error("Illegal access exception when creating ContainerManager", e);
throw new SamzaException(e);
} catch (ClassNotFoundException e) {
log.error("ClassNotFound Exception when creating ContainerManager", e);
factory = Util.getObj(containerManagerFactoryClass, ResourceManagerFactory.class);
} catch (Exception e) {
log.error("Exception when creating ContainerManager", e);
throw new SamzaException(e);
}
return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public Map<String, SystemFactory> getSystemFactories() {
throw new SamzaException(
String.format("A stream uses system %s, which is missing from the configuration.", systemName));
}
return Util.getObj(systemFactoryClassName);
return Util.getObj(systemFactoryClassName, SystemFactory.class);
}));

return systemFactories;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.util.ClassLoaderHelper;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkCoordinationUtilsFactory;

public class JobCoordinatorConfig extends MapConfig {
Expand Down Expand Up @@ -55,7 +55,7 @@ public CoordinationUtilsFactory getCoordinationUtilsFactory() {
// load the class
String coordinationUtilsFactoryClass = getJobCoordinationUtilsFactoryClassName();

return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
return Util.getObj(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
}

public String getJobCoordinatorFactoryClassName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public CheckpointManager getCheckpointManager(MetricsRegistry metricsRegistry) {
String checkpointManagerFactoryName = getCheckpointManagerFactoryName();
if (StringUtils.isNotBlank(checkpointManagerFactoryName)) {
CheckpointManager checkpointManager =
Util.<CheckpointManagerFactory>getObj(checkpointManagerFactoryName).getCheckpointManager(this, metricsRegistry);
Util.getObj(checkpointManagerFactoryName, CheckpointManagerFactory.class).getCheckpointManager(this, metricsRegistry);
return checkpointManager;
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;
import org.apache.samza.util.Util;
import org.apache.samza.util.HttpUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,7 +86,7 @@ String httpGet(URL url) throws IOException {
BufferedReader br = null;
for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) {
try {
conn = Util.getHttpConnection(url, TIMEOUT_MS);
conn = HttpUtil.getHttpConnection(url, TIMEOUT_MS);
br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
throw new IOException(String.format("HTTP error fetching url %s. Returned status code %d", url.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
import scala.runtime.AbstractFunction1;
import java.util.concurrent.ExecutorService;

import static org.apache.samza.util.Util.asScalaClock;

import static org.apache.samza.util.ScalaJavaUtil.toScalaFunction;

/**
* Factory class to create runloop for a Samza task, based on the type
Expand Down Expand Up @@ -78,7 +77,7 @@ public Boolean apply(TaskInstance t) {
maxThrottlingDelayMs,
taskWindowMs,
taskCommitMs,
asScalaClock(() -> System.nanoTime()));
toScalaFunction(() -> clock.nanoTime()));
} else {
Integer taskMaxConcurrency = config.getMaxConcurrency();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;
import org.apache.samza.util.Util;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -68,8 +68,8 @@ public class CoordinatorStreamSystemConsumer {
private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();

public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Util;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,8 +56,8 @@ public class CoordinatorStreamSystemProducer {


public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry);
this.systemStream = coordinatorSystemStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.util.MathUtil;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.system.StreamSpec;
Expand Down Expand Up @@ -187,7 +187,8 @@ public JobConfig generateConfig(String executionPlanJson) {
// Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs()

// Generate additional configuration
TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName());
TableProviderFactory tableProviderFactory =
Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
configs.putAll(tableProvider.generateConfig(configs));
});
Expand Down Expand Up @@ -343,7 +344,7 @@ private long computeTriggerInterval() {
}

// Compute the gcd of the resultant list
return MathUtils.gcd(candidateTimerIntervals);
return MathUtil.gcd(candidateTimerIntervals);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
import static org.apache.samza.util.ScalaJavaUtil.defaultValue;

public class StreamManager {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
Expand Down Expand Up @@ -112,17 +112,20 @@ public void clearStreamsFromPreviousRun(Config prevConfig) {

//Find checkpoint stream and clean up
TaskConfig taskConfig = new TaskConfig(prevConfig);
String checkpointManagerFactoryClass = taskConfig.getCheckpointManagerFactory().getOrElse(defaultValue(null));
if (checkpointManagerFactoryClass != null) {
CheckpointManager checkpointManager = ((CheckpointManagerFactory) Util.getObj(checkpointManagerFactoryClass))
.getCheckpointManager(prevConfig, new MetricsRegistryMap());
String checkpointManagerFactoryClassName = taskConfig.getCheckpointManagerFactory()
.getOrElse(defaultValue(null));
if (checkpointManagerFactoryClassName != null) {
CheckpointManager checkpointManager =
Util.getObj(checkpointManagerFactoryClassName, CheckpointManagerFactory.class)
.getCheckpointManager(prevConfig, new MetricsRegistryMap());
checkpointManager.clearCheckpoints();
}

//Find changelog streams and remove them
StorageConfig storageConfig = new StorageConfig(prevConfig);
for (String store : JavaConversions.asJavaCollection(storageConfig.getStoreNames())) {
String changelog = storageConfig.getChangelogStream(store).getOrElse(defaultValue(null));
String changelog = storageConfig.getChangelogStream(store)
.getOrElse(defaultValue(null));
if (changelog != null) {
LOGGER.info("Clear store {} changelog {}", store, changelog);
SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.samza.operators.triggers.RepeatingTrigger;
import org.apache.samza.operators.triggers.TimeBasedTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.util.MathUtil;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.Serde;
Expand Down Expand Up @@ -95,7 +95,7 @@ public long getDefaultTriggerMs() {
.map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis())
.collect(Collectors.toList());

return MathUtils.gcd(candidateDurations);
return MathUtil.gcd(candidateDurations);
}

private List<TimeBasedTrigger> getTimeBasedTriggers(Trigger rootTrigger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -114,11 +115,8 @@ public StreamProcessor(Config config, Map<String, MetricsReporter> customMetrics

/* package private */
JobCoordinator getJobCoordinator() {
return Util.
<JobCoordinatorFactory>getObj(
new JobCoordinatorConfig(config)
.getJobCoordinatorFactoryClassName())
.getJobCoordinator(config);
String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config);
}

@VisibleForTesting
Expand Down Expand Up @@ -201,7 +199,7 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
processorId,
jobModel,
config,
Util.javaMapAsScalaMap(customMetricsReporter),
ScalaJavaUtil.toScalaMap(customMetricsReporter),
taskFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@
import org.apache.samza.container.SamzaContainerListener;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.util.ScalaToJavaUtils;
import org.apache.samza.util.Util;
import org.apache.samza.util.ScalaJavaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,7 +77,7 @@ public void run(StreamApplication streamApp) {
containerId,
jobModel,
config,
Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
ScalaJavaUtil.toScalaMap(new HashMap<>()),
taskFactory);
container.setContainerListener(
new SamzaContainerListener() {
Expand Down Expand Up @@ -140,7 +138,7 @@ public static void main(String[] args) throws Exception {
throw new SamzaException("can not find the job name");
}
String jobName = jobConfig.getName().get();
String jobId = jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1"));
String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
MDC.put("containerName", "samza-container-" + containerId);
MDC.put("jobName", jobName);
MDC.put("jobId", jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Util;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
Expand Down Expand Up @@ -166,16 +166,23 @@ public TaskName deserialize(JsonParser jsonParser, DeserializationContext contex

public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider) throws IOException {
String ssp = Util.sspToString(systemStreamPartition);
jgen.writeFieldName(ssp);
public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId());
jgen.writeFieldName(sspString);
}
}

public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
@Override
public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
return Util.stringToSsp(sspString);
int idx = sspString.indexOf('.');
int lastIdx = sspString.lastIndexOf('.');
if (idx < 0 || lastIdx < 0) {
throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
}
return new SystemStreamPartition(
new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)),
new Partition(Integer.parseInt(sspString.substring(lastIdx + 1))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private String createProcessorId(Config config) {
return appConfig.getProcessorId();
} else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
ProcessorIdGenerator idGenerator =
ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
return idGenerator.generateProcessorId(config);
} else {
throw new ConfigException(String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.CommandLine;
import org.apache.samza.util.ScalaJavaUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
Expand Down Expand Up @@ -151,7 +152,7 @@ private void getChangeLogSystemStreamsAndStorageFactories() {

String factoryClass = config.getStorageFactoryClassName(storeName);
if (factoryClass != null) {
storageEngineFactories.put(storeName, Util.<StorageEngineFactory<Object, Object>>getObj(factoryClass));
storageEngineFactories.put(storeName, Util.getObj(factoryClass, StorageEngineFactory.class));
} else {
throw new SamzaException("Missing storage factory for " + storeName + ".");
}
Expand Down Expand Up @@ -229,9 +230,9 @@ private void getTaskStorageManagers() {
}
TaskStorageManager taskStorageManager = new TaskStorageManager(
taskModel.getTaskName(),
Util.javaMapAsScalaMap(taskStores),
Util.javaMapAsScalaMap(storeConsumers),
Util.javaMapAsScalaMap(changeLogSystemStreams),
ScalaJavaUtil.toScalaMap(taskStores),
ScalaJavaUtil.toScalaMap(storeConsumers),
ScalaJavaUtil.toScalaMap(changeLogSystemStreams),
maxPartitionNumber,
streamMetadataCache,
storeBaseDir,
Expand Down
Loading

0 comments on commit 5d73ecd

Please sign in to comment.