diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md index cdb54cda0ce01..c620aef7976b1 100644 --- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md +++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md @@ -51,6 +51,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供, | --------- | ----------- | ------ | | 08000 | 13000 | Can not register driver, reason is: %s | | 01000 | 13010 | Circuit break open, the request has been ignored. | +| 01000 | 13011 | The current instance is read-only, Not allowed write traffic. | | 08000 | 13020 | Can not get %d connections one time, partition succeed connection(%d) have released. Please consider increasing the \`maxPoolSize\` of the data sources or decreasing the \`max-connections-size-per-query\` in properties. | | 08000 | 13030 | Connection has been closed. | | 08000 | 13031 | Result set has been closed. | diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md index f8b13c497a1f4..84b6e67783b6d 100644 --- a/docs/document/content/user-manual/error-code/sql-error-code.en.md +++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md @@ -51,6 +51,7 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi | --------- | ----------- | ------ | | 08000 | 13000 | Can not register driver, reason is: %s | | 01000 | 13010 | Circuit break open, the request has been ignored. | +| 01000 | 13011 | The current instance is read-only, Not allowed write traffic. | | 08000 | 13020 | Can not get %d connections one time, partition succeed connection(%d) have released. Please consider increasing the \`maxPoolSize\` of the data sources or decreasing the \`max-connections-size-per-query\` in properties. | | 08000 | 13030 | Connection has been closed. | | 08000 | 13031 | Result set has been closed. | diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java index 73d279d6eab71..694e0533bf0d9 100644 --- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java +++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java @@ -17,13 +17,16 @@ package org.apache.shardingsphere.dbdiscovery.algorithm; +import com.google.common.base.Strings; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider; import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus; import org.apache.shardingsphere.infra.datasource.state.DataSourceState; +import org.apache.shardingsphere.infra.instance.InstanceContext; import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.state.StateType; +import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent; import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource; import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole; import org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent; @@ -46,7 +49,7 @@ public final class DatabaseDiscoveryEngine { private final DatabaseDiscoveryProvider provider; - private final EventBusContext eventBusContext; + private final InstanceContext instanceContext; /** * Check environment of database cluster. @@ -71,6 +74,7 @@ public void checkEnvironment(final String databaseName, final Map dataSourceMap, final Collection disabledDataSourceNames) { Optional newPrimaryDataSourceName = findPrimaryDataSourceName(dataSourceMap); + postComputeNodeStatusChangedEvent(newPrimaryDataSourceName.orElse("")); newPrimaryDataSourceName.ifPresent(optional -> postPrimaryChangedEvent(databaseName, groupName, originalPrimaryDataSourceName, optional)); Map replicaDataSourceMap = new HashMap<>(dataSourceMap); newPrimaryDataSourceName.ifPresent(replicaDataSourceMap::remove); @@ -91,9 +95,19 @@ private Optional findPrimaryDataSourceName(final Map return Optional.empty(); } + private void postComputeNodeStatusChangedEvent(final String newPrimaryDataSourceName) { + if (Strings.isNullOrEmpty(newPrimaryDataSourceName) && StateType.OK.equals(instanceContext.getInstance().getState().getCurrentState())) { + instanceContext.getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(), StateType.READ_ONLY)); + return; + } + if (!Strings.isNullOrEmpty(newPrimaryDataSourceName) && StateType.READ_ONLY.equals(instanceContext.getInstance().getState().getCurrentState())) { + instanceContext.getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(), StateType.OK)); + } + } + private void postPrimaryChangedEvent(final String databaseName, final String groupName, final String originalPrimaryDataSourceName, final String newPrimaryDataSourceName) { if (!newPrimaryDataSourceName.equals(originalPrimaryDataSourceName)) { - eventBusContext.post(new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName))); + instanceContext.getEventBusContext().post(new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName))); } } @@ -104,16 +118,16 @@ private void postReplicaDisabledEvent(final String databaseName, final String gr StorageNodeDataSource replicaStorageNode = createReplicaStorageNode(loadReplicaStatus(entry.getValue())); if (DataSourceState.ENABLED == replicaStorageNode.getStatus()) { enabledReplicasCount += disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0; - eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); + instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); continue; } if (provider.getMinEnabledReplicas().isPresent() && 0 == provider.getMinEnabledReplicas().get()) { - eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); + instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); continue; } if (enabledReplicasCount > provider.getMinEnabledReplicas().get()) { enabledReplicasCount -= disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1; - eventBusContext.post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); + instanceContext.getEventBusContext().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), replicaStorageNode)); } } } diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java index cab23c449bff5..67ac474b25381 100644 --- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java +++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; -import org.apache.shardingsphere.infra.util.eventbus.EventBusContext; +import org.apache.shardingsphere.infra.instance.InstanceContext; import javax.sql.DataSource; import java.util.Collection; @@ -46,11 +46,11 @@ public final class HeartbeatJob implements SimpleJob { private final Collection disabledDataSourceNames; - private final EventBusContext eventBusContext; + private final InstanceContext instanceContext; @Override public void execute(final ShardingContext shardingContext) { - new DatabaseDiscoveryEngine(databaseDiscoveryProvider, eventBusContext).changePrimaryDataSource(databaseName, groupName, originalPrimaryDataSourceName, + new DatabaseDiscoveryEngine(databaseDiscoveryProvider, instanceContext).changePrimaryDataSource(databaseName, groupName, originalPrimaryDataSourceName, dataSourceMap, disabledDataSourceNames); } } diff --git a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java index d0a3063f13d75..252481d87d8b8 100644 --- a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java +++ b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java @@ -112,7 +112,7 @@ private void findPrimaryReplicaRelationship(final String databaseName, final Map String groupName = entry.getKey(); DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue(); Map originalDataSourceMap = dataSourceRule.getDataSourceGroup(dataSourceMap); - DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext.getEventBusContext()); + DatabaseDiscoveryEngine engine = new DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext); engine.checkEnvironment(databaseName, originalDataSourceMap); dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource( databaseName, groupName, entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames())); @@ -177,7 +177,7 @@ private void initHeartBeatJobs() { DatabaseDiscoveryDataSourceRule rule = entry.getValue(); String jobName = rule.getProvider().getType() + "-" + databaseName + "-" + rule.getGroupName(); CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, rule.getGroupName(), rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap), - rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext.getEventBusContext()).execute(null), + rule.getProvider(), rule.getDisabledDataSourceNames(), instanceContext).execute(null), rule.getHeartbeatProps().getProperty("keep-alive-cron")); scheduleContext.startSchedule(job); } diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java index e68c1d7e3298a..e87e24cf6870c 100644 --- a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java +++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java @@ -22,5 +22,5 @@ */ public enum StateType { - OK, CIRCUIT_BREAK, LOCK + OK, CIRCUIT_BREAK, READ_ONLY, LOCK } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java similarity index 92% rename from mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java rename to mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java index fd7d5191ebef9..d7f1df5bb679d 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java +++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event; +package org.apache.shardingsphere.mode.metadata.compute.event; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -28,7 +28,7 @@ @Getter public final class ComputeNodeStatusChangedEvent { - private final StateType state; - private final String instanceId; + + private final StateType state; } diff --git a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java index 7f36f2b7acb13..97e65f58f1452 100644 --- a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java +++ b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java @@ -19,7 +19,7 @@ import com.google.common.eventbus.Subscribe; import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent; +import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent; import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent; import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode; import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; diff --git a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java index 3ab33a7c853f4..0c1010fc9235a 100644 --- a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java +++ b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java @@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException; import org.apache.shardingsphere.mode.manager.ContextManager; -import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent; +import org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent; import org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler; /** @@ -40,7 +40,7 @@ protected void update(final ContextManager contextManager) { } else { checkEnablingIsValid(contextManager, instanceId); } - contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK : StateType.OK, instanceId)); + contextManager.getInstanceContext().getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceId, isDisable ? StateType.CIRCUIT_BREAK : StateType.OK)); } private void checkEnablingIsValid(final ContextManager contextManager, final String instanceId) { diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java new file mode 100644 index 0000000000000..1e0e394fe7a45 --- /dev/null +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.frontend.exception; + +import org.apache.shardingsphere.infra.exception.ConnectionSQLException; +import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState; + +/** + * Read only exception. + */ +public final class ReadOnlyException extends ConnectionSQLException { + + private static final long serialVersionUID = 6339672680026286798L; + + public ReadOnlyException() { + super(XOpenSQLState.GENERAL_WARNING, 11, "The current instance is read-only, Not allowed write traffic."); + } +} diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java index 776cd42359403..7f1a4355c3c0d 100644 --- a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java @@ -27,6 +27,7 @@ import org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState; import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState; import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState; +import org.apache.shardingsphere.proxy.frontend.state.impl.ReadOnlyProxyState; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,7 @@ public final class ProxyStateContext { STATES.put(StateType.OK, new OKProxyState()); STATES.put(StateType.LOCK, new LockProxyState()); STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState()); + STATES.put(StateType.READ_ONLY, new ReadOnlyProxyState()); } /** diff --git a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java new file mode 100644 index 0000000000000..09bf876136641 --- /dev/null +++ b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.proxy.frontend.state.impl; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; +import org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext; +import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext; +import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.hint.HintManager; +import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; +import org.apache.shardingsphere.proxy.frontend.exception.ReadOnlyException; +import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine; +import org.apache.shardingsphere.proxy.frontend.state.ProxyState; +import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; +import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement; +import org.apache.shardingsphere.sql.parser.sql.dialect.handler.dml.SelectStatementHandler; + +import java.util.Optional; + +/** + * ReadOnly proxy state. + */ +public final class ReadOnlyProxyState implements ProxyState { + + @Override + public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) { + if (isPrimaryRoute(connectionSession.getQueryContext().getSqlStatementContext())) { + context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new ReadOnlyException())); + Optional> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(connectionSession); + databasePacket.ifPresent(context::writeAndFlush); + } + } + + private boolean isPrimaryRoute(final SQLStatementContext sqlStatementContext) { + return isWriteRouteStatement(sqlStatementContext) || isHintWriteRouteOnly(sqlStatementContext); + } + + private boolean isWriteRouteStatement(final SQLStatementContext sqlStatementContext) { + SQLStatement sqlStatement = sqlStatementContext.getSqlStatement(); + return containsLockSegment(sqlStatement) || containsLastInsertIdProjection(sqlStatementContext) || !(sqlStatement instanceof SelectStatement); + } + + private boolean containsLockSegment(final SQLStatement sqlStatement) { + return sqlStatement instanceof SelectStatement && SelectStatementHandler.getLockSegment((SelectStatement) sqlStatement).isPresent(); + } + + private boolean containsLastInsertIdProjection(final SQLStatementContext sqlStatementContext) { + return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).getProjectionsContext().isContainsLastInsertIdProjection(); + } + + private boolean isHintWriteRouteOnly(final SQLStatementContext sqlStatementContext) { + return HintManager.isWriteRouteOnly() || (sqlStatementContext instanceof CommonSQLStatementContext && ((CommonSQLStatementContext) sqlStatementContext).isHintWriteRouteOnly()); + } +}