Skip to content

Commit

Permalink
server: Submitting multiple dynamic VM Scaling API commands for the s…
Browse files Browse the repository at this point in the history
…ame instance can result in two usage events in the same second causing a compound key violation in usage service (apache#3991)

Root cause:
Even though dynamic scaling job is handled in vmworkjob queue which ensures serilizing multiple jobs but the database updating and generating usage events are out of the job queue.

Solution:
Moved all updations into the job queue

Firstly I have tested all the scenarios to check if nothing is broken:
Scaling on a running VM with normal compute offering
Scaling on a stopped VM with normal compute offering
Scaling on a running VM with custom compute offering
Scaling on stopped VM with custom compute offering
Scaling on stopped/running VM between custom compute offering and normal compute offering and combinations among these. Checked if the custom parameters have been populated or deleted accordingly based on the offering to which the VM is scaled
Since this is a corner scenario I could not test the exact point where two usage events are recorded at the same time for two different API calls on same VM.
  • Loading branch information
harikrishna-patnala authored Jun 16, 2020
1 parent 6a683dc commit 5054766
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void advanceReboot(String vmUuid, Map<VirtualMachineProfile.Param, Object> param
* @param serviceOfferingId
* @return
*/
boolean upgradeVmDb(long vmId, long serviceOfferingId);
boolean upgradeVmDb(long vmId, ServiceOffering newServiceOffering, ServiceOffering currentServiceOffering);

/**
* @param vm
Expand Down Expand Up @@ -201,7 +201,7 @@ NicProfile addVmToNetwork(VirtualMachine vm, Network network, NicProfile request
boolean replugNic(Network network, NicTO nic, VirtualMachineTO vm, ReservationContext context, DeployDestination dest) throws ConcurrentOperationException,
ResourceUnavailableException, InsufficientCapacityException;

VirtualMachine reConfigureVm(String vmUuid, ServiceOffering newServiceOffering, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException,
VirtualMachine reConfigureVm(String vmUuid, ServiceOffering oldServiceOffering, ServiceOffering newServiceOffering, Map<String, String> customParameters, boolean sameHost) throws ResourceUnavailableException, ConcurrentOperationException,
InsufficientServerCapacityException;

void findHostAndMigrate(String vmUuid, Long newSvcOfferingId, DeploymentPlanner.ExcludeList excludeHostList) throws InsufficientCapacityException,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import com.cloud.agent.api.PrepareForMigrationAnswer;
import com.cloud.agent.api.to.DpdkTO;
import com.cloud.event.UsageEventVO;
import com.cloud.offering.NetworkOffering;
import com.cloud.offerings.dao.NetworkOfferingDetailsDao;
import org.apache.cloudstack.affinity.dao.AffinityGroupVMMapDao;
Expand Down Expand Up @@ -232,6 +233,8 @@ public class VirtualMachineManagerImpl extends ManagerBase implements VirtualMac

private static final String VM_SYNC_ALERT_SUBJECT = "VM state sync alert";

@Inject
private UserVmManager _userVmMgr;
@Inject
private DataStoreManager dataStoreMgr;
@Inject
Expand Down Expand Up @@ -3406,13 +3409,20 @@ public void checkIfCanUpgrade(final VirtualMachine vmInstance, final ServiceOffe
}

@Override
public boolean upgradeVmDb(final long vmId, final long serviceOfferingId) {
final VMInstanceVO vmForUpdate = _vmDao.createForUpdate();
vmForUpdate.setServiceOfferingId(serviceOfferingId);
final ServiceOffering newSvcOff = _entityMgr.findById(ServiceOffering.class, serviceOfferingId);
public boolean upgradeVmDb(final long vmId, final ServiceOffering newServiceOffering, ServiceOffering currentServiceOffering) {

final VMInstanceVO vmForUpdate = _vmDao.findById(vmId);
vmForUpdate.setServiceOfferingId(newServiceOffering.getId());
final ServiceOffering newSvcOff = _entityMgr.findById(ServiceOffering.class, newServiceOffering.getId());
vmForUpdate.setHaEnabled(newSvcOff.isOfferHA());
vmForUpdate.setLimitCpuUse(newSvcOff.getLimitCpuUse());
vmForUpdate.setServiceOfferingId(newSvcOff.getId());
if (newServiceOffering.isDynamic()) {
saveCustomOfferingDetails(vmId, newServiceOffering);
}
if (currentServiceOffering.isDynamic() && !newServiceOffering.isDynamic()) {
removeCustomOfferingDetails(vmId);
}
return _vmDao.update(vmId, vmForUpdate);
}

Expand Down Expand Up @@ -4087,8 +4097,8 @@ public boolean unplugNic(final Network network, final NicTO nic, final VirtualMa
}

@Override
public VMInstanceVO reConfigureVm(final String vmUuid, final ServiceOffering oldServiceOffering,
final boolean reconfiguringOnExistingHost)
public VMInstanceVO reConfigureVm(final String vmUuid, final ServiceOffering oldServiceOffering, final ServiceOffering newServiceOffering,
Map<String, String> customParameters, final boolean reconfiguringOnExistingHost)
throws ResourceUnavailableException, InsufficientServerCapacityException, ConcurrentOperationException {

final AsyncJobExecutionContext jobContext = AsyncJobExecutionContext.getCurrentExecutionContext();
Expand All @@ -4098,14 +4108,14 @@ public VMInstanceVO reConfigureVm(final String vmUuid, final ServiceOffering old
final VirtualMachine vm = _vmDao.findByUuid(vmUuid);
placeHolder = createPlaceHolderWork(vm.getId());
try {
return orchestrateReConfigureVm(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
return orchestrateReConfigureVm(vmUuid, oldServiceOffering, newServiceOffering, reconfiguringOnExistingHost);
} finally {
if (placeHolder != null) {
_workJobDao.expunge(placeHolder.getId());
}
}
} else {
final Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, reconfiguringOnExistingHost);
final Outcome<VirtualMachine> outcome = reconfigureVmThroughJobQueue(vmUuid, oldServiceOffering, newServiceOffering, customParameters, reconfiguringOnExistingHost);

VirtualMachine vm = null;
try {
Expand Down Expand Up @@ -4134,14 +4144,12 @@ public VMInstanceVO reConfigureVm(final String vmUuid, final ServiceOffering old
}
}

private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final ServiceOffering oldServiceOffering, final boolean reconfiguringOnExistingHost) throws ResourceUnavailableException,
ConcurrentOperationException {
private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final ServiceOffering oldServiceOffering, final ServiceOffering newServiceOffering,
final boolean reconfiguringOnExistingHost) throws ResourceUnavailableException, ConcurrentOperationException {
final VMInstanceVO vm = _vmDao.findByUuid(vmUuid);
upgradeVmDb(vm.getId(), newServiceOffering, oldServiceOffering);

final long newServiceofferingId = vm.getServiceOfferingId();
final ServiceOffering newServiceOffering = _offeringDao.findById(vm.getId(), newServiceofferingId);
final HostVO hostVo = _hostDao.findById(vm.getHostId());

final Float memoryOvercommitRatio = CapacityManager.MemOverprovisioningFactor.valueIn(hostVo.getClusterId());
final Float cpuOvercommitRatio = CapacityManager.CpuOverprovisioningFactor.valueIn(hostVo.getClusterId());
final long minMemory = (long)(newServiceOffering.getRamSize() / memoryOvercommitRatio);
Expand All @@ -4168,7 +4176,7 @@ private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final Service
if (reconfiguringOnExistingHost) {
vm.setServiceOfferingId(oldServiceOffering.getId());
_capacityMgr.releaseVmCapacity(vm, false, false, vm.getHostId()); //release the old capacity
vm.setServiceOfferingId(newServiceofferingId);
vm.setServiceOfferingId(newServiceOffering.getId());
_capacityMgr.allocateVmCapacity(vm, false); // lock the new capacity
}

Expand All @@ -4177,7 +4185,9 @@ private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final Service
s_logger.error("Unable to scale vm due to " + (reconfigureAnswer == null ? "" : reconfigureAnswer.getDetails()));
throw new CloudRuntimeException("Unable to scale vm due to " + (reconfigureAnswer == null ? "" : reconfigureAnswer.getDetails()));
}

if (vm.getType().equals(VirtualMachine.Type.User)) {
_userVmMgr.generateUsageEvent(vm, vm.isDisplayVm(), EventTypes.EVENT_VM_DYNAMIC_SCALE);
}
success = true;
} catch (final OperationTimedoutException e) {
throw new AgentUnavailableException("Operation timed out on reconfiguring " + vm, dstHostId);
Expand All @@ -4186,7 +4196,7 @@ private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final Service
} finally {
if (!success) {
_capacityMgr.releaseVmCapacity(vm, false, false, vm.getHostId()); // release the new capacity
vm.setServiceOfferingId(oldServiceOffering.getId());
upgradeVmDb(vm.getId(), oldServiceOffering, newServiceOffering); // rollback
_capacityMgr.allocateVmCapacity(vm, false); // allocate the old capacity
}
}
Expand All @@ -4195,6 +4205,33 @@ private VMInstanceVO orchestrateReConfigureVm(final String vmUuid, final Service

}

private void removeCustomOfferingDetails(long vmId) {
Map<String, String> details = userVmDetailsDao.listDetailsKeyPairs(vmId);
details.remove(UsageEventVO.DynamicParameters.cpuNumber.name());
details.remove(UsageEventVO.DynamicParameters.cpuSpeed.name());
details.remove(UsageEventVO.DynamicParameters.memory.name());
List<UserVmDetailVO> detailList = new ArrayList<UserVmDetailVO>();
for(Map.Entry<String, String> entry: details.entrySet()) {
UserVmDetailVO detailVO = new UserVmDetailVO(vmId, entry.getKey(), entry.getValue(), true);
detailList.add(detailVO);
}
userVmDetailsDao.saveDetails(detailList);
}

private void saveCustomOfferingDetails(long vmId, ServiceOffering serviceOffering) {
//save the custom values to the database.
Map<String, String> details = userVmDetailsDao.listDetailsKeyPairs(vmId);
details.put(UsageEventVO.DynamicParameters.cpuNumber.name(), serviceOffering.getCpu().toString());
details.put(UsageEventVO.DynamicParameters.cpuSpeed.name(), serviceOffering.getSpeed().toString());
details.put(UsageEventVO.DynamicParameters.memory.name(), serviceOffering.getRamSize().toString());
List<UserVmDetailVO> detailList = new ArrayList<UserVmDetailVO>();
for (Map.Entry<String, String> entry: details.entrySet()) {
UserVmDetailVO detailVO = new UserVmDetailVO(vmId, entry.getKey(), entry.getValue(), true);
detailList.add(detailVO);
}
userVmDetailsDao.saveDetails(detailList);
}

@Override
public String getConfigComponentName() {
return VirtualMachineManager.class.getSimpleName();
Expand Down Expand Up @@ -5090,7 +5127,7 @@ public Outcome<VirtualMachine> removeVmFromNetworkThroughJobQueue(
}

public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(
final String vmUuid, final ServiceOffering newServiceOffering, final boolean reconfiguringOnExistingHost) {
final String vmUuid, final ServiceOffering oldServiceOffering, final ServiceOffering newServiceOffering, Map<String, String> customParameters, final boolean reconfiguringOnExistingHost) {

final CallContext context = CallContext.current();
final User user = context.getCallingUser();
Expand Down Expand Up @@ -5121,7 +5158,7 @@ public Outcome<VirtualMachine> reconfigureVmThroughJobQueue(

// save work context info (there are some duplications)
final VmWorkReconfigure workInfo = new VmWorkReconfigure(user.getId(), account.getId(), vm.getId(),
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, newServiceOffering.getId(), reconfiguringOnExistingHost);
VirtualMachineManagerImpl.VM_WORK_JOB_HANDLER, oldServiceOffering.getId(), newServiceOffering.getId(), customParameters, reconfiguringOnExistingHost);
workJob.setCmdInfo(VmWorkSerializer.serialize(workInfo));

_jobMgr.submitAsyncJob(workJob, VmWorkConstants.VM_WORK_QUEUE, vm.getId());
Expand Down Expand Up @@ -5280,10 +5317,14 @@ private Pair<JobInfo.Status, String> orchestrateReconfigure(final VmWorkReconfig
s_logger.info("Unable to find vm " + work.getVmId());
}
assert vm != null;
ServiceOfferingVO oldServiceOffering = _offeringDao.findById(work.getOldServiceOfferingId());
ServiceOfferingVO newServiceOffering = _offeringDao.findById(work.getNewServiceOfferingId());
if (newServiceOffering.isDynamic()) {
// update the service offering object with the custom parameters like cpu, memory
newServiceOffering = _offeringDao.getcomputeOffering(newServiceOffering, work.getCustomParameters());
}

final ServiceOffering newServiceOffering = _offeringDao.findById(vm.getId(), work.getNewServiceOfferingId());

reConfigureVm(vm.getUuid(), newServiceOffering,
reConfigureVm(vm.getUuid(), oldServiceOffering, newServiceOffering, work.getCustomParameters(),
work.isSameHost());
return new Pair<JobInfo.Status, String>(JobInfo.Status.SUCCEEDED, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,40 @@
package com.cloud.vm;


import java.util.Map;

public class VmWorkReconfigure extends VmWork {
private static final long serialVersionUID = -4517030323758086615L;

Long oldServiceOfferingId;
Long newServiceOfferingId;

Map<String, String> customParameters;
boolean sameHost;

public VmWorkReconfigure(long userId, long accountId, long vmId, String handlerName,
Long newServiceOfferingId, boolean sameHost) {
public VmWorkReconfigure(long userId, long accountId, long vmId, String handlerName, Long oldServiceOfferingId,
Long newServiceOfferingId, Map<String, String> customParameters, boolean sameHost) {

super(userId, accountId, vmId, handlerName);

this.oldServiceOfferingId = oldServiceOfferingId;
this.newServiceOfferingId = newServiceOfferingId;
this.customParameters = customParameters;
this.sameHost = sameHost;
}

public Long getOldServiceOfferingId() {
return oldServiceOfferingId;
}

public Long getNewServiceOfferingId() {
return newServiceOfferingId;
}

public Map<String, String> getCustomParameters() {
return customParameters;
}

public boolean isSameHost() {
return sameHost;
}
Expand Down
10 changes: 1 addition & 9 deletions server/src/main/java/com/cloud/server/ManagementServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -4086,15 +4086,7 @@ private VirtualMachine upgradeStoppedSystemVm(final Long systemVmId, final Long
}
_itMgr.checkIfCanUpgrade(systemVm, newServiceOffering);

final boolean result = _itMgr.upgradeVmDb(systemVmId, serviceOfferingId);

if (newServiceOffering.isDynamic()) {
//save the custom values to the database.
_userVmMgr.saveCustomOfferingDetails(systemVmId, newServiceOffering);
}
if (currentServiceOffering.isDynamic() && !newServiceOffering.isDynamic()) {
_userVmMgr.removeCustomOfferingDetails(systemVmId);
}
final boolean result = _itMgr.upgradeVmDb(systemVmId, newServiceOffering, currentServiceOffering);

if (result) {
return _vmInstanceDao.findById(systemVmId);
Expand Down
5 changes: 0 additions & 5 deletions server/src/main/java/com/cloud/vm/UserVmManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.cloud.exception.ManagementServerException;
import com.cloud.exception.ResourceUnavailableException;
import com.cloud.exception.VirtualMachineMigrationException;
import com.cloud.offering.ServiceOffering;
import com.cloud.service.ServiceOfferingVO;
import com.cloud.storage.Storage.StoragePoolType;
import com.cloud.user.Account;
Expand Down Expand Up @@ -117,10 +116,6 @@ UserVm updateVirtualMachine(long id, String displayName, String group, Boolean h
//find a common place for all the scaling and upgrading code of both user and systemvms.
void validateCustomParameters(ServiceOfferingVO serviceOffering, Map<String, String> customParameters);

public void saveCustomOfferingDetails(long vmId, ServiceOffering serviceOffering);

public void removeCustomOfferingDetails(long vmId);

void generateUsageEvent(VirtualMachine vm, boolean isDisplay, String eventType);

void persistDeviceBusInfo(UserVmVO paramUserVmVO, String paramString);
Expand Down
Loading

0 comments on commit 5054766

Please sign in to comment.