Skip to content

Commit

Permalink
Add ReadOnly state for Proxy (apache#23944)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaojinchao95 authored Feb 4, 2023
1 parent 0b8dafa commit b79d2fd
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| --------- | ----------- | ------ |
| 08000 | 13000 | Can not register driver, reason is: %s |
| 01000 | 13010 | Circuit break open, the request has been ignored. |
| 01000 | 13011 | The current instance is read-only, Not allowed write traffic. |
| 08000 | 13020 | Can not get %d connections one time, partition succeed connection(%d) have released. Please consider increasing the \`maxPoolSize\` of the data sources or decreasing the \`max-connections-size-per-query\` in properties. |
| 08000 | 13030 | Connection has been closed. |
| 08000 | 13031 | Result set has been closed. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| --------- | ----------- | ------ |
| 08000 | 13000 | Can not register driver, reason is: %s |
| 01000 | 13010 | Circuit break open, the request has been ignored. |
| 01000 | 13011 | The current instance is read-only, Not allowed write traffic. |
| 08000 | 13020 | Can not get %d connections one time, partition succeed connection(%d) have released. Please consider increasing the \`maxPoolSize\` of the data sources or decreasing the \`max-connections-size-per-query\` in properties. |
| 08000 | 13030 | Connection has been closed. |
| 08000 | 13031 | Result set has been closed. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.shardingsphere.dbdiscovery.algorithm;

import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
import org.apache.shardingsphere.infra.instance.InstanceContext;
import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.state.StateType;
import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
Expand All @@ -46,7 +49,7 @@ public final class DatabaseDiscoveryEngine {

private final DatabaseDiscoveryProvider provider;

private final EventBusContext eventBusContext;
private final InstanceContext instanceContext;

/**
* Check environment of database cluster.
Expand All @@ -71,6 +74,7 @@ public void checkEnvironment(final String databaseName, final Map<String, DataSo
public String changePrimaryDataSource(final String databaseName, final String groupName, final String originalPrimaryDataSourceName,
final Map<String, DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames) {
Optional<String> newPrimaryDataSourceName = findPrimaryDataSourceName(dataSourceMap);
postComputeNodeStatusChangedEvent(newPrimaryDataSourceName.orElse(""));
newPrimaryDataSourceName.ifPresent(optional -> postPrimaryChangedEvent(databaseName, groupName, originalPrimaryDataSourceName, optional));
Map<String, DataSource> replicaDataSourceMap = new HashMap<>(dataSourceMap);
newPrimaryDataSourceName.ifPresent(replicaDataSourceMap::remove);
Expand All @@ -91,9 +95,19 @@ private Optional<String> findPrimaryDataSourceName(final Map<String, DataSource>
return Optional.empty();
}

private void postComputeNodeStatusChangedEvent(final String newPrimaryDataSourceName) {
if (Strings.isNullOrEmpty(newPrimaryDataSourceName) && StateType.OK.equals(instanceContext.getInstance().getState().getCurrentState())) {
instanceContext.getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(), StateType.READ_ONLY));
return;
}
if (!Strings.isNullOrEmpty(newPrimaryDataSourceName) && StateType.READ_ONLY.equals(instanceContext.getInstance().getState().getCurrentState())) {
instanceContext.getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(), StateType.OK));
}
}

private void postPrimaryChangedEvent(final String databaseName, final String groupName, final String originalPrimaryDataSourceName, final String newPrimaryDataSourceName) {
if (!newPrimaryDataSourceName.equals(originalPrimaryDataSourceName)) {
eventBusContext.post(new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName)));
instanceContext.getEventBusContext().post(new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName)));
}
}

Expand All @@ -104,16 +118,16 @@ private void postReplicaDisabledEvent(final String databaseName, final String gr
StorageNodeDataSource replicaStorageNode = createReplicaStorageNode(loadReplicaStatus(entry.getValue()));
if (DataSourceState.ENABLED == replicaStorageNode.getStatus()) {
enabledReplicasCount += disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0;
eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
continue;
}
if (provider.getMinEnabledReplicas().isPresent() && 0 == provider.getMinEnabledReplicas().get()) {
eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
continue;
}
if (enabledReplicasCount > provider.getMinEnabledReplicas().get()) {
enabledReplicasCount -= disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1;
eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.instance.InstanceContext;

import javax.sql.DataSource;
import java.util.Collection;
Expand All @@ -46,11 +46,11 @@ public final class HeartbeatJob implements SimpleJob {

private final Collection<String> disabledDataSourceNames;

private final EventBusContext eventBusContext;
private final InstanceContext instanceContext;

@Override
public void execute(final ShardingContext shardingContext) {
new DatabaseDiscoveryEngine(databaseDiscoveryProvider, eventBusContext).changePrimaryDataSource(databaseName, groupName, originalPrimaryDataSourceName,
new DatabaseDiscoveryEngine(databaseDiscoveryProvider, instanceContext).changePrimaryDataSource(databaseName, groupName, originalPrimaryDataSourceName,
dataSourceMap, disabledDataSourceNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private void findPrimaryReplicaRelationship(final String databaseName, final Map
String groupName = entry.getKey();
DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();
Map<String, DataSource> originalDataSourceMap = dataSourceRule.getDataSourceGroup(dataSourceMap);
DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext.getEventBusContext());
DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext);
engine.checkEnvironment(databaseName, originalDataSourceMap);
dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(
databaseName, groupName, entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames()));
Expand Down Expand Up @@ -177,7 +177,7 @@ private void initHeartBeatJobs() {
DatabaseDiscoveryDataSourceRule rule = entry.getValue();
String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName();
CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext.getEventBusContext()).execute(null),
rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null),
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
scheduleContext.startSchedule(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@
*/
public enum StateType {

OK, CIRCUIT_BREAK, LOCK
OK, CIRCUIT_BREAK, READ_ONLY, LOCK
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
package org.apache.shardingsphere.mode.metadata.compute.event;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -28,7 +28,7 @@
@Getter
public final class ComputeNodeStatusChangedEvent {

private final StateType state;

private final String instanceId;

private final StateType state;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import com.google.common.eventbus.Subscribe;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler;

/**
Expand All @@ -40,7 +40,7 @@ protected void update(final ContextManager contextManager) {
} else {
checkEnablingIsValid(contextManager, instanceId);
}
contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK : StateType.OK, instanceId));
contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceId, isDisable ? StateType.CIRCUIT_BREAK : StateType.OK));
}

private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.frontend.exception;

import org.apache.shardingsphere.infra.exception.ConnectionSQLException;
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;

/**
* Read only exception.
*/
public final class ReadOnlyException extends ConnectionSQLException {

private static final long serialVersionUID = 6339672680026286798L;

public ReadOnlyException() {
super(XOpenSQLState.GENERAL_WARNING, 11, "The current instance is read-only, Not allowed write traffic.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.ReadOnlyProxyState;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -43,6 +44,7 @@ public final class ProxyStateContext {
STATES.put(StateType.OK, new OKProxyState());
STATES.put(StateType.LOCK, new LockProxyState());
STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
STATES.put(StateType.READ_ONLY, new ReadOnlyProxyState());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.frontend.state.impl;

import io.netty.channel.ChannelHandlerContext;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
import org.apache.shardingsphere.infra.hint.HintManager;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.exception.ReadOnlyException;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
import org.apache.shardingsphere.sql.parser.sql.dialect.handler.dml.SelectStatementHandler;

import java.util.Optional;

/**
* ReadOnly proxy state.
*/
public final class ReadOnlyProxyState implements ProxyState {

@Override
public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
if (isPrimaryRoute(connectionSession.getQueryContext().getSqlStatementContext())) {
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new ReadOnlyException()));
Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(connectionSession);
databasePacket.ifPresent(context::writeAndFlush);
}
}

private boolean isPrimaryRoute(final SQLStatementContext<?> sqlStatementContext) {
return isWriteRouteStatement(sqlStatementContext) || isHintWriteRouteOnly(sqlStatementContext);
}

private boolean isWriteRouteStatement(final SQLStatementContext<?> sqlStatementContext) {
SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
return containsLockSegment(sqlStatement) || containsLastInsertIdProjection(sqlStatementContext) || !(sqlStatement instanceof SelectStatement);
}

private boolean containsLockSegment(final SQLStatement sqlStatement) {
return sqlStatement instanceof SelectStatement && SelectStatementHandler.getLockSegment((SelectStatement) sqlStatement).isPresent();
}

private boolean containsLastInsertIdProjection(final SQLStatementContext<?> sqlStatementContext) {
return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).getProjectionsContext().isContainsLastInsertIdProjection();
}

private boolean isHintWriteRouteOnly(final SQLStatementContext<?> sqlStatementContext) {
return HintManager.isWriteRouteOnly() || (sqlStatementContext instanceof CommonSQLStatementContext && ((CommonSQLStatementContext<?>) sqlStatementContext).isHintWriteRouteOnly());
}
}

0 comments on commit b79d2fd

Please sign in to comment.