Skip to content

Commit

Permalink
Refactor ClusterContextManager & ClusterContextManagerBuilder (apache…
Browse files Browse the repository at this point in the history
…#12129)

* Refactor ClusterContextManager & ClusterContextManagerBuilder
  • Loading branch information
menghaoranss authored Aug 31, 2021
1 parent 33b8cb9 commit b7b7176
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,12 @@
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
import org.apache.shardingsphere.infra.lock.ShardingSphereLock;
import org.apache.shardingsphere.infra.rule.event.impl.DataSourceNameDisabledEvent;
import org.apache.shardingsphere.infra.rule.identifier.type.StatusContainedRule;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.governance.lock.ShardingSphereDistributeLock;
import org.apache.shardingsphere.mode.manager.cluster.governance.schema.GovernanceSchema;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.persist.PersistService;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import org.apache.shardingsphere.transaction.context.TransactionContexts;

import java.util.Collection;
import java.util.Optional;

/**
Expand All @@ -40,8 +35,6 @@
@RequiredArgsConstructor
public final class ClusterContextManager implements ContextManager {

private final PersistService persistService;

private final RegistryCenter registryCenter;

@Getter
Expand All @@ -56,30 +49,10 @@ public final class ClusterContextManager implements ContextManager {
public void init(final MetaDataContexts metaDataContexts, final TransactionContexts transactionContexts) {
this.metaDataContexts = metaDataContexts;
this.transactionContexts = transactionContexts;
disableDataSources();
persistMetaData();
lock = createShardingSphereLock(registryCenter.getRepository());
registryCenter.onlineInstance(metaDataContexts.getAllSchemaNames());
}

private void disableDataSources() {
metaDataContexts.getMetaDataMap().forEach((key, value)
-> value.getRuleMetaData().getRules().stream().filter(each -> each instanceof StatusContainedRule).forEach(each -> disableDataSources(key, (StatusContainedRule) each)));
}

private void disableDataSources(final String schemaName, final StatusContainedRule rule) {
Collection<String> disabledDataSources = registryCenter.getDataSourceStatusService().loadDisabledDataSources(schemaName);
disabledDataSources.stream().map(this::getDataSourceName).forEach(each -> rule.updateRuleStatus(new DataSourceNameDisabledEvent(each, true)));
}

private String getDataSourceName(final String disabledDataSource) {
return new GovernanceSchema(disabledDataSource).getDataSourceName();
}

private void persistMetaData() {
metaDataContexts.getMetaDataMap().forEach((key, value) -> persistService.getSchemaMetaDataService().persist(key, value.getSchema()));
}

private ShardingSphereLock createShardingSphereLock(final ClusterPersistRepository repository) {
return metaDataContexts.getProps().<Boolean>getValue(ConfigurationPropertyKey.LOCK_ENABLED)
? new ShardingSphereDistributeLock(repository, metaDataContexts.getProps().<Long>getValue(ConfigurationPropertyKey.LOCK_WAIT_TIMEOUT_MILLISECONDS)) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,44 @@ public final class ClusterContextManagerBuilder implements ContextManagerBuilder
ShardingSphereServiceLoader.register(ClusterPersistRepository.class);
}

private RegistryCenter registryCenter;

private PersistService persistService;

private MetaDataContexts metaDataContexts;

private TransactionContexts transactionContexts;

private ContextManager contextManager;

@Override
public ContextManager build(final ModeConfiguration modeConfig, final Map<String, Map<String, DataSource>> dataSourcesMap,
final Map<String, Collection<RuleConfiguration>> schemaRuleConfigs, final Collection<RuleConfiguration> globalRuleConfigs,
final Properties props, final boolean isOverwrite) throws SQLException {
beforeBuildContextManager(modeConfig, dataSourcesMap, schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
contextManager = new ClusterContextManager(registryCenter);
contextManager.init(metaDataContexts, transactionContexts);
afterBuildContextManager();
return contextManager;
}

private void beforeBuildContextManager(final ModeConfiguration modeConfig, final Map<String, Map<String, DataSource>> dataSourcesMap,
final Map<String, Collection<RuleConfiguration>> schemaRuleConfigs, final Collection<RuleConfiguration> globalRuleConfigs,
final Properties props, final boolean isOverwrite) throws SQLException {
ShardingSphereEventBus.getInstance().register(this);
ClusterPersistRepository repository = createClusterPersistRepository((ClusterPersistRepositoryConfiguration) modeConfig.getRepository());
registryCenter = new RegistryCenter(repository);
persistService = new PersistService(repository);
RegistryCenter registryCenter = new RegistryCenter(repository);
persistConfigurations(persistService, dataSourcesMap, schemaRuleConfigs, globalRuleConfigs, props, isOverwrite);
Collection<String> schemaNames = persistService.getSchemaMetaDataService().loadAllNames();
MetaDataContexts metaDataContexts = new MetaDataContextsBuilder(loadDataSourcesMap(persistService, dataSourcesMap, schemaNames),
metaDataContexts = new MetaDataContextsBuilder(loadDataSourcesMap(persistService, dataSourcesMap, schemaNames),
loadSchemaRules(persistService, schemaNames), persistService.getGlobalRuleService().load(), persistService.getPropsService().load()).build(persistService);
TransactionContexts transactionContexts = createTransactionContexts(metaDataContexts);
contextManager = new ClusterContextManager(persistService, registryCenter);
contextManager.init(metaDataContexts, transactionContexts);
return contextManager;
transactionContexts = createTransactionContexts(metaDataContexts);
}

private void afterBuildContextManager() {
disableDataSources();
persistMetaData();
}

private ClusterPersistRepository createClusterPersistRepository(final ClusterPersistRepositoryConfiguration config) {
Expand Down Expand Up @@ -235,9 +253,7 @@ public synchronized void renew(final SchemaAddedEvent event) throws SQLException
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(), new FederateSchemaMetadata(event.getSchemaName(),
metaData.getSchema().getTables()));
contextManager.getMetaDataContexts().getMetaDataMap().put(event.getSchemaName(), metaData);
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService,
contextManager.getMetaDataContexts().getMetaDataMap(), contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(contextManager.getMetaDataContexts().getMetaDataMap()));
ShardingSphereEventBus.getInstance().post(new DataSourceChangeCompletedEvent(event.getSchemaName(),
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(), metaData.getResource().getDataSources()));
}
Expand All @@ -254,9 +270,7 @@ public synchronized void renew(final SchemaDeletedEvent event) {
Map<String, ShardingSphereMetaData> schemaMetaData = new HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
schemaMetaData.remove(schemaName);
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().remove(schemaName);
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService,
schemaMetaData, contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
ShardingSphereEventBus.getInstance().post(new DataSourceDeletedEvent(schemaName));
}

Expand All @@ -268,9 +282,7 @@ public synchronized void renew(final SchemaDeletedEvent event) {
@Subscribe
public synchronized void renew(final PropertiesChangedEvent event) {
ConfigurationProperties props = new ConfigurationProperties(event.getProps());
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService,
contextManager.getMetaDataContexts().getMetaDataMap(), contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
props, contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(props));
}

/**
Expand Down Expand Up @@ -302,9 +314,7 @@ public synchronized void renew(final SchemaChangedEvent event) {
contextManager.getMetaDataContexts().getOptimizeContextFactory().getSchemaMetadatas().getSchemas().put(event.getSchemaName(),
new FederateSchemaMetadata(event.getSchemaName(), metaData.getSchema().getTables()));
}
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService,
schemaMetaData, contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
} finally {
ShardingSphereEventBus.getInstance().post(new InnerLockReleasedEvent(LockNameUtil.getMetadataRefreshLockName()));
}
Expand All @@ -321,9 +331,7 @@ public synchronized void renew(final RuleConfigurationsChangedEvent event) throw
String schemaName = event.getSchemaName();
ShardingSphereMetaData metaData = getChangedMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName), event.getRuleConfigurations());
Map<String, ShardingSphereMetaData> schemaMetaData = rebuildSchemaMetaData(schemaName, metaData);
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService, schemaMetaData, contextManager.getMetaDataContexts().getGlobalRuleMetaData(),
contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
persistService.getSchemaMetaDataService().persist(schemaName, schemaMetaData.get(schemaName).getSchema());
}

Expand All @@ -339,9 +347,7 @@ public synchronized void renew(final DataSourceChangedEvent event) throws SQLExc
Collection<DataSource> pendingClosedDataSources = getPendingClosedDataSources(schemaName, event.getDataSourceConfigurations());
ShardingSphereMetaData metaData = rebuildMetaData(contextManager.getMetaDataContexts().getMetaDataMap().get(schemaName), event.getDataSourceConfigurations());
Map<String, ShardingSphereMetaData> schemaMetaData = rebuildSchemaMetaData(schemaName, metaData);
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService, schemaMetaData,
contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(), contextManager.getMetaDataContexts().getProps(),
contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(schemaMetaData));
ShardingSphereEventBus.getInstance().post(new DataSourceChangeCompletedEvent(event.getSchemaName(),
contextManager.getMetaDataContexts().getMetaDataMap().get(event.getSchemaName()).getResource().getDatabaseType(),
schemaMetaData.get(event.getSchemaName()).getResource().getDataSources()));
Expand Down Expand Up @@ -391,12 +397,28 @@ public synchronized void renew(final GlobalRuleConfigurationsChangedEvent event)
if (!newGlobalConfigs.isEmpty()) {
ShardingSphereRuleMetaData newGlobalRuleMetaData = new ShardingSphereRuleMetaData(newGlobalConfigs,
ShardingSphereRulesBuilder.buildGlobalRules(newGlobalConfigs, contextManager.getMetaDataContexts().getMetaDataMap()));
contextManager.renewMetaDataContexts(new MetaDataContexts(persistService, contextManager.getMetaDataContexts().getMetaDataMap(), newGlobalRuleMetaData,
contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory()));
contextManager.renewMetaDataContexts(rebuildMetaDataContexts(newGlobalRuleMetaData));
}
}

private MetaDataContexts rebuildMetaDataContexts(final Map<String, ShardingSphereMetaData> schemaMetaData) {
return new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
schemaMetaData, contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory());
}

private MetaDataContexts rebuildMetaDataContexts(final ConfigurationProperties props) {
return new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
contextManager.getMetaDataContexts().getMetaDataMap(), contextManager.getMetaDataContexts().getGlobalRuleMetaData(), contextManager.getMetaDataContexts().getExecutorEngine(),
props, contextManager.getMetaDataContexts().getOptimizeContextFactory());
}

private MetaDataContexts rebuildMetaDataContexts(final ShardingSphereRuleMetaData globalRuleMetaData) {
return new MetaDataContexts(contextManager.getMetaDataContexts().getPersistService().get(),
contextManager.getMetaDataContexts().getMetaDataMap(), globalRuleMetaData, contextManager.getMetaDataContexts().getExecutorEngine(),
contextManager.getMetaDataContexts().getProps(), contextManager.getMetaDataContexts().getOptimizeContextFactory());
}

private Map<String, ShardingSphereMetaData> rebuildSchemaMetaData(final String schemaName, final ShardingSphereMetaData metaData) {
Map<String, ShardingSphereMetaData> result = new HashMap<>(contextManager.getMetaDataContexts().getMetaDataMap());
result.put(schemaName, metaData);
Expand Down Expand Up @@ -577,6 +599,24 @@ private void renewContexts(final Map<String, ShardingSphereTransactionManagerEng
contextManager.renewTransactionContexts(new TransactionContexts(engines));
}

private void disableDataSources() {
metaDataContexts.getMetaDataMap().forEach((key, value)
-> value.getRuleMetaData().getRules().stream().filter(each -> each instanceof StatusContainedRule).forEach(each -> disableDataSources(key, (StatusContainedRule) each)));
}

private void disableDataSources(final String schemaName, final StatusContainedRule rule) {
Collection<String> disabledDataSources = registryCenter.getDataSourceStatusService().loadDisabledDataSources(schemaName);
disabledDataSources.stream().map(this::getDataSourceName).forEach(each -> rule.updateRuleStatus(new DataSourceNameDisabledEvent(each, true)));
}

private String getDataSourceName(final String disabledDataSource) {
return new GovernanceSchema(disabledDataSource).getDataSourceName();
}

private void persistMetaData() {
metaDataContexts.getMetaDataMap().forEach((key, value) -> persistService.getSchemaMetaDataService().persist(key, value.getSchema()));
}

@Override
public String getType() {
return "Cluster";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.optimize.context.OptimizeContextFactory;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.persist.PersistService;
Expand Down Expand Up @@ -50,9 +49,6 @@ public final class ClusterContextManagerTest {

private final ConfigurationProperties props = new ConfigurationProperties(new Properties());

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private PersistService persistService;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private RegistryCenter registryCenter;

Expand All @@ -66,14 +62,13 @@ public final class ClusterContextManagerTest {

@Before
public void setUp() {
clusterContextManager = new ClusterContextManager(persistService, registryCenter);
clusterContextManager = new ClusterContextManager(registryCenter);
clusterContextManager.init(
new MetaDataContexts(mock(PersistService.class), createMetaDataMap(), globalRuleMetaData, mock(ExecutorEngine.class), props, mockOptimizeContextFactory()),
mock(TransactionContexts.class, RETURNS_DEEP_STUBS));
}

private Map<String, ShardingSphereMetaData> createMetaDataMap() {
when(metaData.getSchema()).thenReturn(mock(ShardingSphereSchema.class));
when(metaData.getRuleMetaData().getRules()).thenReturn(Collections.emptyList());
return Collections.singletonMap("schema", metaData);
}
Expand Down

0 comments on commit b7b7176

Please sign in to comment.