1
0
mirror of https://github.com/oliverbooth/X10D synced 2024-11-22 07:58:49 +00:00

feat: add Progress<T>.OnProgressChanged

Provides a mechanism to wrap the ProgressChanged event of a Progress<T> as an IObservable<T>.
This commit is contained in:
Oliver Booth 2023-04-10 12:44:53 +01:00
parent a4a1d3b13a
commit f6847315a1
No known key found for this signature in database
GPG Key ID: 20BEB9DC87961025
6 changed files with 256 additions and 0 deletions

View File

@ -8,10 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 4.0.0 - [Unreleased]
### Added
- X10D: Added extension methods for `DateOnly`, for parity with `DateTime` and `DateTimeOffset`.
- X10D: Added math-related extension methods for `BigInteger`.
- X10D: Added `Span<T>.Replace(T, T)`.
- X10D: Added `CountDigits` for integer types.
- X10D: Added `Progress<T>.OnProgressChanged([T])`;
- X10D: Added `TextWriter.WriteNoAlloc(int[, ReadOnlySpan<char>[, IFormatProvider]])`.
- X10D: Added `TextWriter.WriteNoAlloc(uint[, ReadOnlySpan<char>[, IFormatProvider]])`.
- X10D: Added `TextWriter.WriteNoAlloc(long[, ReadOnlySpan<char>[, IFormatProvider]])`.
@ -24,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- X10D.Unity: Added `GameObject.CopyComponent<T>(GameObject)` and `GameObject.MoveComponent<T>(GameObject)`.
### Changed
- X10D: `DateTime.Age(DateTime)` and `DateTimeOffset.Age(DateTimeOffset)` parameter renamed from `asOf` to `referenceDate`.
## [3.2.0] - 2023-04-03

View File

@ -22,6 +22,7 @@
<PackageReference Include="NUnit3TestAdapter" Version="4.4.2"/>
<PackageReference Include="NUnit.Analyzers" Version="3.6.1"/>
<PackageReference Include="coverlet.collector" Version="3.2.0"/>
<PackageReference Include="System.Reactive" Version="5.0.0"/>
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,67 @@
using NUnit.Framework;
using X10D.Reactive;
namespace X10D.Tests.Reactive;
[TestFixture]
public class ProgressTests
{
[Test]
public void OnProgressChanged_ShouldCallCompletionDelegate_GivenCompletionValue()
{
var subscriberWasCalled = false;
var completionWasCalled = false;
var progress = new Progress<float>();
progress.OnProgressChanged(1.0f).Subscribe(_ => subscriberWasCalled = true, () => completionWasCalled = true);
((IProgress<float>)progress).Report(0.5f);
((IProgress<float>)progress).Report(1.0f);
Thread.Sleep(1000);
Assert.That(subscriberWasCalled);
Assert.That(completionWasCalled);
}
[Test]
public void OnProgressChanged_ShouldCallSubscribers_OnProgressChanged()
{
var subscriberWasCalled = false;
var progress = new Progress<float>();
progress.OnProgressChanged().Subscribe(_ => subscriberWasCalled = true);
((IProgress<float>)progress).Report(0.5f);
Thread.Sleep(1000);
Assert.That(subscriberWasCalled);
}
[Test]
public void OnProgressChanged_ShouldCallSubscribers_OnProgressChanged_GivenCompletionValue()
{
var subscriberWasCalled = false;
var progress = new Progress<float>();
progress.OnProgressChanged(1.0f).Subscribe(_ => subscriberWasCalled = true);
((IProgress<float>)progress).Report(0.5f);
Thread.Sleep(1000);
Assert.That(subscriberWasCalled);
}
[Test]
public void OnProgressChanged_ShouldThrowArgumentNullException_GivenNullProgress()
{
Progress<float> progress = null!;
Assert.Throws<ArgumentNullException>(() => progress.OnProgressChanged());
}
[Test]
public void OnProgressChanged_ShouldThrowArgumentNullException_GivenNullProgressAndCompletionValue()
{
Progress<float> progress = null!;
Assert.Throws<ArgumentNullException>(() => progress.OnProgressChanged(1.0f));
}
}

View File

@ -0,0 +1,48 @@
namespace X10D.Reactive;
/// <summary>
/// Represents a disposable that removes an observer from a collection of observers.
/// </summary>
internal readonly struct ObservableDisposer<T> : IDisposable
{
private readonly HashSet<IObserver<T>> _observers;
private readonly IObserver<T> _observer;
private readonly Action? _additionalAction;
/// <summary>
/// Initializes a new instance of the <see cref="ObservableDisposer{T}" /> struct.
/// </summary>
/// <param name="observers">A collection of observers from which to remove the specified observer.</param>
/// <param name="observer">The observer to remove from the collection.</param>
/// <param name="additionalAction">The additional action to run on dispose.</param>
public ObservableDisposer(HashSet<IObserver<T>> observers, IObserver<T> observer, Action? additionalAction)
{
#if NET6_0_OR_GREATER
ArgumentNullException.ThrowIfNull(observers);
ArgumentNullException.ThrowIfNull(observer);
#else
if (observers is null)
{
throw new ArgumentNullException(nameof(observers));
}
if (observer is null)
{
throw new ArgumentNullException(nameof(observer));
}
#endif
_observers = observers;
_observer = observer;
_additionalAction = additionalAction;
}
/// <summary>
/// Removes the observer from the collection of observers.
/// </summary>
public void Dispose()
{
_observers.Remove(_observer);
_additionalAction?.Invoke();
}
}

View File

@ -0,0 +1,97 @@
namespace X10D.Reactive;
/// <summary>
/// Provides extension methods for <see cref="Progress{T}" />.
/// </summary>
public static class ProgressExtensions
{
/// <summary>
/// Wraps the <see cref="Progress{T}.ProgressChanged" /> event of the current <see cref="Progress{T}" /> in an
/// <see cref="IObservable{T}" /> object.
/// </summary>
/// <param name="progress">The progress whose <see cref="Progress{T}.ProgressChanged" /> event to wrap.</param>
/// <typeparam name="T">The type of progress update value.</typeparam>
/// <returns>
/// An <see cref="IObservable{T}" /> object that wraps the <see cref="Progress{T}.ProgressChanged" /> event of the current
/// <see cref="Progress{T}" />.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="progress" /> is <see langword="null" />.</exception>
public static IObservable<T> OnProgressChanged<T>(this Progress<T> progress)
{
#if NET6_0_OR_GREATER
ArgumentNullException.ThrowIfNull(progress);
#else
if (progress is null)
{
throw new ArgumentNullException(nameof(progress));
}
#endif
var progressObservable = new ProgressObservable<T>();
void ProgressChangedHandler(object? sender, T args)
{
IObserver<T>[] observers = progressObservable.Observers;
for (var index = 0; index < observers.Length; index++)
{
observers[index].OnNext(args);
}
}
progress.ProgressChanged += ProgressChangedHandler;
progressObservable.OnDispose = () => progress.ProgressChanged -= ProgressChangedHandler;
return progressObservable;
}
/// <summary>
/// Wraps the <see cref="Progress{T}.ProgressChanged" /> event of the current <see cref="Progress{T}" /> in an
/// <see cref="IObservable{T}" /> object, and completes the observable when the progress reaches the specified value.
/// </summary>
/// <param name="progress">The progress whose <see cref="Progress{T}.ProgressChanged" /> event to wrap.</param>
/// <param name="completeValue">The value that indicates completion.</param>
/// <typeparam name="T">The type of progress update value.</typeparam>
/// <returns>
/// An <see cref="IObservable{T}" /> object that wraps the <see cref="Progress{T}.ProgressChanged" /> event of the current
/// <see cref="Progress{T}" />.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="progress" /> is <see langword="null" />.</exception>
public static IObservable<T> OnProgressChanged<T>(this Progress<T> progress, T completeValue)
{
#if NET6_0_OR_GREATER
ArgumentNullException.ThrowIfNull(progress);
#else
if (progress is null)
{
throw new ArgumentNullException(nameof(progress));
}
#endif
var progressObservable = new ProgressObservable<T>();
var comparer = EqualityComparer<T>.Default;
void ProgressChangedHandler(object? sender, T args)
{
IObserver<T>[] observers = progressObservable.Observers;
for (var index = 0; index < observers.Length; index++)
{
observers[index].OnNext(args);
}
if (comparer.Equals(args, completeValue))
{
for (var index = 0; index < observers.Length; index++)
{
observers[index].OnCompleted();
}
}
}
progress.ProgressChanged += ProgressChangedHandler;
progressObservable.OnDispose = () => progress.ProgressChanged -= ProgressChangedHandler;
return progressObservable;
}
}

View File

@ -0,0 +1,40 @@
namespace X10D.Reactive;
/// <summary>
/// Represents a concrete implementation of <see cref="IObservable{T}" /> that tracks progress of a <see cref="Progress{T}"/>.
/// </summary>
internal sealed class ProgressObservable<T> : IObservable<T>
{
private readonly HashSet<IObserver<T>> _observers = new();
/// <summary>
/// Gets the observers.
/// </summary>
/// <value>The observers.</value>
public IObserver<T>[] Observers
{
get => _observers.ToArray();
}
internal Action? OnDispose { get; set; }
/// <summary>
/// Subscribes the specified observer to the progress tracker.
/// </summary>
/// <param name="observer">The observer.</param>
/// <returns>An object which can be disposed to unsubscribe from progress tracking.</returns>
public IDisposable Subscribe(IObserver<T> observer)
{
#if NET6_0_OR_GREATER
ArgumentNullException.ThrowIfNull(observer);
#else
if (observer is null)
{
throw new ArgumentNullException(nameof(observer));
}
#endif
_observers.Add(observer);
return new ObservableDisposer<T>(_observers, observer, OnDispose);
}
}