Skip to content

Commit

Permalink
chore: remove failed uploads retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hua0512 committed Apr 26, 2024
1 parent d37ba00 commit 8a40f08
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 55 deletions.
3 changes: 0 additions & 3 deletions stream-rec/src/main/kotlin/github/hua0512/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ class Application {
launch {
downloadService.run()
}
launch(Dispatchers.IO) {
uploadService.run()
}
// start a job to listen for events
launch {
EventCenter.run()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ import kotlin.time.toDuration
*/
class UploadService(val app: App, private val uploadRepo: UploadRepo) {

/**
* A map to keep track of failed uploads and the number of times they failed.
* The key is the ID of the upload data and the value is the number of times the upload has failed.
*/
private val shouldNotRetryMap = mutableMapOf<UploadDataId, Int>()

companion object {
/**
* Logger for this class.
Expand All @@ -85,52 +79,6 @@ class UploadService(val app: App, private val uploadRepo: UploadRepo) {
*/
private val uploadSemaphore: Semaphore by lazy { Semaphore(app.config.maxConcurrentUploads) }

/**
* Runs the upload service.
* This function launches two coroutines. One for processing upload actions and another for handling failed uploads.
*/
suspend fun run() = coroutineScope {
// launch a coroutine scanning for failed uploads
launch {
uploadRepo.streamFailedUploadResults().onEach { failedResults ->
// for each failed upload get its upload data
failedResults.forEach { failedResult ->
val uploadData = uploadRepo.getUploadData(UploadDataId(failedResult.uploadDataId))
if (uploadData == null) {
// if the upload data is null, delete the failedResults
logger.info("Deleting failed upload failedResults: $failedResult")
uploadRepo.deleteUploadResult(UploadResultId(failedResult.id))
} else {
// if the upload data is not null, re-upload the file
// get upload action id
val uploadDataId = UploadDataId(uploadData.id)
val count = shouldNotRetryMap.getOrDefault(uploadDataId, 0)
// ignore if the upload data has failed more than 3 times
if (count >= 3) {
logger.error("Failed to upload file: ${uploadData.filePath} more than 3 times, skipping")
return@onEach
}

val uploadAction = uploadRepo.getUploadActionIdByUploadDataId(uploadDataId) ?: run {
logger.error("Upload action not found for upload data: $uploadData")
return@onEach // skip this failed result
}
// calculate the next retry time, current retry count * 2 factor * 10m
logger.info("Retrying failed upload: ${uploadData.filePath} in $count minutes")
val delayInMinutes = (count + 1) * 2 * 10
delay(duration = delayInMinutes.toDuration(DurationUnit.MINUTES))
// emit the same upload action with the failed upload data
upload(uploadAction.copy(files = setOf(uploadData)))
// increment the count
shouldNotRetryMap[uploadDataId] = count + 1
}
}
}.catch {
logger.error("Error in failed upload results flow", it)
}.collect()
}
}

/**
* Uploads an upload action.
* This function saves the upload action to the repository and emits it to the upload action flow.
Expand Down

0 comments on commit 8a40f08

Please sign in to comment.