Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/CommandStation.Abstractions/IProgrammingControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@ public interface IProgrammingControl

/// <summary>
/// Reads a CV on the programming track, retrying while the decoder does not acknowledge, and
/// returns the value. Throws <see cref="CvOperationTimeoutException"/> if no result arrives within
/// <paramref name="timeout"/>, or <see cref="CvShortCircuitException"/> on a short circuit.
/// returns the value. A missing acknowledgement that never clears (for example an absent or
/// unreadable decoder) is reported as a timeout rather than a distinct error. Do not call the
/// fire-and-forget CV methods concurrently with this one on the same station.
/// </summary>
/// <exception cref="CvOperationTimeoutException">No result arrived within <paramref name="timeout"/>.</exception>
/// <exception cref="CvShortCircuitException">The command station reported a short circuit.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is not a positive, in-range duration.</exception>
Task<byte> ReadCvAsync(ushort cvAddress, TimeSpan timeout);

/// <summary>
/// Writes a CV on the programming track, retrying while the decoder does not acknowledge, and
/// completes once the command station confirms the write. Throws
/// <see cref="CvOperationTimeoutException"/> if not confirmed within <paramref name="timeout"/>, or
/// <see cref="CvShortCircuitException"/> on a short circuit.
/// completes once the command station acknowledges the write (<c>LAN_X_CV_RESULT</c>). A missing
/// acknowledgement that never clears is reported as a timeout. Do not call the fire-and-forget CV
/// methods concurrently with this one on the same station.
/// </summary>
/// <exception cref="CvOperationTimeoutException">The write was not acknowledged within <paramref name="timeout"/>.</exception>
/// <exception cref="CvShortCircuitException">The command station reported a short circuit.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is not a positive, in-range duration.</exception>
Task WriteCvAsync(ushort cvAddress, byte value, TimeSpan timeout);

event EventHandler<CvValue>? CvReadCompleted;
Expand Down
43 changes: 43 additions & 0 deletions src/Z21.Client.UnitTest/Core/SafeCvProgrammingTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,48 @@ public void WritePomCvAsync_NoReplyThrowsTimeout()

Assert.ThrowsAsync<CvOperationTimeoutException>(async () => await task);
}

[Test]
public void ReadCvAsync_NonPositiveTimeoutThrowsArgumentOutOfRange() =>
Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await _station.ReadCvAsync(5, TimeSpan.Zero));

[Test]
public void WriteCvAsync_NegativeTimeoutThrowsArgumentOutOfRange() =>
Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await _station.WriteCvAsync(5, 1, TimeSpan.FromSeconds(-1)));

[Test]
public async Task FireAndForgetCv_WhileSafeOperationActive_Throws()
{
Task<byte> safe = _station.ReadCvAsync(5, TimeSpan.FromSeconds(5));
await WaitForSentAsync(1); // safe op has acquired the CV lock and is awaiting a result

Assert.Multiple(() =>
{
Assert.Throws<InvalidOperationException>(() => _station.ReadCvAsync(9));
Assert.Throws<InvalidOperationException>(() => _station.WriteCvAsync(9, 1));
});

RaiseResult(5, 7); // let the safe op finish so the fixture tears down cleanly
Assert.That(await safe, Is.EqualTo(7));
}

[Test]
public async Task Dispose_DuringInFlightOperation_ThrowsObjectDisposed()
{
Task<byte> task = _station.ReadCvAsync(5, TimeSpan.FromSeconds(5));
await WaitForSentAsync(1);

_station.Dispose();

Assert.ThrowsAsync<ObjectDisposedException>(async () => await task);
}

[Test]
public void Operation_AfterDispose_ThrowsObjectDisposed()
{
_station.Dispose();

Assert.ThrowsAsync<ObjectDisposedException>(async () => await _station.ReadCvAsync(5, TimeSpan.FromSeconds(2)));
}
}
}
16 changes: 11 additions & 5 deletions src/Z21.Client/Core/IZ21CommandStation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,24 @@ public interface IZ21CommandStation : ICommandStation, ILocoControl, IAccessoryC

/// <summary>
/// Reads a CV of a locomotive decoder on the main track (POM), retrying while the decoder does not
/// acknowledge, and returns the value. Requires RailCom; without it the read times out. Throws
/// <see cref="CvOperationTimeoutException"/> on timeout or <see cref="CvShortCircuitException"/> on
/// a short circuit.
/// acknowledge, and returns the value. Requires RailCom; without it (or for an absent decoder) the
/// read can only ever time out. The result is correlated by CV address only — the protocol's POM
/// result carries no loco address — so do not run other CV operations on this station concurrently.
/// </summary>
/// <exception cref="CvOperationTimeoutException">No result arrived within <paramref name="timeout"/>.</exception>
/// <exception cref="CvShortCircuitException">The command station reported a short circuit.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is not a positive, in-range duration.</exception>
Task<byte> ReadPomCvAsync(ushort locoAddress, ushort cvAddress, TimeSpan timeout);

/// <summary>
/// Writes a CV of a locomotive decoder on the main track (POM). Because a POM write returns no
/// acknowledgement, this verifies by reading the CV back and retrying until the read-back matches
/// the written value (so it requires RailCom). Throws <see cref="CvOperationTimeoutException"/> if
/// it cannot be confirmed within <paramref name="timeout"/>.
/// the written value (so it requires RailCom). A decoder that never reads back the written value is
/// reported as a timeout. Do not run other CV operations on this station concurrently.
/// </summary>
/// <exception cref="CvOperationTimeoutException">The write could not be confirmed within <paramref name="timeout"/>.</exception>
/// <exception cref="CvShortCircuitException">The command station reported a short circuit.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is not a positive, in-range duration.</exception>
Task WritePomCvAsync(ushort locoAddress, ushort cvAddress, byte value, TimeSpan timeout);
}
}
169 changes: 104 additions & 65 deletions src/Z21.Client/Core/Z21CommandStation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public class Z21CommandStation : IZ21CommandStation, IProgrammingControl, IFeedb
private readonly Z21Options _options;
private readonly DelayedAction _delayedKeepAliveAction;
private readonly SemaphoreSlim _cvLock = new(1, 1);
private readonly CancellationTokenSource _disposeCts = new();
private volatile bool _disposed;
private readonly ILogger<Z21CommandStation>? _logger;
private bool _disposed;

/// <summary>
/// IPv4 safe MTU for payload according to specification.
Expand Down Expand Up @@ -179,101 +180,90 @@ public Task SetExtAccessoryAsync(ushort accessoryAddress, byte payload) =>

public Task RequestStatusAsync() => SendCommandsAsync(Commands.Create<GetStatusCommand>());

public Task ReadCvAsync(ushort cvAddress) => SendCommandsAsync(Commands.Create<CvReadCommand>(cvAddress));

public Task WriteCvAsync(ushort cvAddress, byte value) => SendCommandsAsync(Commands.Create<CvWriteCommand>(cvAddress, value));

public async Task<byte> ReadCvAsync(ushort cvAddress, TimeSpan timeout)
public Task ReadCvAsync(ushort cvAddress)
{
using CancellationTokenSource deadline = new(timeout);
await AcquireCvLockAsync(cvAddress, timeout, deadline.Token);
try
{
return await AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvReadCommand>(cvAddress)), deadline.Token, timeout);
}
finally
{
_cvLock.Release();
}
ThrowIfSafeCvOperationActive();
return SendCommandsAsync(Commands.Create<CvReadCommand>(cvAddress));
}

public async Task WriteCvAsync(ushort cvAddress, byte value, TimeSpan timeout)
public Task WriteCvAsync(ushort cvAddress, byte value)
{
using CancellationTokenSource deadline = new(timeout);
await AcquireCvLockAsync(cvAddress, timeout, deadline.Token);
try
{
await AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvWriteCommand>(cvAddress, value)), deadline.Token, timeout);
}
finally
{
_cvLock.Release();
}
ThrowIfSafeCvOperationActive();
return SendCommandsAsync(Commands.Create<CvWriteCommand>(cvAddress, value));
}

public async Task<byte> ReadPomCvAsync(ushort locoAddress, ushort cvAddress, TimeSpan timeout)
public Task<byte> ReadCvAsync(ushort cvAddress, TimeSpan timeout) =>
RunUnderCvLockAsync(cvAddress, timeout,
token => AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvReadCommand>(cvAddress)), token, timeout));

public async Task WriteCvAsync(ushort cvAddress, byte value, TimeSpan timeout) =>
await RunUnderCvLockAsync(cvAddress, timeout,
token => AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvWriteCommand>(cvAddress, value)), token, timeout));

public Task<byte> ReadPomCvAsync(ushort locoAddress, ushort cvAddress, TimeSpan timeout) =>
RunUnderCvLockAsync(cvAddress, timeout,
token => AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvPomReadByteCommand>(locoAddress, cvAddress)), token, timeout));

public async Task WritePomCvAsync(ushort locoAddress, ushort cvAddress, byte value, TimeSpan timeout) =>
await RunUnderCvLockAsync(cvAddress, timeout,
token => WritePomCvCoreAsync(locoAddress, cvAddress, value, token, timeout));

private async Task<byte> WritePomCvCoreAsync(ushort locoAddress, ushort cvAddress, byte value, CancellationToken token, TimeSpan timeout)
{
using CancellationTokenSource deadline = new(timeout);
await AcquireCvLockAsync(cvAddress, timeout, deadline.Token);
try
{
return await AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvPomReadByteCommand>(locoAddress, cvAddress)), deadline.Token, timeout);
}
finally
while (true)
{
_cvLock.Release();
await SendCommandsAsync(Commands.Create<CvPomWriteByteCommand>(locoAddress, cvAddress, value)).WaitAsync(token);
byte readBack = await AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvPomReadByteCommand>(locoAddress, cvAddress)), token, timeout);
if (readBack == value)
return value;

await DelayBeforeRetryAsync(cvAddress, token, timeout); // read-back mismatch -> wait, then re-write
}
}

public async Task WritePomCvAsync(ushort locoAddress, ushort cvAddress, byte value, TimeSpan timeout)
private async Task<byte> RunUnderCvLockAsync(ushort cvAddress, TimeSpan timeout, Func<CancellationToken, Task<byte>> operation)
{
ObjectDisposedException.ThrowIf(_disposed, this);
ValidateTimeout(timeout);

using CancellationTokenSource deadline = new(timeout);
await AcquireCvLockAsync(cvAddress, timeout, deadline.Token);
using CancellationTokenSource linked = CancellationTokenSource.CreateLinkedTokenSource(deadline.Token, _disposeCts.Token);

await AcquireCvLockAsync(cvAddress, timeout, linked.Token);
try
{
while (true)
{
await SendCommandsAsync(Commands.Create<CvPomWriteByteCommand>(locoAddress, cvAddress, value));
byte readBack = await AwaitResultLoopAsync(cvAddress, () => SendCommandsAsync(Commands.Create<CvPomReadByteCommand>(locoAddress, cvAddress)), deadline.Token, timeout);
if (readBack == value)
return;
if (deadline.IsCancellationRequested)
throw new CvOperationTimeoutException(cvAddress, timeout);
}
return await operation(linked.Token);
}
finally
{
_cvLock.Release();
ReleaseCvLock();
}
}

private async Task AcquireCvLockAsync(ushort cvAddress, TimeSpan timeout, CancellationToken deadline)
private async Task AcquireCvLockAsync(ushort cvAddress, TimeSpan timeout, CancellationToken token)
{
try
{
await _cvLock.WaitAsync(deadline);
await _cvLock.WaitAsync(token);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
throw new CvOperationTimeoutException(cvAddress, timeout);
throw MapCancellation(cvAddress, timeout);
}
}

private async Task<byte> AwaitResultLoopAsync(ushort cvAddress, Func<Task> send, CancellationToken deadline, TimeSpan timeout)
private async Task<byte> AwaitResultLoopAsync(ushort cvAddress, Func<Task> send, CancellationToken token, TimeSpan timeout)
{
while (true)
{
if (deadline.IsCancellationRequested)
throw new CvOperationTimeoutException(cvAddress, timeout);

CvAttempt attempt;
try
{
attempt = await AwaitNextCvAsync(cvAddress, send, deadline);
attempt = await AwaitNextCvAsync(cvAddress, send, token);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
throw new CvOperationTimeoutException(cvAddress, timeout);
throw MapCancellation(cvAddress, timeout);
}

switch (attempt.Kind)
Expand All @@ -283,12 +273,13 @@ private async Task<byte> AwaitResultLoopAsync(ushort cvAddress, Func<Task> send,
case CvAttemptKind.ShortCircuit:
throw new CvShortCircuitException(cvAddress);
default:
break; // missing acknowledgement -> retry until the deadline
await DelayBeforeRetryAsync(cvAddress, token, timeout); // missing acknowledgement -> back off, then retry until the deadline
break;
}
}
}

private async Task<CvAttempt> AwaitNextCvAsync(ushort cvAddress, Func<Task> send, CancellationToken deadline)
private async Task<CvAttempt> AwaitNextCvAsync(ushort cvAddress, Func<Task> send, CancellationToken token)
{
TaskCompletionSource<CvAttempt> completion = new(TaskCreationOptions.RunContinuationsAsynchronously);

Expand All @@ -305,9 +296,10 @@ void OnFailed(object? sender, CvProgrammingError error) =>
CvProgrammingFailed += OnFailed;
try
{
await send();
using (deadline.Register(() => completion.TrySetCanceled(deadline)))
return await completion.Task;
// WaitAsync bounds both the send and the wait by the deadline, so a stalled transport cannot
// outlive the caller's timeout.
await send().WaitAsync(token);
return await completion.Task.WaitAsync(token);
}
finally
{
Expand All @@ -316,6 +308,51 @@ void OnFailed(object? sender, CvProgrammingError error) =>
}
}

private async Task DelayBeforeRetryAsync(ushort cvAddress, CancellationToken token, TimeSpan timeout)
{
try
{
await Task.Delay(_options.CvRetryDelay, token);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
throw MapCancellation(cvAddress, timeout);
}
}

private void ThrowIfSafeCvOperationActive()
{
if (_cvLock.CurrentCount == 0)
throw new InvalidOperationException(
"A safe (timeout-bounded) CV operation is in progress. Fire-and-forget CV commands cannot run "
+ "concurrently with a safe CV operation, because CV NACKs carry no address and would be misattributed.");
}

private void ValidateTimeout(TimeSpan timeout)
{
if (timeout <= TimeSpan.Zero)
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, "The CV operation timeout must be greater than zero.");
if (timeout.TotalMilliseconds > int.MaxValue)
throw new ArgumentOutOfRangeException(nameof(timeout), timeout, $"The CV operation timeout must not exceed {int.MaxValue} milliseconds.");
}

private System.Exception MapCancellation(ushort cvAddress, TimeSpan timeout) =>
_disposeCts.IsCancellationRequested
? new ObjectDisposedException(GetType().FullName)
: new CvOperationTimeoutException(cvAddress, timeout);

private void ReleaseCvLock()
{
try
{
_cvLock.Release();
}
catch (ObjectDisposedException exception)
{
_logger?.LogDebug(exception, "CV lock released after the station was disposed.");
}
}

private enum CvAttemptKind
{
Result,
Expand Down Expand Up @@ -365,9 +402,11 @@ public void Dispose()
{
if (_disposed)
return;

_disposed = true;

_disposeCts.Cancel(); // unblock any in-flight safe CV operation; it surfaces as ObjectDisposedException
_delayedKeepAliveAction.Dispose();
_disposeCts.Dispose();
_cvLock.Dispose();
GC.SuppressFinalize(this);
}
Expand Down
8 changes: 8 additions & 0 deletions src/Z21.Client/Core/Z21Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,13 @@ public class Z21Options
/// Interval after the last command before an automatic keep-alive request is sent.
/// </summary>
public TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(45);

/// <summary>
/// Delay between retries of a safe (timeout-bounded) CV operation after the decoder fails to
/// acknowledge (<c>LAN_X_CV_NACK</c>). A short, non-zero delay avoids hammering the command station
/// and repeatedly re-entering programming mode while a slow byte-wise read is in progress. The
/// caller-supplied timeout still bounds the overall operation.
/// </summary>
public TimeSpan CvRetryDelay { get; set; } = TimeSpan.FromMilliseconds(50);
}
}
2 changes: 1 addition & 1 deletion version.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/main/src/NerdBank.GitVersioning/version.schema.json",
"version": "7.0",
"version": "7.1",
"publicReleaseRefs": [
"^refs/heads/main$"
]
Expand Down
Loading