Skip to content

Commit

Permalink
[Feature] Support collect Hive histogram statistics (StarRocks#42186)
Browse files Browse the repository at this point in the history
Signed-off-by: Youngwb <[email protected]>
  • Loading branch information
Youngwb authored Mar 13, 2024
1 parent 1661842 commit c7ab986
Show file tree
Hide file tree
Showing 26 changed files with 1,028 additions and 60 deletions.
31 changes: 31 additions & 0 deletions be/src/runtime/statistic_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const int STATISTIC_TABLE_VERSION = 3;
const int STATISTIC_BATCH_VERSION = 4;
const int STATISTIC_EXTERNAL_VERSION = 5;
const int STATISTIC_EXTERNAL_QUERY_VERSION = 6;
const int STATISTIC_EXTERNAL_HISTOGRAM_VERSION = 7;

StatisticResultWriter::StatisticResultWriter(BufferControlBlock* sinker,
const std::vector<ExprContext*>& output_expr_ctxs,
Expand Down Expand Up @@ -156,6 +157,9 @@ StatusOr<TFetchDataResultPtr> StatisticResultWriter::_process_chunk(Chunk* chunk
} else if (version == STATISTIC_EXTERNAL_QUERY_VERSION) {
RETURN_IF_ERROR_WITH_WARN(_fill_full_statistic_query_external(version, result_columns, chunk, result.get()),
"Fill table statistic data failed");
} else if (version == STATISTIC_EXTERNAL_HISTOGRAM_VERSION) {
RETURN_IF_ERROR_WITH_WARN(_fill_statistic_histogram_external(version, result_columns, chunk, result.get()),
"Fill table statistic data failed");
}
return result;
}
Expand Down Expand Up @@ -266,6 +270,33 @@ Status StatisticResultWriter::_fill_statistic_histogram(int version, const Colum
return Status::OK();
}

// Converts the histogram statistics held in `columns` into thrift rows of `result`.
//
// For each row of `chunk`, a TStatisticData is populated with the column name read
// from columns[1] and the histogram string read from columns[2], then serialized
// into result->result_batch.rows[row]. The batch is tagged with `version`.
// columns[0] is not read by this function.
//
// Returns Status::OK() on success, or the first error reported by the serializer.
Status StatisticResultWriter::_fill_statistic_histogram_external(int version, const Columns& columns,
                                                                 const Chunk* chunk, TFetchDataResult* result) {
    SCOPED_TIMER(_serialize_timer);
    DCHECK(columns.size() == 3);

    auto* name_col = down_cast<BinaryColumn*>(ColumnHelper::get_data_column(columns[1].get()));
    auto* histogram_col = down_cast<BinaryColumn*>(ColumnHelper::get_data_column(columns[2].get()));

    const int row_count = chunk->num_rows();

    // Phase 1: build one thrift struct per input row.
    std::vector<TStatisticData> entries(row_count);
    for (int row = 0; row < row_count; ++row) {
        TStatisticData& entry = entries[row];
        entry.__set_columnName(name_col->get_slice(row).to_string());
        entry.__set_histogram(histogram_col->get_slice(row).to_string());
    }

    // Phase 2: size the output batch and stamp the version before serializing,
    // so the batch shape is already set even if a later row fails to serialize.
    auto& batch = result->result_batch;
    batch.rows.resize(row_count);
    batch.__set_statistic_version(version);

    ThriftSerializer serializer(true, chunk->memory_usage());
    for (int row = 0; row < row_count; ++row) {
        RETURN_IF_ERROR(serializer.serialize(&entries[row], &batch.rows[row]));
    }
    return Status::OK();
}

Status StatisticResultWriter::_fill_table_statistic_data(int version, const Columns& columns, const Chunk* chunk,
TFetchDataResult* result) {
SCOPED_TIMER(_serialize_timer);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/statistic_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class StatisticResultWriter final : public ResultWriter {
Status _fill_full_statistic_query_external(int version, const Columns& columns, const Chunk* chunk,
TFetchDataResult* result);

// Fills `result` with one serialized TStatisticData per row of `chunk`, taking the
// column name from columns[1] and the histogram string from columns[2], and tags the
// result batch with `version`. Returns the first serialization error, if any.
Status _fill_statistic_histogram_external(int version, const Columns& columns, const Chunk* chunk,
TFetchDataResult* result);

private:
BufferControlBlock* _sinker;
const std::vector<ExprContext*>& _output_expr_ctxs;
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import com.starrocks.statistic.ExternalAnalyzeJob;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.ExternalHistogramStatsMeta;
import com.starrocks.statistic.HistogramStatsMeta;
import com.starrocks.statistic.NativeAnalyzeJob;
import com.starrocks.statistic.NativeAnalyzeStatus;
Expand Down Expand Up @@ -974,6 +975,16 @@ public void readFields(DataInput in) throws IOException {
isRead = true;
break;
}
case OperationType.OP_ADD_EXTERNAL_HISTOGRAM_STATS_META: {
data = ExternalHistogramStatsMeta.read(in);
isRead = true;
break;
}
case OperationType.OP_REMOVE_EXTERNAL_HISTOGRAM_STATS_META: {
data = ExternalHistogramStatsMeta.read(in);
isRead = true;
break;
}
case OperationType.OP_MODIFY_HIVE_TABLE_COLUMN: {
data = ModifyTableColumnOperationLog.read(in);
isRead = true;
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import com.starrocks.statistic.ExternalAnalyzeJob;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.ExternalHistogramStatsMeta;
import com.starrocks.statistic.HistogramStatsMeta;
import com.starrocks.statistic.NativeAnalyzeJob;
import com.starrocks.statistic.NativeAnalyzeStatus;
Expand Down Expand Up @@ -1009,6 +1010,17 @@ public static void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity jour
globalStateMgr.getAnalyzeMgr().replayRemoveExternalBasicStatsMeta(basicStatsMeta);
break;
}
case OperationType.OP_ADD_EXTERNAL_HISTOGRAM_STATS_META: {
ExternalHistogramStatsMeta histogramStatsMeta = (ExternalHistogramStatsMeta) journal.getData();
globalStateMgr.getAnalyzeMgr().replayAddExternalHistogramStatsMeta(histogramStatsMeta);
// todo(ywb): refresh connector table histogram statistics cache
break;
}
case OperationType.OP_REMOVE_EXTERNAL_HISTOGRAM_STATS_META: {
ExternalHistogramStatsMeta histogramStatsMeta = (ExternalHistogramStatsMeta) journal.getData();
globalStateMgr.getAnalyzeMgr().replayRemoveExternalHistogramStatsMeta(histogramStatsMeta);
break;
}
case OperationType.OP_MODIFY_HIVE_TABLE_COLUMN: {
ModifyTableColumnOperationLog modifyTableColumnOperationLog =
(ModifyTableColumnOperationLog) journal.getData();
Expand Down Expand Up @@ -1796,6 +1808,14 @@ public void logRemoveExternalBasicStatsMeta(ExternalBasicStatsMeta meta) {
logEdit(OperationType.OP_REMOVE_EXTERNAL_BASIC_STATS_META, meta);
}

/**
 * Records the given external histogram stats meta in the edit log by passing it to
 * {@code logEdit} under {@code OperationType.OP_ADD_EXTERNAL_HISTOGRAM_STATS_META}.
 *
 * @param meta the external histogram statistics metadata to log
 */
public void logAddExternalHistogramStatsMeta(ExternalHistogramStatsMeta meta) {
logEdit(OperationType.OP_ADD_EXTERNAL_HISTOGRAM_STATS_META, meta);
}

/**
 * Records the removal of the given external histogram stats meta in the edit log by
 * passing it to {@code logEdit} under
 * {@code OperationType.OP_REMOVE_EXTERNAL_HISTOGRAM_STATS_META}.
 *
 * @param meta the external histogram statistics metadata being removed
 */
public void logRemoveExternalHistogramStatsMeta(ExternalHistogramStatsMeta meta) {
logEdit(OperationType.OP_REMOVE_EXTERNAL_HISTOGRAM_STATS_META, meta);
}

/**
 * Records a table-column modification in the edit log by passing {@code log} to
 * {@code logEdit} under {@code OperationType.OP_MODIFY_HIVE_TABLE_COLUMN}.
 *
 * @param log the column-modification operation log to write
 */
public void logModifyTableColumn(ModifyTableColumnOperationLog log) {
logEdit(OperationType.OP_MODIFY_HIVE_TABLE_COLUMN, log);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ public class OperationType {
@IgnorableOnReplayFailed
public static final short OP_REMOVE_EXTERNAL_BASIC_STATS_META = 11205;

@IgnorableOnReplayFailed
public static final short OP_ADD_EXTERNAL_HISTOGRAM_STATS_META = 11206;

@IgnorableOnReplayFailed
public static final short OP_REMOVE_EXTERNAL_HISTOGRAM_STATS_META = 11207;

//Database json format log
public static final short OP_CREATE_DB_V2 = 12001;
@IgnorableOnReplayFailed
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.BasicStatsMeta;
import com.starrocks.statistic.ExternalBasicStatsMeta;
import com.starrocks.statistic.ExternalHistogramStatsMeta;
import com.starrocks.statistic.HistogramStatsMeta;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
Expand Down Expand Up @@ -2282,6 +2283,19 @@ public ShowResultSet visitShowHistogramStatsMetaStatement(ShowHistogramStatsMeta
}
}

List<ExternalHistogramStatsMeta> externalMetas =
new ArrayList<>(context.getGlobalStateMgr().getAnalyzeMgr().getExternalHistogramStatsMetaMap().values());
for (ExternalHistogramStatsMeta meta : externalMetas) {
try {
List<String> result = ShowHistogramStatsMetaStmt.showExternalHistogramStatsMeta(context, meta);
if (result != null) {
rows.add(result);
}
} catch (MetaNotFoundException e) {
// pass
}
}

rows = doPredicate(statement, statement.getMetaData(), rows);
return new ShowResultSet(statement.getMetaData(), rows);
}
Expand Down
63 changes: 42 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
import com.starrocks.statistic.AnalyzeMgr;
import com.starrocks.statistic.AnalyzeStatus;
import com.starrocks.statistic.ExternalAnalyzeStatus;
import com.starrocks.statistic.ExternalHistogramStatisticsCollectJob;
import com.starrocks.statistic.HistogramStatisticsCollectJob;
import com.starrocks.statistic.NativeAnalyzeJob;
import com.starrocks.statistic.NativeAnalyzeStatus;
Expand Down Expand Up @@ -1236,20 +1237,30 @@ private void executeAnalyze(ConnectContext statsConnectCtx, AnalyzeStmt analyzeS
Database db, Table table) {
StatisticExecutor statisticExecutor = new StatisticExecutor();
if (analyzeStmt.isExternal()) {
StatsConstants.AnalyzeType analyzeType = analyzeStmt.isSample() ? StatsConstants.AnalyzeType.SAMPLE :
StatsConstants.AnalyzeType.FULL;
// TODO: we should check old statistic and confirm partition list
statisticExecutor.collectStatistics(statsConnectCtx,
StatisticsCollectJobFactory.buildExternalStatisticsCollectJob(
analyzeStmt.getTableName().getCatalog(),
db, table, null,
analyzeStmt.getColumnNames(),
analyzeType,
StatsConstants.ScheduleType.ONCE, analyzeStmt.getProperties()),
analyzeStatus,
false);
if (analyzeStmt.getAnalyzeTypeDesc().isHistogram()) {
statisticExecutor.collectStatistics(statsConnectCtx,
new ExternalHistogramStatisticsCollectJob(analyzeStmt.getTableName().getCatalog(),
db, table, analyzeStmt.getColumnNames(),
StatsConstants.AnalyzeType.HISTOGRAM, StatsConstants.ScheduleType.ONCE,
analyzeStmt.getProperties()),
analyzeStatus,
false);
} else {
StatsConstants.AnalyzeType analyzeType = analyzeStmt.isSample() ? StatsConstants.AnalyzeType.SAMPLE :
StatsConstants.AnalyzeType.FULL;
// TODO: we should check old statistic and confirm partition list
statisticExecutor.collectStatistics(statsConnectCtx,
StatisticsCollectJobFactory.buildExternalStatisticsCollectJob(
analyzeStmt.getTableName().getCatalog(),
db, table, null,
analyzeStmt.getColumnNames(),
analyzeType,
StatsConstants.ScheduleType.ONCE, analyzeStmt.getProperties()),
analyzeStatus,
false);
}
} else {
if (analyzeStmt.getAnalyzeTypeDesc() instanceof AnalyzeHistogramDesc) {
if (analyzeStmt.getAnalyzeTypeDesc().isHistogram()) {
statisticExecutor.collectStatistics(statsConnectCtx,
new HistogramStatisticsCollectJob(db, table, analyzeStmt.getColumnNames(),
StatsConstants.AnalyzeType.HISTOGRAM, StatsConstants.ScheduleType.ONCE,
Expand Down Expand Up @@ -1297,14 +1308,24 @@ private void handleDropStatsStmt() {

private void handleDropHistogramStmt() {
DropHistogramStmt dropHistogramStmt = (DropHistogramStmt) parsedStmt;
OlapTable table = (OlapTable) MetaUtils.getTable(context, dropHistogramStmt.getTableName());
List<String> columns = table.getBaseSchema().stream().filter(d -> !d.isAggregated()).map(Column::getName)
.collect(Collectors.toList());

GlobalStateMgr.getCurrentState().getAnalyzeMgr().dropAnalyzeStatus(table.getId());
GlobalStateMgr.getCurrentState().getAnalyzeMgr()
.dropHistogramStatsMetaAndData(StatisticUtils.buildConnectContext(), Sets.newHashSet(table.getId()));
GlobalStateMgr.getCurrentState().getStatisticStorage().expireHistogramStatistics(table.getId(), columns);
Table table = MetaUtils.getTable(context, dropHistogramStmt.getTableName());
if (dropHistogramStmt.isExternal()) {
GlobalStateMgr.getCurrentState().getAnalyzeMgr().dropExternalAnalyzeStatus(table.getUUID());

GlobalStateMgr.getCurrentState().getAnalyzeMgr().dropExternalHistogramStatsMetaAndData(
StatisticUtils.buildConnectContext(), dropHistogramStmt.getTableName(), table,
dropHistogramStmt.getColumnNames());
// todo(ywb): expire external histogram statistics
} else {
List<String> columns = table.getBaseSchema().stream().filter(d -> !d.isAggregated()).map(Column::getName)
.collect(Collectors.toList());

GlobalStateMgr.getCurrentState().getAnalyzeMgr().dropAnalyzeStatus(table.getId());
GlobalStateMgr.getCurrentState().getAnalyzeMgr()
.dropHistogramStatsMetaAndData(StatisticUtils.buildConnectContext(),
Sets.newHashSet(table.getId()));
GlobalStateMgr.getCurrentState().getStatisticStorage().expireHistogramStatistics(table.getId(), columns);
}
}

private void handleKillAnalyzeStmt() {
Expand Down
Loading

0 comments on commit c7ab986

Please sign in to comment.