Allow for cancellation of async operations

This commit is contained in:
Oliver Booth 2022-05-18 17:18:33 +01:00
parent 1b0d165f10
commit 4d317298b8
No known key found for this signature in database
GPG Key ID: 32A00B35503AF634
4 changed files with 24 additions and 18 deletions

View File

@ -20,10 +20,10 @@ await Task.Delay(-1);
internal sealed class PingPacketHandler : PacketHandler<PingPacket> internal sealed class PingPacketHandler : PacketHandler<PingPacket>
{ {
public override async Task HandleAsync(BaseClientNode recipient, PingPacket packet) public override async Task HandleAsync(BaseClientNode recipient, PingPacket packet, CancellationToken cancellationToken = default)
{ {
Console.WriteLine($"Client {recipient.SessionId} sent ping with payload {BitConverter.ToString(packet.Payload)}"); Console.WriteLine($"Client {recipient.SessionId} sent ping with payload {BitConverter.ToString(packet.Payload)}");
var pong = new PongPacket(packet.Payload); var pong = new PongPacket(packet.Payload);
await recipient.SendPacketAsync(pong); await recipient.SendPacketAsync(pong, cancellationToken);
} }
} }

View File

@ -53,8 +53,9 @@ public abstract class BaseClientNode : Node
/// <summary> /// <summary>
/// Reads the next packet from the client's stream. /// Reads the next packet from the client's stream.
/// </summary> /// </summary>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <returns>The next packet, or <see langword="null" /> if no valid packet was read.</returns> /// <returns>The next packet, or <see langword="null" /> if no valid packet was read.</returns>
public async Task<Packet?> ReadNextPacketAsync() public async Task<Packet?> ReadNextPacketAsync(CancellationToken cancellationToken = default)
{ {
await using var networkStream = new NetworkStream(BaseSocket); await using var networkStream = new NetworkStream(BaseSocket);
using var networkReader = new ProtocolReader(networkStream); using var networkReader = new ProtocolReader(networkStream);
@ -97,7 +98,7 @@ public abstract class BaseClientNode : Node
await targetStream.DisposeAsync(); await targetStream.DisposeAsync();
if (RegisteredPacketHandlers.TryGetValue(packetType, out IReadOnlyCollection<PacketHandler>? handlers)) if (RegisteredPacketHandlers.TryGetValue(packetType, out IReadOnlyCollection<PacketHandler>? handlers))
await Task.WhenAll(handlers.Select(h => h.HandleAsync(this, packet))); await Task.WhenAll(handlers.Select(h => h.HandleAsync(this, packet, cancellationToken)));
return packet; return packet;
} }
@ -106,8 +107,9 @@ public abstract class BaseClientNode : Node
/// Sends a packet to the remote endpoint. /// Sends a packet to the remote endpoint.
/// </summary> /// </summary>
/// <param name="packet">The packet to send.</param> /// <param name="packet">The packet to send.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <typeparam name="TPacket">The type of the packet.</typeparam> /// <typeparam name="TPacket">The type of the packet.</typeparam>
public async Task SendPacketAsync<TPacket>(TPacket packet) public async Task SendPacketAsync<TPacket>(TPacket packet, CancellationToken cancellationToken = default)
where TPacket : Packet where TPacket : Packet
{ {
var buffer = new MemoryStream(); var buffer = new MemoryStream();
@ -130,13 +132,13 @@ public abstract class BaseClientNode : Node
break; break;
} }
await targetStream.FlushAsync(); await targetStream.FlushAsync(cancellationToken);
buffer.Position = 0; buffer.Position = 0;
await using var networkStream = new NetworkStream(BaseSocket); await using var networkStream = new NetworkStream(BaseSocket);
await using var networkWriter = new ProtocolWriter(networkStream); await using var networkWriter = new ProtocolWriter(networkStream);
networkWriter.Write((int) buffer.Length); networkWriter.Write((int) buffer.Length);
await buffer.CopyToAsync(networkStream); await buffer.CopyToAsync(networkStream, cancellationToken);
await networkStream.FlushAsync(); await networkStream.FlushAsync(cancellationToken);
} }
} }

View File

@ -11,7 +11,8 @@ public abstract class PacketHandler
/// </summary> /// </summary>
/// <param name="recipient">The recipient of the packet.</param> /// <param name="recipient">The recipient of the packet.</param>
/// <param name="packet">The packet to handle.</param> /// <param name="packet">The packet to handle.</param>
public abstract Task HandleAsync(BaseClientNode recipient, Packet packet); /// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
public abstract Task HandleAsync(BaseClientNode recipient, Packet packet, CancellationToken cancellationToken = default);
} }
/// <summary> /// <summary>
@ -26,9 +27,9 @@ public abstract class PacketHandler<T> : PacketHandler
public static readonly PacketHandler<T> Empty = new NullPacketHandler<T>(); public static readonly PacketHandler<T> Empty = new NullPacketHandler<T>();
/// <inheritdoc /> /// <inheritdoc />
public override Task HandleAsync(BaseClientNode recipient, Packet packet) public override Task HandleAsync(BaseClientNode recipient, Packet packet, CancellationToken cancellationToken = default)
{ {
if (packet is T actual) return HandleAsync(recipient, actual); if (packet is T actual) return HandleAsync(recipient, actual, cancellationToken);
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -37,7 +38,8 @@ public abstract class PacketHandler<T> : PacketHandler
/// </summary> /// </summary>
/// <param name="recipient">The recipient of the packet.</param> /// <param name="recipient">The recipient of the packet.</param>
/// <param name="packet">The packet to handle.</param> /// <param name="packet">The packet to handle.</param>
public abstract Task HandleAsync(BaseClientNode recipient, T packet); /// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
public abstract Task HandleAsync(BaseClientNode recipient, T packet, CancellationToken cancellationToken = default);
} }
/// <summary> /// <summary>
@ -48,7 +50,7 @@ internal sealed class NullPacketHandler<T> : PacketHandler<T>
where T : Packet where T : Packet
{ {
/// <inheritdoc /> /// <inheritdoc />
public override Task HandleAsync(BaseClientNode recipient, T packet) public override Task HandleAsync(BaseClientNode recipient, T packet, CancellationToken cancellationToken = default)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }

View File

@ -22,6 +22,7 @@ public sealed class ProtocolClient : BaseClientNode
/// </summary> /// </summary>
/// <param name="host">The remote host to which this client should connect.</param> /// <param name="host">The remote host to which this client should connect.</param>
/// <param name="port">The remote port to which this client should connect.</param> /// <param name="port">The remote port to which this client should connect.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <exception cref="ArgumentNullException"><paramref name="host" /> is <see langword="null" />.</exception> /// <exception cref="ArgumentNullException"><paramref name="host" /> is <see langword="null" />.</exception>
/// <exception cref="ArgumentException"><paramref name="host" /> contains an empty string.</exception> /// <exception cref="ArgumentException"><paramref name="host" /> contains an empty string.</exception>
/// <exception cref="ArgumentOutOfRangeException"> /// <exception cref="ArgumentOutOfRangeException">
@ -29,9 +30,9 @@ public sealed class ProtocolClient : BaseClientNode
/// than <see cref="IPEndPoint.MaxPort" />. /// than <see cref="IPEndPoint.MaxPort" />.
/// </exception> /// </exception>
/// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception> /// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception>
public Task ConnectAsync(string host, int port) public Task ConnectAsync(string host, int port, CancellationToken cancellationToken = default)
{ {
return ConnectAsync(new DnsEndPoint(host, port)); return ConnectAsync(new DnsEndPoint(host, port), cancellationToken);
} }
/// <summary> /// <summary>
@ -39,15 +40,16 @@ public sealed class ProtocolClient : BaseClientNode
/// </summary> /// </summary>
/// <param name="address">The remote <see cref="IPAddress" /> to which this client should connect.</param> /// <param name="address">The remote <see cref="IPAddress" /> to which this client should connect.</param>
/// <param name="port">The remote port to which this client should connect.</param> /// <param name="port">The remote port to which this client should connect.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the asynchronous operation.</param>
/// <exception cref="ArgumentOutOfRangeException"> /// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="port" /> is less than <see cref="IPEndPoint.MinPort" />. -or - <paramref name="port" /> is greater /// <paramref name="port" /> is less than <see cref="IPEndPoint.MinPort" />. -or - <paramref name="port" /> is greater
/// than <see cref="IPEndPoint.MaxPort" />. -or- <paramref name="address" /> is less than 0 or greater than /// than <see cref="IPEndPoint.MaxPort" />. -or- <paramref name="address" /> is less than 0 or greater than
/// 0x00000000FFFFFFFF. /// 0x00000000FFFFFFFF.
/// </exception> /// </exception>
/// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception> /// <exception cref="SocketException">An error occurred when attempting to access the socket.</exception>
public Task ConnectAsync(IPAddress address, int port) public Task ConnectAsync(IPAddress address, int port, CancellationToken cancellationToken = default)
{ {
return ConnectAsync(new IPEndPoint(address, port)); return ConnectAsync(new IPEndPoint(address, port), cancellationToken);
} }
/// <summary> /// <summary>