Skip to content

Commit

Permalink
Merge pull request polardb#28 from F-ca7/fca7_fix_limit_phy_sql_cache
Browse files Browse the repository at this point in the history
Bug fix: it maybe cause the wrong result for the physical sql cache.
  • Loading branch information
hustfxj authored Nov 2, 2021
2 parents 210de81 + 9c08c42 commit a66b025
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public static LockMode getLockMode(SqlNode sqlNOde) {
SqlNodeList orderBy;
SqlNode offset;
SqlNode fetch;
/**
* computedFetch should be a SqlNumericLiteral
* while fetch is a SqlBasicCall with SqlDynamicParam
*/
SqlNode computedFetch;
SqlMatchRecognize matchRecognize;
OutFileParams outFileParams;

Expand Down Expand Up @@ -141,8 +146,13 @@ public SqlOperator getOperator() {
* accepted by parametrize and physical sql generation
*/
public List<SqlNode> getParameterizableOperandList() {
return ImmutableNullableList.of(keywordList, selectList, from, where,
groupBy, having, windowDecls, orderBy, fetch, offset);
if (isDynamicFetch()) {
return ImmutableNullableList.of(keywordList, selectList, from, where,
groupBy, having, windowDecls, orderBy, computedFetch, offset);
} else {
return ImmutableNullableList.of(keywordList, selectList, from, where,
groupBy, having, windowDecls, orderBy, fetch, offset);
}
}

@Override public void setOperand(int i, SqlNode operand) {
Expand Down Expand Up @@ -265,6 +275,24 @@ public void setFetch(SqlNode fetch) {
this.fetch = fetch;
}

/**
* computed fetch should be set only once as a DynamicParam
* if it is set concurrently, make sure the param index is identical
*/
public void setComputedFetch(SqlNode computedFetch) {
assert computedFetch instanceof SqlDynamicParam;
if (this.computedFetch != null) {
Preconditions.checkArgument(((SqlDynamicParam) this.computedFetch).index ==
((SqlDynamicParam) computedFetch).index, "Computed fetch should be set at the exact same index");
} else {
this.computedFetch = computedFetch;
}
}

public boolean isDynamicFetch() {
return fetch != null && fetch.getKind() == SqlKind.PLUS && computedFetch != null;
}

public SqlMatchRecognize getMatchRecognize() {
return matchRecognize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ public void unparse(
unparseListClause(writer, select.orderBy);
writer.endList(orderFrame);
}
writer.fetchOffset(select.fetch, select.offset);
if (select.isDynamicFetch()) {
writer.fetchOffset(select.computedFetch, select.offset);
} else {
writer.fetchOffset(select.fetch, select.offset);
}
writer.endList(selectFrame);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.Map;


public class ConnectionParams {

public static final Map<String, ConfigParam> SUPPORTED_PARAMS = new HashMap<>();
Expand Down Expand Up @@ -1080,6 +1079,13 @@ public static void addSupportedParam(ConfigParam param) {
public static final BooleanConfigParam PLAN_CACHE = new BooleanConfigParam(ConnectionProperties.PLAN_CACHE, true,
true);

/**
* Physical sql template string cache for external sql
*/
public static final BooleanConfigParam PHY_SQL_TEMPLATE_CACHE =
new BooleanConfigParam(ConnectionProperties.PHY_SQL_TEMPLATE_CACHE, true,
true);

public static final BooleanConfigParam SKIP_READONLY_CHECK = new BooleanConfigParam(
ConnectionProperties.SKIP_READONLY_CHECK, false, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class ConnectionProperties {

public static final String PLAN_CACHE = "PLAN_CACHE";
public static final String PHY_SQL_TEMPLATE_CACHE = "PHY_SQL_TEMPLATE_CACHE";
public static final String PREPARE_OPTIMIZE = "PREPARE_OPTIMIZE";

public static final String ENABLE_RECYCLEBIN = "ENABLE_RECYCLEBIN";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ public Session toSession(TaskId taskId, QueryContext queryContext, long trxId) {
ec.getCacheRelNodeIds().addAll(cacheRelNodesId);
ec.getRecordRowCnt().putAll(recordRowCnt);
ec.setInternalSystemSql(false);
ec.setUsingPhySqlCache(true);

//mock connection
MppMockConnection mppMockConnection = new MppMockConnection(user, catalog, schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.alibaba.polardbx.optimizer.core.function.calc.AbstractScalarFunction;
import com.alibaba.polardbx.optimizer.core.planner.ExecutionPlan;
import com.alibaba.polardbx.optimizer.core.profiler.RuntimeStat;
import com.alibaba.polardbx.optimizer.core.rel.PhyTableScanBuilder;
import com.alibaba.polardbx.optimizer.core.row.Row;
import com.alibaba.polardbx.optimizer.memory.MemoryPool;
import com.alibaba.polardbx.optimizer.memory.QueryMemoryPoolHolder;
Expand Down Expand Up @@ -220,6 +221,15 @@ public class ExecutionContext {
private String sqlTemplateId = null;
private RuntimeStat runtimeStatistics = null;

/**
* Only use physical sql cache when it is a query from external user.
* When it is an internal sql, caching physical sql may cause expansion of params in ExecutionContext
*
* @see com.alibaba.polardbx.optimizer.core.rel.PhyTableScanBuilder
* @see com.alibaba.polardbx.optimizer.core.rel.LogicalView#sqlTemplateCache
*/
private boolean usingPhySqlCache = false;

private QueryMemoryPoolHolder memoryPoolHolder = new QueryMemoryPoolHolder();
private boolean doingBatchInsertBySpliter = false;

Expand Down Expand Up @@ -1026,6 +1036,7 @@ public ExecutionContext copy(CopyOption option) {
ec.onlyUseTmpTblPool = isOnlyUseTmpTblPool();
ec.doingBatchInsertBySpliter = isDoingBatchInsertBySpliter();
ec.internalSystemSql = isInternalSystemSql();
ec.usingPhySqlCache = isUsingPhySqlCache();
ec.runtimeStatistics = getRuntimeStatistics();
ec.isApplyingSubquery = isApplyingSubquery();
ec.subqueryId = getSubqueryId();
Expand Down Expand Up @@ -1153,6 +1164,19 @@ public void setInternalSystemSql(boolean internalSystemSql) {
this.internalSystemSql = internalSystemSql;
}

private boolean isUsingPhySqlCache() {
return usingPhySqlCache;
}

public void setUsingPhySqlCache(boolean usingPhySqlCache) {
this.usingPhySqlCache = usingPhySqlCache;
}

public boolean enablePhySqlCache() {
return usingPhySqlCache && getParamManager()
.getBoolean(ConnectionParams.PHY_SQL_TEMPLATE_CACHE);
}

public String getSubqueryId() {
return subqueryId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,9 @@ public void setIsUnderMergeSort(boolean isUnderMergeSort) {
*/
public List<RelNode> getInput(ExecutionContext executionContext) {
Map<String, List<List<String>>> targetTables = getTargetTables(executionContext);
PhyTableScanBuilder phyTableScanbuilder = new PhyTableScanBuilder((SqlSelect) getSqlTemplate(),
SqlSelect sqlTemplate = (SqlSelect) getSqlTemplate(executionContext);

PhyTableScanBuilder phyTableScanbuilder = new PhyTableScanBuilder(sqlTemplate,
targetTables,
executionContext,
this,
Expand All @@ -516,7 +518,8 @@ public List<RelNode> getInput(ExecutionContext executionContext) {

public List<RelNode> getInnerInput(UnionOptHelper helper, ExecutionContext executionContext,
boolean forceIgnoreRF) {
SqlSelect sqlTemplate = (SqlSelect) getSqlTemplate();
SqlSelect sqlTemplate = (SqlSelect) getSqlTemplate(executionContext);

return getInnerInput(sqlTemplate, helper, executionContext, forceIgnoreRF);
}

Expand Down Expand Up @@ -674,17 +677,21 @@ public void setTargetTables(Map<String, List<List<String>>> targetTables) {
this.targetTablesHintCache = targetTables;
}

public SqlNode getSqlTemplate(ExecutionContext executionContext) {
return getSqlTemplate(null, executionContext.enablePhySqlCache());
}

/**
* if no cache, return current SqlTemplate
*/
public SqlNode getSqlTemplate() {
return getSqlTemplate(null);
return getSqlTemplate(null, true);
}

public SqlNode getSqlTemplate(ReplaceCallWithLiteralVisitor visitor) {
public SqlNode getSqlTemplate(ReplaceCallWithLiteralVisitor visitor, boolean usingCache) {
if (sqlTemplateHintCache == null) {
if (visitor == null && scalarList.isEmpty() && correlateVariableScalar.isEmpty()) {
if (sqlTemplateCache == null) {
if (!usingCache || sqlTemplateCache == null) {
sqlTemplateCache = buildSqlTemplate(null);
}
return sqlTemplateCache;
Expand All @@ -696,6 +703,10 @@ public SqlNode getSqlTemplate(ReplaceCallWithLiteralVisitor visitor) {
}
}

public SqlNode getSqlTemplate(ReplaceCallWithLiteralVisitor visitor) {
return getSqlTemplate(visitor, true);
}

/**
* update SqlTemplate cache, for HINT ONLY!
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.alibaba.polardbx.optimizer.core.rel.util.IndexedDynamicParamInfo;
import com.alibaba.polardbx.optimizer.core.rel.util.RuntimeFilterDynamicParamInfo;
import com.alibaba.polardbx.optimizer.memory.MemoryAllocatorCtx;
import com.alibaba.polardbx.optimizer.parse.custruct.FastSqlConstructUtils;
import com.alibaba.polardbx.optimizer.utils.CalciteUtils;
import com.alibaba.polardbx.optimizer.utils.OptimizerUtils;
import com.alibaba.polardbx.optimizer.utils.PlannerUtils;
import com.alibaba.polardbx.optimizer.utils.RelUtils;
import com.google.common.base.Preconditions;
Expand All @@ -45,6 +47,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.util.Util;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -106,9 +110,18 @@ public PhyTableScanBuilder(SqlSelect sqlTemplate, Map<String, List<List<String>>
ExecutionContext executionContext, RelNode parent, DbType dbType,
RelDataType rowType, String schemaName, List<String> logicalTableNames) {
this.executionContext = executionContext;
if (executionContext.getCorrelateFieldInViewMap() == null
|| executionContext.getCorrelateFieldInViewMap().isEmpty()) {
this.targetTables = targetTables;
this.params = executionContext.getParams() == null ? null : executionContext.getParams().getCurrentParameter();
this.parent = parent;
this.dbType = dbType;
this.rowType = rowType;

boolean usingPhySqlCache = executionContext.enablePhySqlCache();
if (usingPhySqlCache &&
(executionContext.getCorrelateFieldInViewMap() == null || executionContext
.getCorrelateFieldInViewMap().isEmpty())) {
this.sqlTemplate = sqlTemplate;
this.sqlTemplate.accept(new FetchPreprocessor(params, true));
} else {
this.sqlTemplate = (SqlSelect) sqlTemplate.accept(
new ReplaceTableNameWithSomethingVisitor(executionContext.getCorrelateFieldInViewMap(), schemaName,
Expand All @@ -119,13 +132,8 @@ protected SqlNode buildSth(SqlNode sqlNode) {
}
}
);
this.sqlTemplate.accept(new FetchPreprocessor(params, false));
}
this.targetTables = targetTables;
this.params = executionContext.getParams() == null ? null : executionContext.getParams().getCurrentParameter();
this.parent = parent;
this.dbType = dbType;
this.rowType = rowType;
this.sqlTemplate.accept(new FetchPreprocessor(params));
this.dynamicParamList = PlannerUtils.getDynamicParamInfoList(this.sqlTemplate);
this.schemaName = schemaName;
this.logicalTableNames = logicalTableNames;
Expand All @@ -141,9 +149,11 @@ public PhyTableScanBuilder(SqlSelect sqlTemplate, Map<String, List<List<String>>
private static class FetchPreprocessor extends SqlShuttle {

protected final Map<Integer, ParameterContext> params;
private final boolean usingCache;

private FetchPreprocessor(Map<Integer, ParameterContext> params) {
private FetchPreprocessor(Map<Integer, ParameterContext> params, boolean usingCache) {
this.params = params;
this.usingCache = usingCache;
}

@Override
Expand Down Expand Up @@ -176,12 +186,29 @@ private void preProcessFetch(SqlSelect sqlTemplate) {
if (fetchVal == -1) {
return;
}
/**
* Set the new Fetch value. For native sql, we do not parameterized the limit
* value.
*/
sqlTemplate
.setFetch(SqlLiteral.createExactNumeric(String.valueOf(fetchVal), fetch.getParserPosition()));

if (usingCache) {
/*
We have to parameterize the limit due to LogicalView#sqlTemplateStringCache.
*/
SqlDynamicParam dynamicParam = new SqlDynamicParam(params.size(),
SqlTypeName.BIGINT,
SqlParserPos.ZERO,
null);
// it is ok to set concurrently
sqlTemplate.setComputedFetch(dynamicParam);
// put the computed value to params in execution context
params.put(params.size() + 1, new ParameterContext(
OptimizerUtils.getParameterMethod(fetchVal), new Object[] {params.size() + 1, fetchVal}));
} else {
/*
When no cache, we can generate a Literal directly.
Set the new Fetch value. For native sql, we do not parameterized the limit
value.
*/
sqlTemplate
.setFetch(SqlLiteral.createExactNumeric(String.valueOf(fetchVal), fetch.getParserPosition()));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ protected String getPlan(String testSql) {
false);
final HintPlanner hintPlanner = HintPlanner.getInstance(appName, executionContext);
executionContext.setInternalSystemSql(false);
executionContext.setUsingPhySqlCache(true);

final HintCmdOperator.CmdBean cmdBean = new HintCmdOperator.CmdBean(appName, executionContext.getExtraCmds(),
executionContext.getGroupHint());
executionContext.setRandomPhyTableEnabled(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ protected String getPlan(String testSql) {
ExecutionContext executionContext = new ExecutionContext();
final HintPlanner hintPlanner = HintPlanner.getInstance(appName, executionContext);
executionContext.setInternalSystemSql(false);
executionContext.setUsingPhySqlCache(true);

final HintCmdOperator.CmdBean cmdBean = new HintCmdOperator.CmdBean(appName, executionContext.getExtraCmds(),
executionContext.getGroupHint());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public ExecutionContext makeExecutionContext(
ec.setRecorder(dataSource.getRecorder());
ec.setExecutorService(dataSource.borrowExecutorService());
ec.setInternalSystemSql(false);
ec.setUsingPhySqlCache(true);
ec.setExecuteMode(ExecutorMode.MPP);
ec.setPrivilegeContext(new MppPrivilegeContext());
ec.setTxIsolation(txIsolation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,7 @@ private void beforeExecution() {
* system sql of drds
*/
ec.setInternalSystemSql(false);
ec.setUsingPhySqlCache(true);
}
ec.setStartTime(System.nanoTime());
ec.setPrivilegeMode(isPrivilegeMode());
Expand Down

0 comments on commit a66b025

Please sign in to comment.