Skip to content

Commit

Permalink
[AMORO-3348] Extract the implementation of OptimizerManager (#3437)
Browse files Browse the repository at this point in the history
[AMORO-3346] Extract the implementation of OptimizerManager

Co-authored-by: baiyangtx <[email protected]>
  • Loading branch information
mansonliwh and baiyangtx authored Feb 19, 2025
1 parent 33b5485 commit 7c43e75
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.DefaultOptimizerManager;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.DefaultTableManager;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class AmoroServiceContainer {
private DataSource dataSource;
private CatalogManager catalogManager;
private TableManager tableManager;
private OptimizerManager optimizerManager;
private TableService tableService;
private DefaultOptimizingService optimizingService;
private TerminalManager terminalManager;
Expand Down Expand Up @@ -155,10 +157,13 @@ public void startService() throws Exception {

catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
optimizerManager = new DefaultOptimizerManager(serviceConfig);

tableService = new DefaultTableService(serviceConfig, catalogManager);

optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, tableManager, tableService);
new DefaultOptimizingService(
serviceConfig, catalogManager, tableManager, optimizerManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand Down Expand Up @@ -253,7 +258,12 @@ private void startThriftServer(TServer server, String threadName) {
private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(
serviceConfig, catalogManager, tableManager, optimizingService, terminalManager);
serviceConfig,
catalogManager,
tableManager,
optimizerManager,
optimizingService,
terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableManager);

httpServer =
Expand Down Expand Up @@ -558,7 +568,7 @@ public CatalogManager getCatalogManager() {
}

@VisibleForTesting
public OptimizerManager getOptimizingService() {
return this.optimizingService;
public OptimizerManager getOptimizerManager() {
return this.optimizerManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.amoro.exception.PluginRetryAuthException;
import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
Expand All @@ -56,7 +55,6 @@
import org.apache.amoro.server.table.TableRuntime;
import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.amoro.table.TableProperties;
Expand Down Expand Up @@ -90,7 +88,7 @@
* suspending tasks.
*/
public class DefaultOptimizingService extends StatedPersistentBase
implements OptimizingService.Iface, OptimizerManager, QuotaProvider {
implements OptimizingService.Iface, QuotaProvider {

private static final Logger LOG = LoggerFactory.getLogger(DefaultOptimizingService.class);

Expand All @@ -103,6 +101,7 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final Map<String, OptimizerInstance> authOptimizers = new ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
private final CatalogManager catalogManager;
private final OptimizerManager optimizerManager;
private final TableService tableService;
private final MaintainedTableManager tableManager;
private final RuntimeHandlerChain tableHandlerChain;
Expand All @@ -112,6 +111,7 @@ public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
MaintainedTableManager tableManager,
OptimizerManager optimizerManager,
TableService tableService) {
this.optimizerTouchTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
Expand All @@ -124,6 +124,7 @@ public DefaultOptimizingService(
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableManager = tableManager;
this.optimizerManager = optimizerManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
Expand Down Expand Up @@ -317,19 +318,7 @@ private OptimizingQueue getQueueByToken(String token) {
.orElseThrow(() -> new PluginRetryAuthException("Optimizer has not been authenticated"));
}

@Override
public List<OptimizerInstance> listOptimizers() {
return ImmutableList.copyOf(authOptimizers.values());
}

@Override
public List<OptimizerInstance> listOptimizers(String group) {
return authOptimizers.values().stream()
.filter(optimizer -> optimizer.getGroupName().equals(group))
.collect(Collectors.toList());
}

@Override
// TODO need to use optimizer manager
public void deleteOptimizer(String group, String resourceId) {
List<OptimizerInstance> deleteOptimizers =
getAs(OptimizerMapper.class, mapper -> mapper.selectByResourceId(resourceId));
Expand All @@ -340,11 +329,10 @@ public void deleteOptimizer(String group, String resourceId) {
});
}

@Override
public void createResourceGroup(ResourceGroup resourceGroup) {
doAsTransaction(
() -> {
doAs(ResourceMapper.class, mapper -> mapper.insertResourceGroup(resourceGroup));
optimizerManager.createResourceGroup(resourceGroup);
OptimizingQueue optimizingQueue =
new OptimizingQueue(
catalogManager,
Expand All @@ -357,10 +345,9 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
});
}

@Override
public void deleteResourceGroup(String groupName) {
if (canDeleteResourceGroup(groupName)) {
doAs(ResourceMapper.class, mapper -> mapper.deleteResourceGroup(groupName));
optimizerManager.deleteResourceGroup(groupName);
OptimizingQueue optimizingQueue = optimizingQueueByGroup.remove(groupName);
optimizingQueue.dispose();
} else {
Expand All @@ -371,52 +358,13 @@ public void deleteResourceGroup(String groupName) {
}
}

@Override
public void updateResourceGroup(ResourceGroup resourceGroup) {
Preconditions.checkNotNull(resourceGroup, "The resource group cannot be null.");
Optional.ofNullable(optimizingQueueByGroup.get(resourceGroup.getName()))
.ifPresent(queue -> queue.updateOptimizerGroup(resourceGroup));
doAs(ResourceMapper.class, mapper -> mapper.updateResourceGroup(resourceGroup));
}

@Override
public void createResource(Resource resource) {
doAs(ResourceMapper.class, mapper -> mapper.insertResource(resource));
}

@Override
public void deleteResource(String resourceId) {
doAs(ResourceMapper.class, mapper -> mapper.deleteResource(resourceId));
optimizerManager.updateResourceGroup(resourceGroup);
}

@Override
public List<ResourceGroup> listResourceGroups() {
return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
}

@Override
public List<ResourceGroup> listResourceGroups(String containerName) {
return getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups).stream()
.filter(group -> group.getContainer().equals(containerName))
.collect(Collectors.toList());
}

@Override
public ResourceGroup getResourceGroup(String groupName) {
return getAs(ResourceMapper.class, mapper -> mapper.selectResourceGroup(groupName));
}

@Override
public List<Resource> listResourcesByGroup(String groupName) {
return getAs(ResourceMapper.class, mapper -> mapper.selectResourcesByGroup(groupName));
}

@Override
public Resource getResource(String resourceId) {
return getAs(ResourceMapper.class, mapper -> mapper.selectResource(resourceId));
}

@Override
public void dispose() {
optimizerKeeper.dispose();
tableHandlerChain.dispose();
Expand All @@ -439,7 +387,7 @@ public boolean canDeleteResourceGroup(String name) {
return false;
}
}
for (OptimizerInstance optimizer : listOptimizers()) {
for (OptimizerInstance optimizer : optimizerManager.listOptimizers()) {
if (optimizer.getGroupName().equals(name)) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.amoro.server.dashboard.controller.VersionController;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.server.terminal.TerminalManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -99,15 +100,17 @@ public DashboardServer(
Configurations serviceConfig,
CatalogManager catalogManager,
TableManager tableManager,
DefaultOptimizingService optimizerManager,
OptimizerManager optimizerManager,
DefaultOptimizingService optimizingService,
TerminalManager terminalManager) {
PlatformFileManager platformFileManager = new PlatformFileManager();
this.catalogController = new CatalogController(catalogManager, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
// TODO: remove table service from OptimizerGroupController
this.optimizerGroupController = new OptimizerGroupController(tableManager, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.optimizerGroupController =
new OptimizerGroupController(tableManager, optimizingService, optimizerManager);
this.optimizerController = new OptimizerController(optimizingService, optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.amoro.server.dashboard.response.OkResponse;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;

Expand All @@ -36,9 +37,13 @@
/** The controller that handles optimizer requests. */
public class OptimizerController {

private final DefaultOptimizingService optimizerManager;
private final OptimizerManager optimizerManager;

public OptimizerController(DefaultOptimizingService optimizerManager) {
private final DefaultOptimizingService optimizingService;

public OptimizerController(
DefaultOptimizingService optimizingService, OptimizerManager optimizerManager) {
this.optimizingService = optimizingService;
this.optimizerManager = optimizerManager;
}

Expand All @@ -64,7 +69,7 @@ public void releaseOptimizer(Context ctx) {
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
optimizerManager.deleteResource(resourceId);
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
optimizingService.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.resource.ContainerMetadata;
import org.apache.amoro.server.resource.OptimizerInstance;
import org.apache.amoro.server.resource.OptimizerManager;
import org.apache.amoro.server.resource.ResourceContainers;
import org.apache.amoro.server.table.TableManager;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
Expand All @@ -56,11 +57,15 @@ public class OptimizerGroupController {

private static final String ALL_GROUP = "all";
private final TableManager tableManager;
private final DefaultOptimizingService optimizerManager;
private final DefaultOptimizingService optimizingService;
private final OptimizerManager optimizerManager;

public OptimizerGroupController(
TableManager tableManager, DefaultOptimizingService optimizerManager) {
TableManager tableManager,
DefaultOptimizingService optimizingService,
OptimizerManager optimizerManager) {
this.tableManager = tableManager;
this.optimizingService = optimizingService;
this.optimizerManager = optimizerManager;
}

Expand Down Expand Up @@ -205,7 +210,7 @@ public void releaseOptimizer(Context ctx) {
resource.getProperties().putAll(optimizerInstances.get(0).getProperties());
ResourceContainers.get(resource.getContainerName()).releaseOptimizer(resource);
optimizerManager.deleteResource(resourceId);
optimizerManager.deleteOptimizer(resource.getGroupName(), resourceId);
optimizingService.deleteOptimizer(resource.getGroupName(), resourceId);
ctx.json(OkResponse.of("Success to release optimizer"));
}

Expand Down Expand Up @@ -263,7 +268,7 @@ public void createResourceGroup(Context ctx) {
}
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.createResourceGroup(builder.build());
optimizingService.createResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully created."));
}

Expand All @@ -278,21 +283,21 @@ public void updateResourceGroup(Context ctx) {
Map<String, String> properties = (Map) map.get("properties");
ResourceGroup.Builder builder = new ResourceGroup.Builder(name, container);
builder.addProperties(properties);
optimizerManager.updateResourceGroup(builder.build());
optimizingService.updateResourceGroup(builder.build());
ctx.json(OkResponse.of("The optimizer group has been successfully updated."));
}

/** delete optimizeGroup url = /optimize/resourceGroups/{resourceGroupName} */
public void deleteResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
optimizerManager.deleteResourceGroup(name);
optimizingService.deleteResourceGroup(name);
ctx.json(OkResponse.of("The optimizer group has been successfully deleted."));
}

/** check if optimizerGroup can be deleted url = /optimize/resourceGroups/delete/check */
public void deleteCheckResourceGroup(Context ctx) {
String name = ctx.pathParam("resourceGroupName");
ctx.json(OkResponse.of(optimizerManager.canDeleteResourceGroup(name)));
ctx.json(OkResponse.of(optimizingService.canDeleteResourceGroup(name)));
}

/** check if optimizerGroup can be deleted url = /optimize/containers/get */
Expand Down
Loading

0 comments on commit 7c43e75

Please sign in to comment.