Skip to content

Commit

Permalink
server: fix TransactionLegacy DB connection leaks due to DB switching…
Browse files Browse the repository at this point in the history
… by B&R thread (apache#4121)

The BackupSync task would switch between databases to update backup usage
metrics in the cloud_usage.usage_backup table. The current framework
and its usage in ManagedContext cause database connection
(TransactionLegacy) leaks. When the thread runs faster, the issue is
easily reproducible and can be verified via heap dump analysis or JMX
MBeans. This commit fixes the leaks by moving the update of backup
usage data to the usage server, which is done by publishing usage
events instead of switching between databases in a local thread while
in a ManagedContextRunnable.

Signed-off-by: Rohit Yadav <[email protected]>
  • Loading branch information
rohityadavcloud authored Jun 16, 2020
1 parent 77947f2 commit b54d19b
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 79 deletions.
1 change: 1 addition & 0 deletions api/src/main/java/com/cloud/event/EventTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ public class EventTypes {
public static final String EVENT_VM_BACKUP_RESTORE_VOLUME_TO_VM = "BACKUP.RESTORE.VOLUME.TO.VM";
public static final String EVENT_VM_BACKUP_SCHEDULE_CONFIGURE = "BACKUP.SCHEDULE.CONFIGURE";
public static final String EVENT_VM_BACKUP_SCHEDULE_DELETE = "BACKUP.SCHEDULE.DELETE";
public static final String EVENT_VM_BACKUP_USAGE_METRIC = "BACKUP.USAGE.METRIC";

// external network device events
public static final String EVENT_EXTERNAL_NVP_CONTROLLER_ADD = "PHYSICAL.NVPCONTROLLER.ADD";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import java.util.Date;
import java.util.List;

import org.apache.cloudstack.backup.Backup;

import com.cloud.usage.UsageBackupVO;
import com.cloud.utils.db.GenericDao;
import com.cloud.vm.VirtualMachine;

public interface UsageBackupDao extends GenericDao<UsageBackupVO, Long> {
void updateMetrics(VirtualMachine vm, Backup.Metric metric);
void removeUsage(Long accountId, Long zoneId, Long backupId);
void updateMetrics(Long vmId, Long size, Long virtualSize);
void removeUsage(Long accountId, Long vmId, Date eventDate);
List<UsageBackupVO> getUsageRecords(Long accountId, Date startDate, Date endDate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,68 @@

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;

import org.apache.cloudstack.backup.Backup;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import com.cloud.exception.CloudException;
import com.cloud.usage.UsageBackupVO;
import com.cloud.utils.DateUtil;
import com.cloud.utils.db.GenericDaoBase;
import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria;
import com.cloud.utils.db.Transaction;
import com.cloud.utils.db.TransactionCallback;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.db.TransactionStatus;
import com.cloud.vm.VirtualMachine;

@Component
public class UsageBackupDaoImpl extends GenericDaoBase<UsageBackupVO, Long> implements UsageBackupDao {
public static final Logger LOGGER = Logger.getLogger(UsageBackupDaoImpl.class);
protected static final String GET_USAGE_RECORDS_BY_ACCOUNT = "SELECT id, zone_id, account_id, domain_id, vm_id, backup_offering_id, size, protected_size, created, removed FROM cloud_usage.usage_backup WHERE " +
protected static final String UPDATE_DELETED = "UPDATE usage_backup SET removed = ? WHERE account_id = ? AND vm_id = ? and removed IS NULL";
protected static final String GET_USAGE_RECORDS_BY_ACCOUNT = "SELECT id, zone_id, account_id, domain_id, vm_id, backup_offering_id, size, protected_size, created, removed FROM usage_backup WHERE " +
" account_id = ? AND ((removed IS NULL AND created <= ?) OR (created BETWEEN ? AND ?) OR (removed BETWEEN ? AND ?) " +
" OR ((created <= ?) AND (removed >= ?)))";

@Override
public void updateMetrics(final VirtualMachine vm, Backup.Metric metric) {
boolean result = Transaction.execute(TransactionLegacy.USAGE_DB, new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus status) {
final QueryBuilder<UsageBackupVO> qb = QueryBuilder.create(UsageBackupVO.class);
qb.and(qb.entity().getVmId(), SearchCriteria.Op.EQ, vm.getId());
final UsageBackupVO entry = findOneBy(qb.create());
if (entry == null) {
return false;
}
entry.setSize(metric.getBackupSize());
entry.setProtectedSize(metric.getDataSize());
return update(entry.getId(), entry);
public void updateMetrics(final Long vmId, final Long size, final Long virtualSize) {
try (TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB)) {
SearchCriteria<UsageBackupVO> sc = this.createSearchCriteria();
sc.addAnd("vmId", SearchCriteria.Op.EQ, vmId);
UsageBackupVO vo = findOneBy(sc);
if (vo != null) {
vo.setSize(size);
vo.setProtectedSize(virtualSize);
update(vo.getId(), vo);
}
});
if (!result) {
LOGGER.trace("Failed to update backup metrics for VM ID: " + vm.getId());
} catch (final Exception e) {
LOGGER.error("Error updating backup metrics: " + e.getMessage(), e);
}
}

@Override
public void removeUsage(Long accountId, Long zoneId, Long vmId) {
boolean result = Transaction.execute(TransactionLegacy.USAGE_DB, new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(final TransactionStatus status) {
final QueryBuilder<UsageBackupVO> qb = QueryBuilder.create(UsageBackupVO.class);
qb.and(qb.entity().getAccountId(), SearchCriteria.Op.EQ, accountId);
qb.and(qb.entity().getZoneId(), SearchCriteria.Op.EQ, zoneId);
qb.and(qb.entity().getVmId(), SearchCriteria.Op.EQ, vmId);
final UsageBackupVO entry = findOneBy(qb.create());
return remove(qb.create()) > 0;
public void removeUsage(Long accountId, Long vmId, Date eventDate) {
TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
try {
txn.start();
try (PreparedStatement pstmt = txn.prepareStatement(UPDATE_DELETED);) {
if (pstmt != null) {
pstmt.setString(1, DateUtil.getDateDisplayString(TimeZone.getTimeZone("GMT"), eventDate));
pstmt.setLong(2, accountId);
pstmt.setLong(3, vmId);
pstmt.executeUpdate();
}
} catch (SQLException e) {
LOGGER.error("Error removing UsageBackupVO: " + e.getMessage(), e);
throw new CloudException("Remove backup usage exception: " + e.getMessage(), e);
}
});
if (!result) {
LOGGER.warn("Failed to remove usage entry for backup of VM ID: " + vmId);
txn.commit();
} catch (Exception e) {
txn.rollback();
LOGGER.error("Exception caught while removing UsageBackupVO: " + e.getMessage(), e);
} finally {
txn.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,30 +150,28 @@ public static TransactionLegacy open(final String name) {

public static TransactionLegacy open(final String name, final short databaseId, final boolean forceDbChange) {
TransactionLegacy txn = tls.get();
boolean isNew = false;
if (txn == null) {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Creating the transaction: " + name);
}
txn = new TransactionLegacy(name, false, databaseId);
tls.set(txn);
isNew = true;
s_mbean.addTransaction(txn);
} else if (forceDbChange) {
final short currentDbId = txn.getDatabaseId();
if (currentDbId != databaseId) {
// we need to end the current transaction and switch databases
txn.close(txn.getName());
if (txn.close(txn.getName()) && txn.getCurrentConnection() == null) {
s_mbean.removeTransaction(txn);
}

txn = new TransactionLegacy(name, false, databaseId);
tls.set(txn);
isNew = true;
s_mbean.addTransaction(txn);
}
}
txn.checkConnection();
txn.takeOver(name, false);
if (isNew) {
s_mbean.addTransaction(txn);
}
return txn;
}

Expand Down Expand Up @@ -762,8 +760,8 @@ protected void closeConnection() {
}
_conn.close();
_conn = null;
s_mbean.removeTransaction(this);
}

} catch (final SQLException e) {
s_logger.warn("Unable to close connection", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ public Pair<Boolean, String> restoreBackedUpVolume(Backup backup, String volumeU
public Map<VirtualMachine, Backup.Metric> getBackupMetrics(Long zoneId, List<VirtualMachine> vms) {
final Map<VirtualMachine, Backup.Metric> metrics = new HashMap<>();
final Backup.Metric metric = new Backup.Metric(1000L, 100L);
if (vms == null || vms.isEmpty()) {
return metrics;
}
for (VirtualMachine vm : vms) {
metrics.put(vm, metric);
if (vm != null) {
metrics.put(vm, metric);
}
}
return metrics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,12 @@ public Pair<Boolean, String> restoreBackedUpVolume(Backup backup, String volumeU
@Override
public Map<VirtualMachine, Backup.Metric> getBackupMetrics(final Long zoneId, final List<VirtualMachine> vms) {
final Map<VirtualMachine, Backup.Metric> metrics = new HashMap<>();
if (vms == null || vms.isEmpty()) {
return metrics;
}
final Map<String, Backup.Metric> backendMetrics = getClient(zoneId).getBackupMetrics();
for (final VirtualMachine vm : vms) {
if (!backendMetrics.containsKey(vm.getUuid())) {
if (vm == null || !backendMetrics.containsKey(vm.getUuid())) {
continue;
}
metrics.put(vm, backendMetrics.get(vm.getUuid()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import com.cloud.storage.VolumeVO;
import com.cloud.storage.dao.DiskOfferingDao;
import com.cloud.storage.dao.VolumeDao;
import com.cloud.usage.dao.UsageBackupDao;
import com.cloud.user.Account;
import com.cloud.user.AccountManager;
import com.cloud.user.AccountService;
Expand Down Expand Up @@ -126,8 +125,6 @@ public class BackupManagerImpl extends ManagerBase implements BackupManager {
@Inject
private AccountManager accountManager;
@Inject
private UsageBackupDao usageBackupDao;
@Inject
private VolumeDao volumeDao;
@Inject
private DataCenterDao dataCenterDao;
Expand Down Expand Up @@ -1001,7 +998,6 @@ public BackupSyncTask(final BackupManager backupManager) {

@Override
protected void runInContext() {
final int SYNC_INTERVAL = BackupSyncPollingInterval.value().intValue();
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Backup sync background task is running...");
Expand All @@ -1022,31 +1018,23 @@ protected void runInContext() {
continue;
}

// Sync backup usage metrics
final Map<VirtualMachine, Backup.Metric> metrics = backupProvider.getBackupMetrics(dataCenter.getId(), new ArrayList<>(vms));
final GlobalLock syncBackupMetricsLock = GlobalLock.getInternLock("BackupSyncTask_metrics_zone_" + dataCenter.getId());
if (syncBackupMetricsLock.lock(SYNC_INTERVAL)) {
try {
for (final VirtualMachine vm : metrics.keySet()) {
final Backup.Metric metric = metrics.get(vm);
if (metric != null) {
usageBackupDao.updateMetrics(vm, metric);
}
try {
for (final VirtualMachine vm : metrics.keySet()) {
final Backup.Metric metric = metrics.get(vm);
if (metric != null) {
// Sync out-of-band backups
backupProvider.syncBackups(vm, metric);
// Emit a usage event, update usage metric for the VM by the usage server
UsageEventUtils.publishUsageEvent(EventTypes.EVENT_VM_BACKUP_USAGE_METRIC, vm.getAccountId(),
vm.getDataCenterId(), vm.getId(), "Backup-" + vm.getHostName() + "-" + vm.getUuid(),
vm.getBackupOfferingId(), null, metric.getBackupSize(), metric.getDataSize(),
Backup.class.getSimpleName(), vm.getUuid());
}
} finally {
syncBackupMetricsLock.unlock();
}
}

// Sync out-of-band backups
for (final VirtualMachine vm : vms) {
final GlobalLock syncBackupsLock = GlobalLock.getInternLock("BackupSyncTask_backup_vm_" + vm.getId());
if (syncBackupsLock.lock(SYNC_INTERVAL)) {
try {
backupProvider.syncBackups(vm, metrics.get(vm));
} finally {
syncBackupsLock.unlock();
}
} catch (final Throwable e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to sync backup usage metrics and out-of-band backups");
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions usage/src/main/java/com/cloud/usage/UsageManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,10 @@ private boolean isVmSnapshotOnPrimaryEvent(String eventType) {
}

private boolean isBackupEvent(String eventType) {
return eventType != null && (eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_ASSIGN) || eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE));
return eventType != null && (
eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_ASSIGN) ||
eventType.equals(EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE) ||
eventType.equals(EventTypes.EVENT_VM_BACKUP_USAGE_METRIC));
}

private void createVMHelperEvent(UsageEventVO event) {
Expand Down Expand Up @@ -1913,7 +1916,9 @@ private void createBackupEvent(final UsageEventVO event) {
final UsageBackupVO backupVO = new UsageBackupVO(zoneId, accountId, domainId, vmId, backupOfferingId, created);
usageBackupDao.persist(backupVO);
} else if (EventTypes.EVENT_VM_BACKUP_OFFERING_REMOVE.equals(event.getType())) {
usageBackupDao.removeUsage(accountId, zoneId, vmId);
usageBackupDao.removeUsage(accountId, vmId, event.getCreateDate());
} else if (EventTypes.EVENT_VM_BACKUP_USAGE_METRIC.equals(event.getType())) {
usageBackupDao.updateMetrics(vmId, event.getSize(), event.getVirtualSize());
}
}

Expand Down

0 comments on commit b54d19b

Please sign in to comment.