Skip to content

Commit

Permalink
Expire cached statistics for Fe follower when excute analyze job (Sta…
Browse files Browse the repository at this point in the history
  • Loading branch information
Youngwb authored Sep 24, 2021
1 parent 4a2c202 commit 3f9a964
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;
Expand All @@ -22,16 +24,22 @@
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class AnalyzeManager implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalyzeManager.class);

private final Map<Long, AnalyzeJob> analyzeJobMap;

private ExecutorService executor;

public AnalyzeManager() {
analyzeJobMap = Maps.newConcurrentMap();
executor = ThreadPoolManager.newDaemonFixedThreadPool(1, 16, "analyze-replay-pool", true);
}

public void addAnalyzeJob(AnalyzeJob job) {
Expand Down Expand Up @@ -113,6 +121,7 @@ public void expireAnalyzeJob() {
}

public void replayAddAnalyzeJob(AnalyzeJob job) {
executor.submit(new AnalyzeReplayTask(job));
analyzeJobMap.put(job.getId(), job);
}

Expand Down Expand Up @@ -147,4 +156,78 @@ private static class SerializeData {
@SerializedName("analyzeJobs")
public List<AnalyzeJob> jobs;
}

// This task is used to expire cached statistics
public class AnalyzeReplayTask implements Runnable {
private AnalyzeJob analyzeJob;

public AnalyzeReplayTask(AnalyzeJob job) {
this.analyzeJob = job;
}

public void checkAndExpireCachedStatistics(Table table, AnalyzeJob job) {
if (null == table || !Table.TableType.OLAP.equals(table.getType())) {
return;
}

// check table has update
// use job last work time compare table update time to determine whether to expire cached statistics
LocalDateTime updateTime = StatisticUtils.getTableLastUpdateTime(table);
LocalDateTime jobLastWorkTime = LocalDateTime.MIN;
if (analyzeJobMap.containsKey(job.getId())) {
jobLastWorkTime = analyzeJobMap.get(job.getId()).getWorkTime();
}
if (jobLastWorkTime.isBefore(updateTime)) {
List<String> columns = (job.getColumns() == null || job.getColumns().isEmpty()) ?
table.getFullSchema().stream().filter(d -> !d.isAggregated()).map(Column::getName)
.collect(Collectors.toList()) : job.getColumns();
Catalog.getCurrentStatisticStorage().expireColumnStatistics(table, columns);
}
}

public void expireCachedStatistics(AnalyzeJob job) {
if (job.getScheduleType().equals(Constants.ScheduleType.ONCE)) {
Database db = Catalog.getCurrentCatalog().getDb(job.getDbId());
if (null == db) {
return;
}
Catalog.getCurrentStatisticStorage()
.expireColumnStatistics(db.getTable(job.getTableId()), job.getColumns());
} else {
List<Table> tableNeedCheck = new ArrayList<>();
if (job.getDbId() == AnalyzeJob.DEFAULT_ALL_ID) {
List<Long> dbIds = Catalog.getCurrentCatalog().getDbIds();
for (Long dbId : dbIds) {
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (null == db || StatisticUtils.statisticDatabaseBlackListCheck(db.getFullName())) {
continue;
}
tableNeedCheck.addAll(db.getTables());
}
} else if (job.getDbId() != AnalyzeJob.DEFAULT_ALL_ID &&
job.getTableId() == AnalyzeJob.DEFAULT_ALL_ID) {
Database db = Catalog.getCurrentCatalog().getDb(job.getDbId());
if (null == db) {
return;
}
tableNeedCheck.addAll(db.getTables());
} else {
Database db = Catalog.getCurrentCatalog().getDb(job.getDbId());
if (null == db) {
return;
}
tableNeedCheck.add(db.getTable(job.getTableId()));
}

for (Table table : tableNeedCheck) {
checkAndExpireCachedStatistics(table, job);
}
}
}

@Override
public void run() {
expireCachedStatistics(analyzeJob);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.MasterDaemon;
Expand All @@ -18,8 +16,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -234,7 +230,7 @@ private void createTableJobs(Map<Long, List<TableCollectJob>> tableJobs, Analyze
private void createTableJobs(Map<Long, List<TableCollectJob>> tableJobs, AnalyzeJob job,
Database db, Table table, List<String> columns) {
// check table has update
LocalDateTime updateTime = getTableLastUpdateTime(table);
LocalDateTime updateTime = StatisticUtils.getTableLastUpdateTime(table);

// 1. If job is schedule and the table has update, we need re-collect data
// 2. If job is once and is happened after the table update, we need add it to avoid schedule-job cover data
Expand Down Expand Up @@ -295,10 +291,4 @@ private void expireStatistic() {
LOG.warn("expire statistic failed.", e);
}
}

private LocalDateTime getTableLastUpdateTime(Table table) {
long maxTime = ((OlapTable) table).getPartitions().stream().map(Partition::getVisibleVersionTime)
.max(Long::compareTo).orElse(0L);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(maxTime), Clock.systemDefaultZone().getZone());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
import com.starrocks.analysis.UserIdentity;
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.qe.ConnectContext;
import com.starrocks.system.SystemInfoService;

import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.List;

public class StatisticUtils {
Expand Down Expand Up @@ -68,4 +73,10 @@ public static boolean statisticTableBlackListCheck(long tableId) {
return false;
}

public static LocalDateTime getTableLastUpdateTime(Table table) {
long maxTime = ((OlapTable) table).getPartitions().stream().map(Partition::getVisibleVersionTime)
.max(Long::compareTo).orElse(0L);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(maxTime), Clock.systemDefaultZone().getZone());
}

}

0 comments on commit 3f9a964

Please sign in to comment.