Skip to content

Commit

Permalink
fix rebuilding DownloadPackage.Storage issue
Browse files Browse the repository at this point in the history
  • Loading branch information
bezzad committed Nov 17, 2024
1 parent 3c8bec8 commit ad56fad
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/Downloader.Test/UnitTests/DownloadBuilderTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public async Task TestPackageWhenNewUrl()
public async Task TestPackageWhenResume()
{
// arrange
DownloadPackage package = new DownloadPackage() {
DownloadPackage package = new() {
Urls = [_url],
IsSupportDownloadInRange = true
};
Expand Down
6 changes: 3 additions & 3 deletions src/Downloader.Test/UnitTests/DownloadPackageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public virtual async Task InitializeAsync()
Data = DummyData.GenerateOrderedBytes(DummyFileHelper.FileSize16Kb);
Package.BuildStorage(false, 1024 * 1024);
new ChunkHub(Config).SetFileChunks(Package);
await Package.Storage.WriteAsync(0, Data, DummyFileHelper.FileSize16Kb);
await Package.Storage.WriteAsync(0, Data, Data.Length);
await Package.Storage.FlushAsync();
}

Expand All @@ -26,9 +26,9 @@ public virtual Task DisposeAsync()
public void PackageSerializationTest()
{
// act
var serialized = Newtonsoft.Json.JsonConvert.SerializeObject(Package);
var serialized = JsonConvert.SerializeObject(Package);
Package.Storage.Dispose();
var deserialized = Newtonsoft.Json.JsonConvert.DeserializeObject<DownloadPackage>(serialized);
var deserialized = JsonConvert.DeserializeObject<DownloadPackage>(serialized);
var destData = new byte[deserialized.TotalFileSize];
_ = deserialized.Storage.OpenRead().Read(destData, 0, destData.Length);

Expand Down
4 changes: 2 additions & 2 deletions src/Downloader/AbstractDownloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public virtual async Task<Stream> DownloadFileTaskAsync(DownloadPackage package,
{
Package = package;
await InitialDownloader(cancellationToken, urls).ConfigureAwait(false);
return await StartDownload().ConfigureAwait(false);
return await StartDownload(false).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -389,7 +389,7 @@ protected async Task StartDownload(string fileName)
/// Starts the download operation.
/// </summary>
/// <returns>A task that represents the asynchronous download operation. The task result contains the downloaded stream.</returns>
protected abstract Task<Stream> StartDownload();
protected abstract Task<Stream> StartDownload(bool forceBuildStorage = true);

/// <summary>
/// Raises the <see cref="DownloadStarted"/> event.
Expand Down
80 changes: 38 additions & 42 deletions src/Downloader/ConcurrentStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,19 @@ public class ConcurrentStream : TaskStateManagement, IDisposable, IAsyncDisposab
{
private ConcurrentPacketBuffer<Packet> _inputBuffer;
private volatile bool _disposed;
private Stream _stream;
private string _path;
private Stream _stream; // Lazy base stream
private CancellationTokenSource _watcherCancelSource;

protected Stream Stream => _stream ??= IsMemoryStream

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 19 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'
? new MemoryStream()
: new FileStream(Path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);

public bool IsDisposed => _disposed;

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 23 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

/// <summary>
/// Gets or sets the path of the file associated with the stream.
/// </summary>
public string Path
{
get => _path;
set
{
if (string.IsNullOrWhiteSpace(value))
return;

_path = value;
_stream = new FileStream(_path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
}
}
public string Path { get; set; }

/// <summary>
/// Gets the data of the stream as a byte array if the stream is a MemoryStream.
Expand All @@ -41,52 +35,56 @@ public byte[] Data
get
{
if (_disposed)
throw new ObjectDisposedException(nameof(_stream));
throw new ObjectDisposedException(nameof(Stream));

if (_stream is MemoryStream mem)
if (Stream is MemoryStream mem)
return mem.ToArray();

return null;
}
set
{
if (value != null)
{
// Don't pass straight value to MemoryStream,
// because causes stream to be an immutable array
_stream = new MemoryStream();
_stream.Write(value, 0, value.Length);
}
if (value is null) return;

// Note: Don't pass straight value to MemoryStream,
// because causes stream to be an immutable array
_stream = new MemoryStream();
_stream.Write(value, 0, value.Length);
}
}

/// <summary>
/// Is the <see cref="MemoryStream"/> type of the base stream or not.
/// </summary>
public bool IsMemoryStream => string.IsNullOrWhiteSpace(Path);

/// <summary>
/// Gets a value indicating whether the stream supports reading.
/// </summary>
public bool CanRead => _stream?.CanRead == true;
public bool CanRead => Stream?.CanRead == true;

/// <summary>
/// Gets a value indicating whether the stream supports seeking.
/// </summary>
public bool CanSeek => _stream?.CanSeek == true;
public bool CanSeek => Stream?.CanSeek == true;

/// <summary>
/// Gets a value indicating whether the stream supports writing.
/// </summary>
public bool CanWrite => _stream?.CanWrite == true;
public bool CanWrite => Stream?.CanWrite == true;

/// <summary>
/// Gets the length of the stream in bytes.
/// </summary>
public long Length => _stream?.Length ?? 0;
public long Length => Stream?.Length ?? 0;

/// <summary>
/// Gets or sets the current position within the stream.
/// </summary>
public long Position
{
get => _stream?.Position ?? 0;
set => _stream.Position = value;
get => Stream?.Position ?? 0;
set => Stream.Position = value;
}

/// <summary>
Expand Down Expand Up @@ -116,7 +114,6 @@ public ConcurrentStream(ILogger logger) : this(0, logger) { }
/// <param name="logger">The logger to use for logging.</param>
public ConcurrentStream(long maxMemoryBufferBytes = 0, ILogger logger = null) : base(logger)
{
_stream = new MemoryStream();
Initial(maxMemoryBufferBytes);
}

Expand All @@ -142,8 +139,7 @@ public ConcurrentStream(Stream stream, long maxMemoryBufferBytes = 0, ILogger lo
public ConcurrentStream(string filename, long initSize, long maxMemoryBufferBytes = 0, ILogger logger = null) :
base(logger)
{
_path = filename;
_stream = new FileStream(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read);
Path = filename;

if (initSize > 0)
SetLength(initSize);
Expand Down Expand Up @@ -177,7 +173,7 @@ private void Initial(long maxMemoryBufferBytes, ILogger logger = null)
public Stream OpenRead()
{
Seek(0, SeekOrigin.Begin);
return _stream;
return Stream;
}

/// <summary>
Expand All @@ -189,8 +185,7 @@ public Stream OpenRead()
/// <returns>The total number of bytes read into the buffer.</returns>
public int Read(byte[] buffer, int offset, int count)
{
var stream = OpenRead();
return stream.Read(buffer, offset, count);
return OpenRead().Read(buffer, offset, count);
}

/// <summary>
Expand Down Expand Up @@ -259,7 +254,7 @@ public long Seek(long offset, SeekOrigin origin)
{
if (offset != Position && CanSeek)
{
_stream.Seek(offset, origin);
Stream.Seek(offset, origin);
}

return Position;
Expand All @@ -271,7 +266,7 @@ public long Seek(long offset, SeekOrigin origin)
/// <param name="value">The desired length of the current stream in bytes.</param>
public void SetLength(long value)
{
_stream.SetLength(value);
Stream.SetLength(value);
}

/// <summary>
Expand All @@ -283,7 +278,7 @@ private async Task WritePacketOnFile(Packet packet)
{
// seek with SeekOrigin.Begin is so faster than SeekOrigin.Current
Seek(packet.Position, SeekOrigin.Begin);
await _stream.WriteAsync(packet.Data).ConfigureAwait(false);
await Stream.WriteAsync(packet.Data).ConfigureAwait(false);
packet.Dispose();
}

Expand All @@ -295,9 +290,10 @@ public async Task FlushAsync()
{
await _inputBuffer.WaitToComplete().ConfigureAwait(false);

if (_stream?.CanRead == true)
if (CanRead)
{
await _stream.FlushAsync().ConfigureAwait(false);
await Stream.FlushAsync().ConfigureAwait(false);
GC.Collect();
}

GC.Collect();
Expand All @@ -312,7 +308,7 @@ public void Dispose()
{
_disposed = true;
_watcherCancelSource.Cancel(); // request the cancellation
_stream.Dispose();
Stream.Dispose();
_inputBuffer.Dispose();
}
}
Expand All @@ -331,7 +327,7 @@ public async ValueTask DisposeAsync()
#else
_watcherCancelSource.Cancel(); // request the cancellation
#endif
await _stream.DisposeAsync().ConfigureAwait(false);
await Stream.DisposeAsync().ConfigureAwait(false);
_inputBuffer.Dispose();
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/Downloader/DownloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,21 @@ public DownloadService(ILoggerFactory loggerFactory = null) : this(null, loggerF
/// Starts the download operation.
/// </summary>
/// <returns>A task that represents the asynchronous download operation. The task result contains the downloaded stream.</returns>
protected override async Task<Stream> StartDownload()
protected override async Task<Stream> StartDownload(bool forceBuildStorage = true)
{
try
{
await SingleInstanceSemaphore.WaitAsync().ConfigureAwait(false);
Package.TotalFileSize = await RequestInstances.First().GetFileSize().ConfigureAwait(false);
Package.IsSupportDownloadInRange =
await RequestInstances.First().IsSupportDownloadInRange().ConfigureAwait(false);
Package.BuildStorage(Options.ReserveStorageSpaceBeforeStartingDownload, Options.MaximumMemoryBufferBytes);

if (forceBuildStorage || Package.Storage is null || Package.Storage.IsDisposed)
{
Package.BuildStorage(Options.ReserveStorageSpaceBeforeStartingDownload,
Options.MaximumMemoryBufferBytes);
}

ValidateBeforeChunking();
ChunkHub.SetFileChunks(Package);

Expand Down

0 comments on commit ad56fad

Please sign in to comment.