Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Trigger new starmgr image in alter system create image for shared-data mode (backport #54370) #54389

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix conflict (#54400)
Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg authored Dec 26, 2024
commit 298a79a5f15cde7f83486c3b130830af1c7718bb
234 changes: 3 additions & 231 deletions fe/fe-core/src/main/java/com/starrocks/alter/SystemHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,237 +96,6 @@ public SystemHandler() {
}

@Override
<<<<<<< HEAD
=======
/**
 * Handles exactly one alter clause by dispatching it to the shared {@link SystemHandler.Visitor}.
 *
 * <p>The method is synchronized so that two or more statements are never processed at the same time.
 *
 * @param alterClauses the clauses to process; must contain exactly one element
 * @param dummyDb      not read by this method
 * @param dummyTbl     not read by this method
 * @return always {@code null}
 * @throws StarRocksException declared by the signature
 */
public synchronized ShowResultSet process(List<AlterClause> alterClauses, Database dummyDb,
                                          OlapTable dummyTbl) throws StarRocksException {
    Preconditions.checkArgument(alterClauses.size() == 1);
    // The visitor carries no per-call context, hence the null context argument.
    alterClauses.get(0).accept(SystemHandler.Visitor.getInstance(), null);
    return null;
}

/**
 * AST visitor that executes one alter clause per visit method.
 *
 * <p>Every visit method runs its work inside {@code ErrorReport.wrapWithRuntimeException}
 * and returns {@code null}; the {@code context} argument is never read.
 * NOTE(review): judging by its name, the wrapper rethrows failures as runtime exceptions — confirm.
 *
 * <p>A single shared instance is exposed through {@link #getInstance()}.
 */
protected static class Visitor implements AstVisitor<Void, Void> {
// The one shared instance handed out by getInstance().
private static final SystemHandler.Visitor INSTANCE = new SystemHandler.Visitor();

/** Returns the shared visitor instance. */
public static SystemHandler.Visitor getInstance() {
return INSTANCE;
}

/** Adds a frontend of type FOLLOWER at the clause's host and port through the node manager. */
@Override
public Void visitAddFollowerClause(AddFollowerClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr()
.addFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort());
});
return null;
}

/** Drops the frontend of type FOLLOWER at the clause's host and port through the node manager. */
@Override
public Void visitDropFollowerClause(DropFollowerClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr()
.dropFrontend(FrontendNodeType.FOLLOWER, clause.getHost(), clause.getPort());

});
return null;
}

/** Adds a frontend of type OBSERVER at the clause's host and port through the node manager. */
@Override
public Void visitAddObserverClause(AddObserverClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr()
.addFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort());
});
return null;
}

/** Drops the frontend of type OBSERVER at the clause's host and port through the node manager. */
@Override
public Void visitDropObserverClause(DropObserverClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr()
.dropFrontend(FrontendNodeType.OBSERVER, clause.getHost(), clause.getPort());
});
return null;
}

/** Passes the whole clause to the node manager's {@code modifyFrontendHost}. */
@Override
public Void visitModifyFrontendHostClause(ModifyFrontendAddressClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().modifyFrontendHost(clause);
});
return null;
}

/** Passes the clause to {@code addBackends} on the cluster info service. */
@Override
public Void visitAddBackendClause(AddBackendClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addBackends(clause);
});
return null;
}

/** Passes the clause to {@code dropBackends} on the cluster info service. */
@Override
public Void visitDropBackendClause(DropBackendClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropBackends(clause);
});
return null;
}

/** Passes the clause to {@code modifyBackend} on the cluster info service. */
@Override
public Void visitModifyBackendClause(ModifyBackendClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().modifyBackend(clause);
});
return null;
}

/**
 * Marks the backends named by the clause as decommissioned after validating the request.
 *
 * <p>A {@code DdlException} is thrown inside the wrapped lambda when a named backend does not
 * exist, or (shared-nothing mode only) when removing the backends would leave too little disk
 * space or fewer available backends than some partition's replication number. Backends that are
 * already decommissioned are logged and skipped. Each remaining backend is flagged and the
 * state change is written to the edit log.
 */
@Override
public Void visitDecommissionBackendClause(DecommissionBackendClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
/*
* check if the specified backends can be decommissioned
* 1. backend should exist.
* 2. after decommission, the remaining backend num should meet the replication num.
* 3. after decommission, The remaining space capacity can store data on decommissioned backends.
*/

SystemInfoService infoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
List<Backend> decommissionBackends = Lists.newArrayList();
Set<Long> decommissionIds = new HashSet<>();

// needCapacity: sum of data-used bytes on the backends to decommission.
// releaseCapacity: sum of their available bytes, subtracted from the cluster total below.
long needCapacity = 0L;
long releaseCapacity = 0L;
// check if exist
for (Pair<String, Integer> pair : clause.getHostPortPairs()) {
Backend backend = infoService.getBackendWithHeartbeatPort(pair.first, pair.second);
if (backend == null) {
throw new DdlException("Backend does not exist[" +
NetUtils.getHostPortInAccessibleFormat(pair.first, pair.second) + "]");
}
if (backend.isDecommissioned()) {
// already under decommission, ignore it
LOG.info(backend.getAddress() + " has already been decommissioned and will be ignored.");
continue;
}
needCapacity += backend.getDataUsedCapacityB();
releaseCapacity += backend.getAvailableCapacityB();
decommissionBackends.add(backend);
decommissionIds.add(backend.getId());
}

if (decommissionBackends.isEmpty()) {
LOG.info("No backends will be decommissioned.");
} else {
// when decommission backends in shared_data mode, unnecessary to check clusterCapacity or table replica
if (RunMode.isSharedNothingMode()) {
// Capacity check: what remains available after removal must hold the data being moved off.
if (infoService.getClusterAvailableCapacityB() - releaseCapacity < needCapacity) {
decommissionBackends.clear();
throw new DdlException("It will cause insufficient disk space if these BEs are decommissioned.");
}

// Count the available backends that are not part of this decommission request.
long availableBackendCnt = infoService.getAvailableBackendIds()
.stream()
.filter(beId -> !decommissionIds.contains(beId))
.count();
// Running maximum of the replication numbers seen so far across all OLAP partitions.
short maxReplicationNum = 0;
LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
for (long dbId : localMetastore.getDbIds()) {
Database db = localMetastore.getDb(dbId);
if (db == null) {
// No database was returned for this id; skip it.
continue;
}
// Hold the database READ lock while its tables are scanned; released in the finally below.
Locker locker = new Locker();
locker.lockDatabase(db.getId(), LockType.READ);
try {
for (Table table : GlobalStateMgr.getCurrentState().getLocalMetastore().getTables(db.getId())) {
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (long partitionId : olapTable.getAllPartitionIds()) {
short replicationNum = partitionInfo.getReplicationNum(partitionId);
// The backend-count comparison only runs when a new maximum is found.
if (replicationNum > maxReplicationNum) {
maxReplicationNum = replicationNum;
if (availableBackendCnt < maxReplicationNum) {
decommissionBackends.clear();
throw new DdlException(
"It will cause insufficient BE number if these BEs " +
"are decommissioned because the table " +
db.getFullName() + "." + olapTable.getName() +
" requires " + maxReplicationNum + " replicas.");

}
}
}
}
}
} finally {
locker.unLockDatabase(db.getId(), LockType.READ);
}
}
}

// set backend's state as 'decommissioned'
// for decommission operation, here is no decommission job. the system handler will check
// all backend in decommission state
for (Backend backend : decommissionBackends) {
backend.setDecommissioned(true);
GlobalStateMgr.getCurrentState().getEditLog().logBackendStateChange(backend);
LOG.info("set backend {} to decommission", backend.getId());
}
}
});
return null;
}

/** Passes the clause to the broker manager's {@code execute}. */
@Override
public Void visitModifyBrokerClause(ModifyBrokerClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getBrokerMgr().execute(clause);
});
return null;
}

/** Passes the clause to {@code addComputeNodes} on the cluster info service. */
@Override
public Void visitAddComputeNodeClause(AddComputeNodeClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().addComputeNodes(clause);
});
return null;
}

/** Passes the clause to {@code dropComputeNodes} on the cluster info service. */
@Override
public Void visitDropComputeNodeClause(DropComputeNodeClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().dropComputeNodes(clause);
});
return null;
}

/**
 * Triggers a new image on {@code GlobalStateMgr} and, when running in shared-data mode,
 * also on {@code StarMgrServer}.
 */
@Override
public Void visitCreateImageClause(CreateImageClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().triggerNewImage();
// StarMgrServer is only asked for a new image in shared-data mode.
if (RunMode.isSharedDataMode()) {
StarMgrServer.getCurrentState().triggerNewImage();
}
});
return null;
}

/** Calls {@code forceCleanSchedQ} on the tablet scheduler. */
@Override
public Void visitCleanTabletSchedQClause(CleanTabletSchedQClause clause, Void context) {
ErrorReport.wrapWithRuntimeException(() -> {
GlobalStateMgr.getCurrentState().getTabletScheduler().forceCleanSchedQ();
});
return null;
}
}

@Override
>>>>>>> 3f0743032b ([Enhancement] Trigger new starmgr image in alter system create image for shared-data mode (#54370))
protected void runAfterCatalogReady() {
super.runAfterCatalogReady();
runAlterJobV2();
Expand Down Expand Up @@ -436,6 +205,9 @@ public synchronized ShowResultSet process(List<AlterClause> alterClauses, Databa
GlobalStateMgr.getCurrentSystemInfo().dropComputeNodes(dropComputeNodeClause.getHostPortPairs());
} else if (alterClause instanceof CreateImageClause) {
GlobalStateMgr.getCurrentState().triggerNewImage();
if (RunMode.isSharedDataMode()) {
StarMgrServer.getCurrentState().triggerNewImage();
}
} else if (alterClause instanceof CleanTabletSchedQClause) {
GlobalStateMgr.getCurrentState().getTabletScheduler().forceCleanSchedQ();
} else {
Expand Down
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/com/starrocks/staros/StarMgrServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,8 @@ public long getMaxJournalId() {
/** Returns the replay id reported by the journal system. */
public long getReplayId() {
return getJournalSystem().getReplayId();
}
<<<<<<< HEAD
=======

/** Returns the checkpoint controller held by this server. */
public CheckpointController getCheckpointController() {
return checkpointController;
}

/** Returns the checkpoint worker held by this server. */
public CheckpointWorker getCheckpointWorker() {
return checkpointWorker;
}

/**
 * Requests a forced journal roll from the journal writer.
 * NOTE(review): presumably the forced roll is what leads to a new StarMgr image being
 * produced — confirm against the journal writer and checkpoint code.
 */
public void triggerNewImage() {
journalSystem.getJournalWriter().setForceRollJournal();
}
>>>>>>> 3f0743032b ([Enhancement] Trigger new starmgr image in alter system create image for shared-data mode (#54370))
}
Loading