Skip to content

Commit

Permalink
add shadow position field to ConcurrentStream.cs to handle deserializ…
Browse files Browse the repository at this point in the history
…e action and setting after init real base stream
  • Loading branch information
bezzad committed Nov 17, 2024
1 parent ad56fad commit 7d6fd44
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 15 deletions.
44 changes: 44 additions & 0 deletions src/Downloader.Test/IntegrationTests/DownloadIntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -873,4 +873,48 @@ public async Task DownloadBigFileWithMemoryLimitationOnDisk()
Assert.Equal(fillByte, fileStream.ReadByte());
}
}

[Fact]
public async Task StorePackageFileWhenDownloadInProgress()
{
// arrange
const long totalSize = 1024 * 1024 * 256; // 256MB
const double snapshotPoint = 0.50; // 50%
SemaphoreSlim semaphore = new(1, 1);
Tuple<long, string> pack = new(0, "");
Config.ChunkCount = 8;
Config.ParallelCount = 8;
Config.BufferBlockSize = 1024;
Config.MaximumBytesPerSecond = 1024 * 1024 * 8; // 8MB/s
Config.MaximumMemoryBufferBytes = totalSize / 2; // 128MB
Url = DummyFileHelper.GetFileWithNameUrl(Filename, totalSize);
Downloader.DownloadProgressChanged += async (_, e) => {
if (snapshotPoint >= e.ProgressPercentage || pack.Item1 != 0)
return;

try
{
await semaphore.WaitAsync();
if (pack.Item1 == 0)
{
pack = new Tuple<long, string>(e.ReceivedBytesSize,
JsonConvert.SerializeObject(Downloader.Package));
await Downloader.CancelTaskAsync();
}
}
finally
{
semaphore.Release();
}
};

// act
await Downloader.DownloadFileTaskAsync(Url, FilePath);
await using FileStream fileStream = File.Open(FilePath, FileMode.Open, FileAccess.Read);

// assert
Assert.True(pack.Item1 > 0);
DownloadPackage restoredPackage = JsonConvert.DeserializeObject<DownloadPackage>(pack.Item2);
// Assert.Equal(pack.Item1, restoredPackage.ReceivedBytesSize);
}
}
1 change: 0 additions & 1 deletion src/Downloader.Test/UnitTests/StorageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ public async Task TestDispose()
Storage.Dispose();

// assert
Assert.ThrowsAny<ObjectDisposedException>(() => Storage.Length);
Assert.ThrowsAny<ObjectDisposedException>(() => Storage.Data);
}

Expand Down
56 changes: 42 additions & 14 deletions src/Downloader/ConcurrentStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ public class ConcurrentStream : TaskStateManagement, IDisposable, IAsyncDisposab
private ConcurrentPacketBuffer<Packet> _inputBuffer;
private volatile bool _disposed;
private Stream _stream; // Lazy base stream
public long _position;

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'

Check warning on line 17 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream._position'
private CancellationTokenSource _watcherCancelSource;

protected Stream Stream => _stream ??= IsMemoryStream
? new MemoryStream()
: new FileStream(Path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);
protected Stream Stream => _stream ?? GetStream();

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

Check warning on line 20 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.Stream'

public bool IsDisposed => _disposed;

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Check warning on line 22 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.IsDisposed'

Expand All @@ -45,10 +44,13 @@ public byte[] Data
set
{
if (value is null) return;
lock (this)
{
// Note: Don't pass straight value to MemoryStream,
// because causes stream to be an immutable array
_stream = new MemoryStream();
}

// Note: Don't pass straight value to MemoryStream,
// because causes stream to be an immutable array
_stream = new MemoryStream();
_stream.Write(value, 0, value.Length);
}
}
Expand Down Expand Up @@ -76,15 +78,21 @@ public byte[] Data
/// <summary>
/// Gets the length of the stream in bytes.
/// </summary>
public long Length => Stream?.Length ?? 0;
public long Length => _stream?.Length ?? 0;

/// <summary>
/// Gets or sets the current position within the stream.
/// </summary>
public long Position
{
get => Stream?.Position ?? 0;
set => Stream.Position = value;
get => _stream?.Position ?? _position;
set
{
if (_stream is null)
_position = value; // keep value to deserialized and set after init stream
else
_stream.Position = value;
}
}

/// <summary>
Expand Down Expand Up @@ -141,7 +149,7 @@ public ConcurrentStream(string filename, long initSize, long maxMemoryBufferByte
{
Path = filename;

if (initSize > 0)
if (initSize >= 0)
SetLength(initSize);

Initial(maxMemoryBufferBytes);
Expand All @@ -166,6 +174,28 @@ private void Initial(long maxMemoryBufferBytes, ILogger logger = null)
task.Unwrap();
}

protected Stream GetStream()

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'

Check warning on line 177 in src/Downloader/ConcurrentStream.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'ConcurrentStream.GetStream()'
{
if (_disposed || _stream is not null)
return _stream;

lock (this)
{
// check again after enter to lock scopt to insure another thread didn't create the stream
if (_stream is not null)
return _stream;

_stream = IsMemoryStream
? new MemoryStream()
: new FileStream(Path, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite);

if (_position > 0)
Seek(_position, SeekOrigin.Begin);
}

return _stream;
}

/// <summary>
/// Opens the stream for reading.
/// </summary>
Expand Down Expand Up @@ -289,11 +319,9 @@ private async Task WritePacketOnFile(Packet packet)
public async Task FlushAsync()
{
await _inputBuffer.WaitToComplete().ConfigureAwait(false);

if (CanRead)
{
await Stream.FlushAsync().ConfigureAwait(false);
GC.Collect();
}

GC.Collect();
Expand All @@ -308,7 +336,7 @@ public void Dispose()
{
_disposed = true;
_watcherCancelSource.Cancel(); // request the cancellation
Stream.Dispose();
_stream?.Dispose();
_inputBuffer.Dispose();
}
}
Expand All @@ -327,7 +355,7 @@ public async ValueTask DisposeAsync()
#else
_watcherCancelSource.Cancel(); // request the cancellation
#endif
await Stream.DisposeAsync().ConfigureAwait(false);
await _stream.DisposeAsync().ConfigureAwait(false);
_inputBuffer.Dispose();
}
}
Expand Down

0 comments on commit 7d6fd44

Please sign in to comment.