Skip to content

Commit

Permalink
tart pull: re-try disk layer downloads by specifying "Range" header (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
edigaryev authored Dec 19, 2024
1 parent eaec015 commit b96ea08
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
25 changes: 20 additions & 5 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class DiskV2: Disk {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data
let compressedDataDigest = Digest.hash(compressedData)

try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
try await retry(maxAttempts: 5) {
if try await !registry.blobExists(compressedDataDigest) {
_ = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb, digest: compressedDataDigest)
}
Expand Down Expand Up @@ -208,11 +208,26 @@ class DiskV2: Disk {
diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}

try await registry.pullBlob(diskLayer.digest) { data in
try filter.write(data)
var rangeStart: Int64 = 0

// Update the progress
progress.completedUnitCount += Int64(data.count)
try await retry(maxAttempts: 5) {
try await registry.pullBlob(diskLayer.digest, rangeStart: rangeStart) { data in
try filter.write(data)

// Update the progress
progress.completedUnitCount += Int64(data.count)

// Update the current range start
rangeStart += Int64(data.count)
}
} recoverFromFailure: { error in
if error is URLError {
print("Error pulling disk layer \(index + 1): \"\(error.localizedDescription)\", attempting to re-try...")

return .retry
}

return .throw
}

try filter.finalize()
Expand Down
19 changes: 16 additions & 3 deletions Sources/tart/OCI/Registry.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ enum HTTPCode: Int {
case Ok = 200
case Created = 201
case Accepted = 202
case PartialContent = 206
case Unauthorized = 401
case NotFound = 404
}
Expand Down Expand Up @@ -263,9 +264,21 @@ class Registry {
}
}

public func pullBlob(_ digest: String, handler: (Data) async throws -> Void) async throws {
let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true)
if response.statusCode != HTTPCode.Ok.rawValue {
public func pullBlob(_ digest: String, rangeStart: Int64 = 0, handler: (Data) async throws -> Void) async throws {
var expectedStatusCode = HTTPCode.Ok
var headers: [String: String] = [:]

// Send Range header and expect HTTP 206 in return
//
// However, do not send Range header at all when rangeStart is 0,
// because it makes no sense and we might get HTTP 200 in return
if rangeStart != 0 {
expectedStatusCode = HTTPCode.PartialContent
headers["Range"] = "bytes=\(rangeStart)-"
}

let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), headers: headers, viaFile: true)
if response.statusCode != expectedStatusCode.rawValue {
let body = try await channel.asData().asText()
throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode,
details: body)
Expand Down
5 changes: 2 additions & 3 deletions Sources/tart/VMStorageOCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class VMStorageOCI: PrunableStorage {
}

try await withTaskCancellationHandler(operation: {
try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) {
try await retry(maxAttempts: 5) {
// Choose the best base image which has the most deduplication ratio
let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)

Expand All @@ -213,8 +213,7 @@ class VMStorageOCI: PrunableStorage {
try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate)
} recoverFromFailure: { error in
if error is URLError {
print("Error: \(error.localizedDescription)")
print("Attempting to re-try...")
print("Error pulling image: \"\(error.localizedDescription)\", attempting to re-try...")

return .retry
}
Expand Down

0 comments on commit b96ea08

Please sign in to comment.