SAMZA-2488: Add JobCoordinatorLaunchUtil to handle common logic when launching job coordinator. (apache#1318)
kw2542 authored Mar 18, 2020
1 parent 7d09f23 commit b7cab95
Showing 4 changed files with 170 additions and 84 deletions.
ClusterBasedJobCoordinator.java
@@ -35,9 +35,6 @@
import org.apache.samza.SamzaException;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.classloader.IsolatingClassLoaderFactory;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
import org.apache.samza.config.ApplicationConfig;
@@ -58,7 +55,6 @@
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.JobModelUtil;
@@ -571,8 +567,16 @@ private static void runClusterBasedJobCoordinator(String[] args) {
throw new SamzaException(e);
}
} else {
ClusterBasedJobCoordinator jc = createFromConfigLoader(submissionConfig);
jc.run();
JobConfig jobConfig = new JobConfig(submissionConfig);

if (!jobConfig.getConfigLoaderFactory().isPresent()) {
throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
}

// load full job config with ConfigLoader
Config originalConfig = ConfigUtil.loadConfig(submissionConfig);

JobCoordinatorLaunchUtil.run(ApplicationUtil.fromConfig(originalConfig), originalConfig);
}

LOG.info("Finished running ClusterBasedJobCoordinator");
@@ -614,49 +618,6 @@ static ClusterBasedJobCoordinator createFromMetadataStore(Config metadataStoreCo
return new ClusterBasedJobCoordinator(metrics, coordinatorStreamStore, config);
}

/**
* Initialize {@link ClusterBasedJobCoordinator} with submission config, full job config will be fetched using
* specified {@link org.apache.samza.config.ConfigLoaderFactory}
*
* @param submissionConfig specifies {@link org.apache.samza.config.ConfigLoaderFactory}
* @return {@link ClusterBasedJobCoordinator}
*/
@VisibleForTesting
static ClusterBasedJobCoordinator createFromConfigLoader(Config submissionConfig) {
JobConfig jobConfig = new JobConfig(submissionConfig);

if (!jobConfig.getConfigLoaderFactory().isPresent()) {
throw new SamzaException(JobConfig.CONFIG_LOADER_FACTORY + " is required to initialize job coordinator from config loader");
}

MetricsRegistryMap metrics = new MetricsRegistryMap();
// load full job config with ConfigLoader
Config originalConfig = ConfigUtil.loadConfig(submissionConfig);

// Execute planning
ApplicationDescriptorImpl<? extends ApplicationDescriptor>
appDesc = ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig);
RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
List<JobConfig> jobConfigs = planner.prepareJobs();

if (jobConfigs.size() != 1) {
throw new SamzaException("Only support single remote job is supported.");
}

Config config = jobConfigs.get(0);

// This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, true);
DiagnosticsUtil.createDiagnosticsStream(config);
MetadataStore metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(config), metrics);
metadataStore.init();

return new ClusterBasedJobCoordinator(
metrics,
metadataStore,
config);
}

/**
* Convert Samza config to command line arguments to invoke app.main.class
*
JobCoordinatorLaunchUtil.java (new file)
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;

import java.util.List;
import org.apache.samza.SamzaException;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.descriptors.ApplicationDescriptor;
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.DiagnosticsUtil;


/**
* Util class to launch and run {@link ClusterBasedJobCoordinator}.
* This util is used by both high/low level API and Beam API Samza jobs.
*/
public class JobCoordinatorLaunchUtil {
/**
* Run {@link ClusterBasedJobCoordinator} with full job config.
*
* @param app SamzaApplication to run.
* @param config full job config.
*/
@SuppressWarnings("rawtypes")
public static void run(SamzaApplication app, Config config) {
// Execute planning
ApplicationDescriptorImpl<? extends ApplicationDescriptor>
appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
RemoteJobPlanner planner = new RemoteJobPlanner(appDesc);
List<JobConfig> jobConfigs = planner.prepareJobs();

if (jobConfigs.size() != 1) {
throw new SamzaException("Only support single remote job is supported.");
}

Config finalConfig = jobConfigs.get(0);

// This needs to be consistent with RemoteApplicationRunner#run, where JobRunner#submit is called instead of JobRunner#run.
CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
DiagnosticsUtil.createDiagnosticsStream(finalConfig);
MetricsRegistryMap metrics = new MetricsRegistryMap();
MetadataStore
metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig), metrics);
// MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
// initialization of MetadataStore can be moved to ClusterBasedJobCoordinator after we clean up
// ClusterBasedJobCoordinator#createFromMetadataStore
metadataStore.init();

ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(
metrics,
metadataStore,
finalConfig);
jc.run();
}

private JobCoordinatorLaunchUtil() {}
}
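
For reference, a minimal sketch of invoking the new utility directly with an application instance. ExampleApp, DirectRunSketch, and the config entries below are hypothetical placeholders that only illustrate the call shape; a real launch would pass a fully loaded job config as in the sketch after the ClusterBasedJobCoordinator hunk above.

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.clustermanager.JobCoordinatorLaunchUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

public final class DirectRunSketch {
  // Hypothetical no-op application; any SamzaApplication implementation can be passed in.
  private static final class ExampleApp implements StreamApplication {
    @Override
    public void describe(StreamApplicationDescriptor appDescriptor) {
      // Inputs, outputs, and processing logic would be described here.
    }
  }

  public static void main(String[] args) {
    // A fully loaded job config is expected here; these entries are placeholders.
    Map<String, String> config = new HashMap<>();
    config.put("job.name", "example-job");
    config.put("app.class", ExampleApp.class.getName());
    Config fullConfig = new MapConfig(config);

    // Plans the job, writes config to the coordinator stream, and runs the coordinator.
    JobCoordinatorLaunchUtil.run(new ExampleApp(), fullConfig);
  }
}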
TestClusterBasedJobCoordinator.java
@@ -23,30 +23,25 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.system.MockSystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.After;
import org.junit.Before;
@@ -68,7 +63,6 @@
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyPrivate;
import static org.powermock.api.mockito.PowerMockito.verifyNew;


/**
@@ -213,35 +207,6 @@ public void testRunWithClassLoader() throws Exception {
verifyPrivate(ClusterBasedJobCoordinator.class).invoke("runClusterBasedJobCoordinator", new Object[]{aryEq(args)});
}

@Test(expected = SamzaException.class)
public void testCreateFromConfigLoaderWithoutConfigLoaderFactory() {
ClusterBasedJobCoordinator.createFromConfigLoader(new MapConfig());
}

@Test
public void testCreateFromConfigLoader() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(ApplicationConfig.APP_CLASS, MockStreamApplication.class.getCanonicalName());
config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getCanonicalName());
config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
getClass().getResource("/test.properties").getPath());
Config submissionConfig = new MapConfig(config);
JobConfig fullJobConfig = new JobConfig(ConfigUtil.loadConfig(submissionConfig));

RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);

PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mock(ClusterBasedJobCoordinator.class));
PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));

ClusterBasedJobCoordinator.createFromConfigLoader(submissionConfig);

verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
}

@Test
public void testToArgs() {
ApplicationConfig appConfig = new ApplicationConfig(new MapConfig(ImmutableMap.of(
TestJobCoordinatorLaunchUtil.java (new file)
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.ConfigUtil;
import org.apache.samza.util.CoordinatorStreamUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.verifyNew;

@RunWith(PowerMockRunner.class)
@PrepareForTest({
CoordinatorStreamUtil.class,
JobCoordinatorLaunchUtil.class,
CoordinatorStreamStore.class,
RemoteJobPlanner.class})
public class TestJobCoordinatorLaunchUtil {
@Test
public void testCreateFromConfigLoader() throws Exception {
Map<String, String> config = new HashMap<>();
config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getName());
config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
getClass().getResource("/test.properties").getPath());
JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true")));

RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
ClusterBasedJobCoordinator mockJC = mock(ClusterBasedJobCoordinator.class);

PowerMockito.mockStatic(CoordinatorStreamUtil.class);
PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));

JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);

verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
verify(mockJC, times(1)).run();
}
}
