From c0138757d906dbf2c3520487bca6008ba09c977c Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Thu, 23 Apr 2026 13:07:16 -0600 Subject: [PATCH 1/4] metrics standardization --- Conductor/Client/ApiClient.cs | 73 ++++- .../DependencyInjectionExtensions.cs | 7 +- .../Client/Telemetry/MetricsCollector.cs | 222 ++++++++++++--- .../Client/Worker/WorkflowTaskExecutor.cs | 16 +- Conductor/Executor/WorkflowExecutor.cs | 50 +++- Conductor/conductor-csharp.csproj | 6 +- Harness/Harness.csproj | 4 - Harness/Program.cs | 50 +--- Harness/WorkflowGovernor.cs | 13 +- METRICS.md | 264 +++++++++++------- Tests/Telemetry/MetricsCollectorTests.cs | 222 ++++++++++++--- 11 files changed, 678 insertions(+), 249 deletions(-) diff --git a/Conductor/Client/ApiClient.cs b/Conductor/Client/ApiClient.cs index 72210312..ce875402 100644 --- a/Conductor/Client/ApiClient.cs +++ b/Conductor/Client/ApiClient.cs @@ -10,12 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +using Conductor.Client.Telemetry; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using RestSharp; using System; using System.Collections; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Text; @@ -29,6 +31,12 @@ namespace Conductor.Client /// public partial class ApiClient { + /// + /// Optional metrics collector for recording http_api_client_request_seconds. + /// Set once via DI or startup; safe to leave null. 
+ /// + public static MetricsCollector Metrics { get; set; } + public JsonSerializerSettings serializerSettings = new JsonSerializerSettings { ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor @@ -179,11 +187,30 @@ public Object CallApi( Dictionary fileParams, Dictionary pathParams, String contentType, Configuration configuration) { - int retryCount = 0; - RestResponse response = RetryRestClientCallApi(path, method, queryParams, postBody, headerParams, - formParams, fileParams, pathParams, contentType, configuration, ref retryCount); - - return (Object)response; + var sw = Stopwatch.StartNew(); + string statusCode = "0"; + try + { + int retryCount = 0; + RestResponse response = RetryRestClientCallApi(path, method, queryParams, postBody, headerParams, + formParams, fileParams, pathParams, contentType, configuration, ref retryCount); + statusCode = ((int)response.StatusCode).ToString(); + return (Object)response; + } + catch + { + statusCode = "0"; + throw; + } + finally + { + sw.Stop(); + var resolvedUri = path; + foreach (var param in pathParams) + resolvedUri = resolvedUri.Replace("{" + param.Key + "}", param.Value); + var basePath = RestClient.Options.BaseUrl?.AbsolutePath?.TrimEnd('/') ?? 
""; + Metrics?.RecordHttpApiClientRequest(method.ToString().ToUpperInvariant(), basePath + resolvedUri, statusCode, sw.Elapsed.TotalSeconds); + } } private RestResponse RetryRestClientCallApi(String path, Method method, List> queryParams, Object postBody, @@ -226,15 +253,35 @@ public async Task CallApiAsync( Dictionary fileParams, Dictionary pathParams, String contentType) { - var request = PrepareRequest( - path, method, queryParams, postBody, headerParams, formParams, fileParams, - pathParams, contentType); + var sw = Stopwatch.StartNew(); + string statusCode = "0"; + try + { + var request = PrepareRequest( + path, method, queryParams, postBody, headerParams, formParams, fileParams, + pathParams, contentType); - InterceptRequest(request); - var response = await RestClient.ExecuteAsync(request, method); - InterceptResponse(request, response); - FormatHeaders(response); - return (object)response; + InterceptRequest(request); + var response = await RestClient.ExecuteAsync(request, method); + InterceptResponse(request, response); + FormatHeaders(response); + statusCode = ((int)response.StatusCode).ToString(); + return (object)response; + } + catch + { + statusCode = "0"; + throw; + } + finally + { + sw.Stop(); + var resolvedUri = path; + foreach (var param in pathParams) + resolvedUri = resolvedUri.Replace("{" + param.Key + "}", param.Value); + var basePath = RestClient.Options.BaseUrl?.AbsolutePath?.TrimEnd('/') ?? 
""; + Metrics?.RecordHttpApiClientRequest(method.ToString().ToUpperInvariant(), basePath + resolvedUri, statusCode, sw.Elapsed.TotalSeconds); + } } /// diff --git a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs index 7dd719d3..f883ce8a 100644 --- a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs +++ b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs @@ -39,7 +39,12 @@ public static IServiceCollection AddConductorWorker(this IServiceCollection serv } services.AddSingleton(configuration); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(sp => + { + var collector = new MetricsCollector(); + ApiClient.Metrics = collector; + return collector; + }); services.AddTransient(); return services; } diff --git a/Conductor/Client/Telemetry/MetricsCollector.cs b/Conductor/Client/Telemetry/MetricsCollector.cs index a7ed40fb..10f8920f 100644 --- a/Conductor/Client/Telemetry/MetricsCollector.cs +++ b/Conductor/Client/Telemetry/MetricsCollector.cs @@ -10,9 +10,12 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.Metrics; +using OpenTelemetry; +using OpenTelemetry.Metrics; namespace Conductor.Client.Telemetry { @@ -20,18 +23,28 @@ namespace Conductor.Client.Telemetry /// Instruments the Conductor worker poll-execute-update loop with /// counters, histograms, and gauges. /// - /// The is named Conductor.Client. To expose these - /// metrics, attach a listener such as the OpenTelemetry Prometheus exporter - /// (see METRICS.md for examples). + /// The is named Conductor.Client. 
Call + /// to expose a Prometheus-compatible + /// /metrics endpoint with the canonical bucket configuration, + /// or attach your own using + /// . /// - public sealed class MetricsCollector + public sealed class MetricsCollector : IDisposable { public const string MeterName = "Conductor.Client"; + /// + /// Canonical time histogram buckets shared across all Conductor SDKs. + /// + public static readonly double[] CanonicalTimeBuckets = + { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; + private readonly Meter _meter; + private MeterProvider _meterProvider; // --- counters --- private readonly Counter _taskPollTotal; + private readonly Counter _taskExecutionStartedTotal; private readonly Counter _taskPollErrorTotal; private readonly Counter _taskExecuteErrorTotal; private readonly Counter _taskUpdateErrorTotal; @@ -40,25 +53,34 @@ public sealed class MetricsCollector private readonly Counter _threadUncaughtExceptionsTotal; private readonly Counter _workflowStartErrorTotal; private readonly Counter _externalPayloadUsedTotal; + private readonly Counter _taskAckErrorTotal; + private readonly Counter _taskAckFailedTotal; // --- histograms --- private readonly Histogram _taskPollTimeSeconds; private readonly Histogram _taskExecuteTimeSeconds; private readonly Histogram _taskUpdateTimeSeconds; - private readonly Histogram _taskResultSizeBytes; - private readonly Histogram _workflowInputSizeBytes; + private readonly Histogram _httpApiClientRequestSeconds; // --- gauges --- private readonly ConcurrentDictionary _activeWorkerCounts = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _taskResultSizes = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _workflowInputSizes = new ConcurrentDictionary(); public MetricsCollector() { _meter = new Meter(MeterName); + // --- counters --- + _taskPollTotal = _meter.CreateCounter( "task_poll_total", description: "Total number of task poll attempts"); + _taskExecutionStartedTotal = 
_meter.CreateCounter( + "task_execution_started_total", + description: "Tasks dispatched to the worker function"); + _taskPollErrorTotal = _meter.CreateCounter( "task_poll_error_total", description: "Total number of task poll errors"); @@ -91,6 +113,16 @@ public MetricsCollector() "external_payload_used_total", description: "External payload storage usage"); + _taskAckErrorTotal = _meter.CreateCounter( + "task_ack_error_total", + description: "Task ack client-side errors"); + + _taskAckFailedTotal = _meter.CreateCounter( + "task_ack_failed_total", + description: "Task ack declined by server"); + + // --- time histograms --- + _taskPollTimeSeconds = _meter.CreateHistogram( "task_poll_time_seconds", unit: "s", @@ -106,16 +138,51 @@ public MetricsCollector() unit: "s", description: "Task result update duration in seconds"); - _taskResultSizeBytes = _meter.CreateHistogram( + _httpApiClientRequestSeconds = _meter.CreateHistogram( + "http_api_client_request_seconds", + unit: "s", + description: "HTTP API client request duration in seconds"); + + // --- canonical size gauges --- + + _meter.CreateObservableGauge( "task_result_size_bytes", + observeValues: () => + { + var measurements = new List>(); + foreach (var kvp in _taskResultSizes) + { + measurements.Add(new Measurement( + kvp.Value, + new KeyValuePair("taskType", kvp.Key))); + } + return measurements; + }, unit: "By", description: "Size of task result payload in bytes"); - _workflowInputSizeBytes = _meter.CreateHistogram( + _meter.CreateObservableGauge( "workflow_input_size_bytes", + observeValues: () => + { + var measurements = new List>(); + foreach (var kvp in _workflowInputSizes) + { + var parts = kvp.Key.Split('\0'); + var wfType = parts[0]; + var version = parts.Length > 1 ? 
parts[1] : ""; + measurements.Add(new Measurement( + kvp.Value, + new KeyValuePair("workflowType", wfType), + new KeyValuePair("version", version))); + } + return measurements; + }, unit: "By", description: "Size of workflow input payload in bytes"); + // --- utilization gauge --- + _meter.CreateObservableGauge( "active_workers", observeValues: () => @@ -125,66 +192,109 @@ public MetricsCollector() { measurements.Add(new Measurement( kvp.Value, - new KeyValuePair("task_type", kvp.Key))); + new KeyValuePair("taskType", kvp.Key))); } return measurements; }, description: "Number of workers currently executing tasks"); } + // --------------------------------------------------------------- + // Built-in Prometheus server + // --------------------------------------------------------------- + + /// + /// Starts an OpenTelemetry Prometheus HTTP listener on the given port + /// with the canonical bucket configuration for all histograms. + /// + public void StartServer(int port) + { + if (_meterProvider != null) + throw new InvalidOperationException("Metrics server already started"); + + _meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(MeterName) + .AddView("task_poll_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_execute_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_update_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("http_api_client_request_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddPrometheusHttpListener(options => + { + options.UriPrefixes = new[] { $"http://*:{port}/" }; + }) + .Build(); + } + + public void Dispose() + { + _meterProvider?.Dispose(); + _meter?.Dispose(); + } + // --------------------------------------------------------------- // Poll // --------------------------------------------------------------- 
public void RecordTaskPoll(string taskType) { - _taskPollTotal.Add(1, new KeyValuePair("task_type", taskType)); + _taskPollTotal.Add(1, + new KeyValuePair("taskType", taskType)); } - public void RecordTaskPollTime(string taskType, double durationSeconds) + public void RecordTaskPollTime(string taskType, double durationSeconds, string status) { _taskPollTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskPollError(string taskType, string errorType) + public void RecordTaskPollError(string taskType, string exceptionType) { _taskPollErrorTotal.Add(1, - new KeyValuePair("task_type", taskType), - new KeyValuePair("error_type", errorType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // --------------------------------------------------------------- // Execution // --------------------------------------------------------------- - public void RecordTaskExecuteTime(string taskType, double durationSeconds) + public void RecordTaskExecutionStarted(string taskType) + { + _taskExecutionStartedTotal.Add(1, + new KeyValuePair("taskType", taskType)); + } + + public void RecordTaskExecuteTime(string taskType, double durationSeconds, string status) { _taskExecuteTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskExecuteError(string taskType, string errorType) + public void RecordTaskExecuteError(string taskType, string exceptionType) { _taskExecuteErrorTotal.Add(1, - new KeyValuePair("task_type", taskType), - new KeyValuePair("error_type", errorType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // --------------------------------------------------------------- // Update // 
--------------------------------------------------------------- - public void RecordTaskUpdateTime(string taskType, double durationSeconds) + public void RecordTaskUpdateTime(string taskType, double durationSeconds, string status) { _taskUpdateTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskUpdateError(string taskType) + public void RecordTaskUpdateError(string taskType, string exceptionType) { _taskUpdateErrorTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // --------------------------------------------------------------- @@ -193,15 +303,24 @@ public void RecordTaskUpdateError(string taskType) public void RecordTaskResultSize(string taskType, double sizeBytes) { - _taskResultSizeBytes.Record(sizeBytes, - new KeyValuePair("task_type", taskType)); + _taskResultSizes[taskType] = sizeBytes; } public void RecordWorkflowInputSize(string workflowType, string version, double sizeBytes) { - _workflowInputSizeBytes.Record(sizeBytes, - new KeyValuePair("workflow_type", workflowType), - new KeyValuePair("version", version)); + _workflowInputSizes[workflowType + "\0" + (version ?? 
"")] = sizeBytes; + } + + // --------------------------------------------------------------- + // HTTP API client + // --------------------------------------------------------------- + + public void RecordHttpApiClientRequest(string method, string uri, string status, double durationSeconds) + { + _httpApiClientRequestSeconds.Record(durationSeconds, + new KeyValuePair("method", method), + new KeyValuePair("uri", uri), + new KeyValuePair("status", status)); } // --------------------------------------------------------------- @@ -211,13 +330,13 @@ public void RecordWorkflowInputSize(string workflowType, string version, double public void RecordTaskExecutionQueueFull(string taskType) { _taskExecutionQueueFullTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType)); } public void RecordTaskPaused(string taskType) { _taskPausedTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType)); } // --------------------------------------------------------------- @@ -230,26 +349,53 @@ public void RecordActiveWorkers(string taskType, int count) } // --------------------------------------------------------------- - // Uncategorised + // Uncaught exceptions // --------------------------------------------------------------- - public void RecordUncaughtException() + public void RecordUncaughtException(string exceptionType) { - _threadUncaughtExceptionsTotal.Add(1); + _threadUncaughtExceptionsTotal.Add(1, + new KeyValuePair("exception", exceptionType)); } - public void RecordWorkflowStartError(string workflowType) + // --------------------------------------------------------------- + // Workflow + // --------------------------------------------------------------- + + public void RecordWorkflowStartError(string workflowType, string exceptionType) { _workflowStartErrorTotal.Add(1, - new KeyValuePair("workflow_type", workflowType)); + new KeyValuePair("workflowType", workflowType), + new 
KeyValuePair("exception", exceptionType)); } + // --------------------------------------------------------------- + // External payload + // --------------------------------------------------------------- + public void RecordExternalPayloadUsed(string entityName, string operation, string payloadType) { _externalPayloadUsedTotal.Add(1, - new KeyValuePair("entity_name", entityName), + new KeyValuePair("entityName", entityName), new KeyValuePair("operation", operation), - new KeyValuePair("payload_type", payloadType)); + new KeyValuePair("payloadType", payloadType)); + } + + // --------------------------------------------------------------- + // Surface-only ack counters (internal runner never increments) + // --------------------------------------------------------------- + + public void RecordTaskAckError(string taskType, string exceptionType) + { + _taskAckErrorTotal.Add(1, + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); + } + + public void RecordTaskAckFailed(string taskType) + { + _taskAckFailedTotal.Add(1, + new KeyValuePair("taskType", taskType)); } } } diff --git a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index 180a4c35..9a7c89e1 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -108,7 +108,7 @@ private void Work4Ever(CancellationToken token) } catch (Exception e) { - _metrics?.RecordUncaughtException(); + _metrics?.RecordUncaughtException(e.GetType().Name); _logger.LogError( $"[{_workerSettings.WorkerId}] worker error: {e.Message}" + $", taskName: {_worker.TaskType}" @@ -171,7 +171,7 @@ private async void WorkOnce(CancellationToken token) var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain, availableWorkerCounter); pollStopwatch.Stop(); - _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds); + 
_metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds, "SUCCESS"); if (tasks == null) { @@ -189,7 +189,7 @@ private async void WorkOnce(CancellationToken token) catch (Exception e) { pollStopwatch.Stop(); - _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds, "FAILURE"); _metrics?.RecordTaskPollError(_worker.TaskType, e.GetType().Name); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Polling error: {e.Message} " @@ -235,6 +235,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) + $", CancelToken: {token}" ); + _metrics?.RecordTaskExecutionStarted(_worker.TaskType); var executeStopwatch = Stopwatch.StartNew(); try { @@ -244,7 +245,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) taskResult = await _worker.Execute(task, token); executeStopwatch.Stop(); - _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds, "SUCCESS"); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done processing task for worker" @@ -259,7 +260,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) catch (Exception e) { executeStopwatch.Stop(); - _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds, "FAILURE"); _metrics?.RecordTaskExecuteError(_worker.TaskType, e.GetType().Name); _logger.LogError( $"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}" @@ -297,7 +298,7 @@ private void UpdateTask(Models.TaskResult taskResult) _taskClient.UpdateTask(taskResult); updateStopwatch.Stop(); - _metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds); + 
_metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds, "SUCCESS"); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done updating task" + $", taskType: {_worker.TaskType}" @@ -320,7 +321,8 @@ private void UpdateTask(Models.TaskResult taskResult) } updateStopwatch.Stop(); - _metrics?.RecordTaskUpdateError(_worker.TaskType); + _metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds, "FAILURE"); + _metrics?.RecordTaskUpdateError(_worker.TaskType, "RetryExhaustedException"); throw new Exception("Failed to update task after retries"); } diff --git a/Conductor/Executor/WorkflowExecutor.cs b/Conductor/Executor/WorkflowExecutor.cs index 0ab17ccb..1e6c8432 100644 --- a/Conductor/Executor/WorkflowExecutor.cs +++ b/Conductor/Executor/WorkflowExecutor.cs @@ -13,26 +13,32 @@ using Conductor.Api; using Conductor.Client; using Conductor.Client.Models; +using Conductor.Client.Telemetry; using Conductor.Definition; +using Newtonsoft.Json; +using System; using System.Collections.Generic; namespace Conductor.Executor { public class WorkflowExecutor { - private WorkflowResourceApi _workflowClient; - private MetadataResourceApi _metadataClient; + private readonly WorkflowResourceApi _workflowClient; + private readonly MetadataResourceApi _metadataClient; + private readonly MetricsCollector _metrics; - public WorkflowExecutor(Configuration configuration) + public WorkflowExecutor(Configuration configuration, MetricsCollector metrics = null) { _workflowClient = configuration.GetClient(); _metadataClient = configuration.GetClient(); + _metrics = metrics; } - public WorkflowExecutor(WorkflowResourceApi workflowClient, MetadataResourceApi metadataClient) + public WorkflowExecutor(WorkflowResourceApi workflowClient, MetadataResourceApi metadataClient, MetricsCollector metrics = null) { _workflowClient = workflowClient; _metadataClient = metadataClient; + _metrics = metrics; } public void RegisterWorkflow(WorkflowDef workflow, bool 
overwrite) @@ -54,7 +60,41 @@ public string StartWorkflow(ConductorWorkflow conductorWorkflow) public string StartWorkflow(StartWorkflowRequest startWorkflowRequest) { - return _workflowClient.StartWorkflow(startWorkflowRequest); + RecordInputSize(startWorkflowRequest); + try + { + return _workflowClient.StartWorkflow(startWorkflowRequest); + } + catch (Exception ex) + { + _metrics?.RecordWorkflowStartError( + startWorkflowRequest.Name ?? "", + ex.GetType().Name); + throw; + } + } + + private void RecordInputSize(StartWorkflowRequest request) + { + if (_metrics == null) + return; + try + { + double size = 0; + if (request.Input != null) + { + var json = JsonConvert.SerializeObject(request.Input); + size = json.Length; + } + _metrics.RecordWorkflowInputSize( + request.Name ?? "", + request.Version?.ToString() ?? "", + size); + } + catch + { + // Don't let metrics serialization failures disrupt workflow start. + } } } } diff --git a/Conductor/conductor-csharp.csproj b/Conductor/conductor-csharp.csproj index 0fb0055c..2963fe99 100644 --- a/Conductor/conductor-csharp.csproj +++ b/Conductor/conductor-csharp.csproj @@ -11,14 +11,16 @@ - + - + + + \ No newline at end of file diff --git a/Harness/Harness.csproj b/Harness/Harness.csproj index ea444b8a..71dd8154 100644 --- a/Harness/Harness.csproj +++ b/Harness/Harness.csproj @@ -3,10 +3,6 @@ Exe net8.0 - - - - diff --git a/Harness/Program.cs b/Harness/Program.cs index 14223b1e..6d76fd3b 100644 --- a/Harness/Program.cs +++ b/Harness/Program.cs @@ -1,4 +1,3 @@ -using Conductor.Api; using Conductor.Client; using Conductor.Client.Extensions; using Conductor.Client.Telemetry; @@ -6,8 +5,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using OpenTelemetry; -using OpenTelemetry.Metrics; using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -43,24 +40,6 @@ public static async Task Main(string[] args) var metricsPort = 
int.TryParse( Environment.GetEnvironmentVariable("HARNESS_METRICS_PORT"), out var mp) ? mp : DefaultMetricsPort; - var timeBuckets = new double[] { 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; - var sizeBuckets = new double[] { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; - - var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(MetricsCollector.MeterName) - .AddView("task_poll_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_execute_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_update_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_result_size_bytes", new ExplicitBucketHistogramConfiguration { Boundaries = sizeBuckets }) - .AddView("workflow_input_size_bytes", new ExplicitBucketHistogramConfiguration { Boundaries = sizeBuckets }) - .AddPrometheusHttpListener(options => - { - options.UriPrefixes = new[] { $"http://*:{metricsPort}/" }; - }) - .Build(); - - Console.WriteLine($"Prometheus metrics server started on port {metricsPort}"); - var host = new HostBuilder() .ConfigureServices(services => { @@ -71,12 +50,18 @@ public static async Task Main(string[] args) services.WithHostedService(); services.AddSingleton(config); - services.AddHostedService(sp => new WorkflowGovernor( - config, - sp.GetRequiredService>(), - WorkflowName, - workflowsPerSec, - sp.GetRequiredService())); + services.AddHostedService(sp => + { + var metrics = sp.GetRequiredService(); + metrics.StartServer(metricsPort); + Console.WriteLine($"Prometheus metrics server started on port {metricsPort}"); + return new WorkflowGovernor( + config, + sp.GetRequiredService>(), + WorkflowName, + workflowsPerSec, + metrics); + }); }) .ConfigureLogging(logging => { @@ -85,19 +70,12 @@ public static async Task Main(string[] args) }) .Build(); - try - { - await host.RunAsync(); - } - finally - { - 
meterProvider?.Dispose(); - } + await host.RunAsync(); } private static void RegisterMetadata(Configuration config) { - var metadataClient = config.GetClient(); + var metadataClient = config.GetClient(); var taskDefs = new List(); foreach (var (taskName, codename, sleepSeconds) in SimulatedWorkers) diff --git a/Harness/WorkflowGovernor.cs b/Harness/WorkflowGovernor.cs index 4cf092a6..e821e8f8 100644 --- a/Harness/WorkflowGovernor.cs +++ b/Harness/WorkflowGovernor.cs @@ -1,6 +1,6 @@ -using Conductor.Api; using Conductor.Client; using Conductor.Client.Telemetry; +using Conductor.Executor; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; @@ -11,11 +11,10 @@ namespace Harness { public class WorkflowGovernor : BackgroundService { - private readonly WorkflowResourceApi _workflowClient; + private readonly WorkflowExecutor _executor; private readonly ILogger _logger; private readonly string _workflowName; private readonly int _workflowsPerSecond; - private readonly MetricsCollector _metrics; public WorkflowGovernor( Configuration config, @@ -24,11 +23,10 @@ public WorkflowGovernor( int workflowsPerSecond, MetricsCollector metrics = null) { - _workflowClient = config.GetClient(); + _executor = new WorkflowExecutor(config, metrics); _logger = logger; _workflowName = workflowName; _workflowsPerSecond = workflowsPerSecond; - _metrics = metrics; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -43,15 +41,14 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { for (var i = 0; i < _workflowsPerSecond; i++) { - _workflowClient.StartWorkflow( - new Conductor.Client.Models.StartWorkflowRequest(name: _workflowName)); + _executor.StartWorkflow( + new Conductor.Client.Models.StartWorkflowRequest(name: _workflowName, version: 1)); } _logger.LogInformation("Governor: started {Count} workflow(s)", _workflowsPerSecond); } catch (Exception ex) { - 
_metrics?.RecordWorkflowStartError(_workflowName); _logger.LogError(ex, "Governor: error starting workflows"); } diff --git a/METRICS.md b/METRICS.md index 74716798..09fcea53 100644 --- a/METRICS.md +++ b/METRICS.md @@ -9,38 +9,48 @@ compatible with any .NET metrics listener -- most notably the - [Quick Reference](#quick-reference) - [Configuration](#configuration) - [DI-Based Workers](#di-based-workers) - - [WorkflowTaskHost Convenience API](#workflowtaskhost-convenience-api) - - [Prometheus via OpenTelemetry](#prometheus-via-opentelemetry) + - [Built-in Prometheus Server (Recommended)](#built-in-prometheus-server-recommended) + - [Custom MeterProvider](#custom-meterprovider) - [Console Exporter (Development)](#console-exporter-development) - [Metric Types](#metric-types) - [Counters](#counters) - [Histograms](#histograms) - [Gauges](#gauges) - [Labels](#labels) + - [Dual-Emit Strategy (Phase 1)](#dual-emit-strategy-phase-1) + - [Deprecated Labels](#deprecated-labels) +- [Histogram Bucket Boundaries](#histogram-bucket-boundaries) - [Example Metrics Output](#example-metrics-output) +- [OTel Scope Label](#otel-scope-label) - [Best Practices](#best-practices) ## Quick Reference All metrics are registered under the meter named `Conductor.Client`. 
-| Name | Type | Labels | Description | -|---|---|---|---| -| `task_poll_total` | Counter | `task_type` | Total task poll attempts | -| `task_poll_error_total` | Counter | `task_type`, `error_type` | Total task poll errors | -| `task_execute_error_total` | Counter | `task_type`, `error_type` | Total task execution errors | -| `task_update_error_total` | Counter | `task_type` | Total task update errors (after all retries) | -| `task_paused_total` | Counter | `task_type` | Polls skipped because the worker is paused | -| `task_execution_queue_full_total` | Counter | `task_type` | Polls returning zero capacity (all workers busy) | -| `thread_uncaught_exceptions_total` | Counter | -- | Uncaught exceptions in worker threads | -| `workflow_start_error_total` | Counter | `workflow_type` | Errors starting workflows | -| `external_payload_used_total` | Counter | `entity_name`, `operation`, `payload_type` | External payload storage usage | -| `task_poll_time_seconds` | Histogram | `task_type` | Task poll round-trip duration (seconds) | -| `task_execute_time_seconds` | Histogram | `task_type` | Task execution duration (seconds) | -| `task_update_time_seconds` | Histogram | `task_type` | Task result update duration (seconds) | -| `task_result_size_bytes` | Histogram | `task_type` | Task result payload size (bytes) | -| `workflow_input_size_bytes` | Histogram | `workflow_type`, `version` | Workflow input payload size (bytes) | -| `active_workers` | Gauge | `task_type` | Workers currently executing tasks | +| Name | Type | Canonical Labels | Deprecated Labels | Description | +|---|---|---|---|---| +| `task_poll_total` | Counter | `taskType` | `task_type` | Total task poll attempts | +| `task_execution_started_total` | Counter | `taskType` | `task_type` | Tasks dispatched to the worker function | +| `task_poll_error_total` | Counter | `taskType`, `exception` | `task_type`, `error_type` | Total task poll errors | +| `task_execute_error_total` | Counter | `taskType`, `exception` | 
`task_type`, `error_type` | Total task execution errors | +| `task_update_error_total` | Counter | `taskType`, `exception` | `task_type`, `error_type` | Total task update errors (after all retries) | +| `task_ack_error_total` | Counter | `taskType`, `exception` | `task_type` | Task ack client-side errors (surface-only) | +| `task_ack_failed_total` | Counter | `taskType` | `task_type` | Task ack declined by server (surface-only) | +| `task_paused_total` | Counter | `taskType` | `task_type` | Polls skipped because the worker is paused | +| `task_execution_queue_full_total` | Counter | `taskType` | `task_type` | Polls returning zero capacity (all workers busy) | +| `thread_uncaught_exceptions_total` | Counter | `exception` | -- | Uncaught exceptions in worker threads | +| `workflow_start_error_total` | Counter | `workflowType`, `exception` | `workflow_type` | Errors starting workflows | +| `external_payload_used_total` | Counter | `entityName`, `operation`, `payload_type` | `entity_name` | External payload storage usage | +| `task_poll_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task poll round-trip duration (seconds) | +| `task_execute_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task execution duration (seconds) | +| `task_update_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task result update duration (seconds) | +| `http_api_client_request_seconds` | Histogram | `method`, `uri`, `status` | -- | HTTP API client request duration (seconds) | +| `task_result_size_bytes` | **Gauge** | `taskType` | `task_type` | Task result payload size (bytes) — last value | +| `task_result_size_bytes_histogram` | Histogram | `taskType` | `task_type` | **[DEPRECATED]** Task result payload size (bytes) | +| `workflow_input_size_bytes` | **Gauge** | `workflowType`, `version` | `workflow_type` | Workflow input payload size (bytes) — last value | +| `workflow_input_size_bytes_histogram` | Histogram | `workflowType`, `version` | 
`workflow_type` | **[DEPRECATED]** Workflow input payload size (bytes) | +| `active_workers` | Gauge | `taskType` | `task_type` | Workers currently executing tasks | ## Configuration @@ -62,29 +72,29 @@ var host = new HostBuilder() .Build(); ``` -To **expose** the metrics externally (e.g. Prometheus scraping), attach a metrics listener -or exporter as shown below. +To **expose** the metrics externally (e.g. Prometheus scraping), use the built-in server +or attach your own exporter as shown below. -### WorkflowTaskHost Convenience API +### Built-in Prometheus Server (Recommended) -If you use the one-liner `WorkflowTaskHost.CreateWorkerHost(...)`, metrics are registered -automatically via the same `AddConductorWorker()` call: +The simplest way to expose a `/metrics` endpoint is to call `MetricsCollector.StartServer(port)`. +This configures the OpenTelemetry `MeterProvider` with canonical histogram bucket boundaries +and starts a Prometheus HTTP listener on the given port: ```csharp -var host = WorkflowTaskHost.CreateWorkerHost(config, workers: new MyWorker()); -await host.RunAsync(); +var metrics = serviceProvider.GetRequiredService<MetricsCollector>(); +metrics.StartServer(9991); +// Prometheus scrape endpoint now at http://localhost:9991/metrics ``` -### Prometheus via OpenTelemetry - -Add the following NuGet packages to your project: +This is equivalent to what every other Conductor SDK provides (Python, Go, Java, Ruby, Rust) +and is the recommended setup for production use. -``` -dotnet add package OpenTelemetry -dotnet add package OpenTelemetry.Exporter.Prometheus.HttpListener --prerelease -``` +### Custom MeterProvider -Then configure a `MeterProvider` before starting the host: +If you need full control (e.g. adding additional exporters, custom Views, or extra meters), +you can configure your own `MeterProvider`.
Use the SDK's canonical bucket constants +to stay aligned with the fleet: ```csharp using OpenTelemetry; @@ -92,21 +102,26 @@ using OpenTelemetry.Metrics; using Conductor.Client.Telemetry; var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(MetricsCollector.MeterName) // "Conductor.Client" + .AddMeter(MetricsCollector.MeterName) + .AddView("task_poll_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("task_execute_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("task_update_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("http_api_client_request_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) .AddPrometheusHttpListener(options => { options.UriPrefixes = new[] { "http://*:9090/" }; }) .Build(); - -// ... start the host ... - -// Dispose when shutting down. -meterProvider?.Dispose(); ``` -Metrics are now available at `http://localhost:9090/metrics` in Prometheus text format. - ### Console Exporter (Development) For quick debugging, the OpenTelemetry console exporter prints metrics to stdout: @@ -130,28 +145,31 @@ Monotonically increasing values. Prometheus exposes them with a `_total` suffix. | Name | Labels | Description | |---|---|---| -| `task_poll_total` | `task_type` | Incremented once per poll round (regardless of how many tasks are returned). | -| `task_poll_error_total` | `task_type`, `error_type` | Incremented when a poll HTTP call fails. `error_type` is the exception class name. | -| `task_execute_error_total` | `task_type`, `error_type` | Incremented when `Execute()` throws. `error_type` is the exception class name. | -| `task_update_error_total` | `task_type` | Incremented when all update retries are exhausted. 
| -| `task_paused_total` | `task_type` | Incremented when a poll is skipped because the worker is paused. | -| `task_execution_queue_full_total` | `task_type` | Incremented when a poll is skipped because all workers are busy (batch size reached). | -| `thread_uncaught_exceptions_total` | -- | Incremented on any exception in the top-level poll loop that is not an `OperationCanceledException`. | -| `workflow_start_error_total` | `workflow_type` | Incremented when a workflow start call fails. | -| `external_payload_used_total` | `entity_name`, `operation`, `payload_type` | Incremented when external payload storage is used. | +| `task_poll_total` | `taskType` | Incremented once per poll round (regardless of how many tasks are returned). | +| `task_execution_started_total` | `taskType` | Incremented when a polled task is dispatched to the worker's `Execute()` method. | +| `task_poll_error_total` | `taskType`, `exception` | Incremented when a poll HTTP call fails. `exception` is the exception class name. | +| `task_execute_error_total` | `taskType`, `exception` | Incremented when `Execute()` throws. `exception` is the exception class name. | +| `task_update_error_total` | `taskType`, `exception` | Incremented when all update retries are exhausted. `exception` is the exception class name. | +| `task_ack_error_total` | `taskType`, `exception` | Surface-only: C# has no separate ack call (batch-poll response is the ack). Available for user code. | +| `task_ack_failed_total` | `taskType` | Surface-only: same rationale as `task_ack_error_total`. | +| `task_paused_total` | `taskType` | Incremented when a poll is skipped because the worker is paused. | +| `task_execution_queue_full_total` | `taskType` | Incremented when a poll is skipped because all workers are busy (batch size reached). | +| `thread_uncaught_exceptions_total` | `exception` | Incremented on any exception in the top-level poll loop that is not an `OperationCanceledException`. 
| +| `workflow_start_error_total` | `workflowType`, `exception` | Incremented when a `WorkflowExecutor.StartWorkflow()` call fails. | +| `external_payload_used_total` | `entityName`, `operation`, `payload_type` | Incremented when external payload storage is used. | ### Histograms Distribution metrics with sum, count, and bucket breakdowns. All time values are in **seconds**. -All size values are in **bytes**. | Name | Labels | Unit | Description | |---|---|---|---| -| `task_poll_time_seconds` | `task_type` | seconds | Wall-clock time for the poll HTTP call. | -| `task_execute_time_seconds` | `task_type` | seconds | Wall-clock time inside `worker.Execute()`. | -| `task_update_time_seconds` | `task_type` | seconds | Wall-clock time for the update call (including retries). | -| `task_result_size_bytes` | `task_type` | bytes | JSON-serialized size of `TaskResult.OutputData`. | -| `workflow_input_size_bytes` | `workflow_type`, `version` | bytes | Workflow input payload size. | +| `task_poll_time_seconds` | `taskType`, `status` | seconds | Wall-clock time for the poll HTTP call. `status` is `SUCCESS` or `FAILURE`. | +| `task_execute_time_seconds` | `taskType`, `status` | seconds | Wall-clock time inside `worker.Execute()`. `status` is `SUCCESS` or `FAILURE`. | +| `task_update_time_seconds` | `taskType`, `status` | seconds | Wall-clock time for the update call (including retries). `status` is `SUCCESS` or `FAILURE`. | +| `http_api_client_request_seconds` | `method`, `uri`, `status` | seconds | Every HTTP request made by the API client. `method` is the HTTP verb, `uri` is the request path, `status` is the HTTP status code (or `"0"` on transport failure). | +| `task_result_size_bytes_histogram` | `taskType` | bytes | **[DEPRECATED]** JSON-serialized size of `TaskResult.OutputData`. Use the `task_result_size_bytes` Gauge instead. | +| `workflow_input_size_bytes_histogram` | `workflowType`, `version` | bytes | **[DEPRECATED]** Workflow input payload size. 
Use the `workflow_input_size_bytes` Gauge instead. | ### Gauges @@ -159,68 +177,114 @@ Point-in-time values sampled by the metrics listener. | Name | Labels | Description | |---|---|---| -| `active_workers` | `task_type` | Number of concurrent task executions in progress. Updated on every poll cycle. | +| `task_result_size_bytes` | `taskType` | Serialized byte size of the most recent task result output. Last-value gauge. | +| `workflow_input_size_bytes` | `workflowType`, `version` | Serialized byte size of the most recent workflow input. Last-value gauge. | +| `active_workers` | `taskType` | Number of concurrent task executions in progress. Updated on every poll cycle. | ## Labels -| Label | Used By | Values | +### Dual-Emit Strategy (Phase 1) + +As part of the cross-SDK metrics harmonization, every metric that carries a renamed label now emits **both** the canonical +camelCase label and the legacy snake_case label with the same value. This ensures existing +dashboards and alerts continue to work during migration. + +| Canonical (new) | Legacy (deprecated) | Used By | |---|---|---| -| `task_type` | Most metrics | Task definition name (e.g. `"my_worker"`) | -| `error_type` | `task_poll_error_total`, `task_execute_error_total` | Exception class name (e.g. `"HttpRequestException"`) | -| `workflow_type` | `workflow_start_error_total`, `workflow_input_size_bytes` | Workflow definition name | -| `version` | `workflow_input_size_bytes` | Workflow version string | -| `entity_name` | `external_payload_used_total` | Entity name | -| `operation` | `external_payload_used_total` | Operation name | -| `payload_type` | `external_payload_used_total` | Payload type (e.g.
`"TASK_INPUT"`, `"TASK_OUTPUT"`) | +| `taskType` | `task_type` | Most task metrics | +| `exception` | `error_type` | Error counters | +| `workflowType` | `workflow_type` | Workflow metrics | +| `entityName` | `entity_name` | `external_payload_used_total` | + +Labels with no legacy equivalent (emitted under one name only): `status` on time histograms, `method`/`uri`/`status` +on `http_api_client_request_seconds`, `version` on `workflow_input_size_bytes`, and `operation`/`payload_type` on `external_payload_used_total`. + +### Deprecated Labels + +The following labels are deprecated and will be removed in a future major release: + +- `task_type` — use `taskType` +- `error_type` — use `exception` +- `workflow_type` — use `workflowType` +- `entity_name` — use `entityName` + +## Histogram Bucket Boundaries + +The SDK defines canonical bucket boundaries that match all other Conductor SDKs: + +**Time histograms** (`*_seconds`): + +``` +0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 +``` + +**Size histograms** (deprecated `*_histogram`): + +``` +100, 1000, 10000, 100000, 1000000, 10000000 +``` + +These are applied automatically when using `MetricsCollector.StartServer(port)`. If you build +your own `MeterProvider`, reference `MetricsCollector.CanonicalTimeBuckets` and +`MetricsCollector.CanonicalSizeBuckets`.
## Example Metrics Output -When scraped by Prometheus (via the OpenTelemetry exporter), the output looks like: +When scraped by Prometheus (via the built-in server or OpenTelemetry exporter): ```prometheus # HELP task_poll_total Total task poll attempts # TYPE task_poll_total counter -task_poll_total{task_type="my_worker"} 142 +task_poll_total{taskType="my_worker",task_type="my_worker"} 142 + +# HELP task_execution_started_total Tasks dispatched to the worker function +# TYPE task_execution_started_total counter +task_execution_started_total{taskType="my_worker",task_type="my_worker"} 140 # HELP task_poll_time_seconds Task poll round-trip duration (seconds) # TYPE task_poll_time_seconds histogram -task_poll_time_seconds_bucket{task_type="my_worker",le="0.005"} 12 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.01"} 45 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.025"} 98 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.05"} 120 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.1"} 135 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.25"} 140 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.5"} 142 -task_poll_time_seconds_bucket{task_type="my_worker",le="1"} 142 -task_poll_time_seconds_bucket{task_type="my_worker",le="+Inf"} 142 -task_poll_time_seconds_sum{task_type="my_worker"} 3.842 -task_poll_time_seconds_count{task_type="my_worker"} 142 - -# HELP task_execute_time_seconds Task execution duration (seconds) -# TYPE task_execute_time_seconds histogram -task_execute_time_seconds_bucket{task_type="my_worker",le="0.25"} 50 -task_execute_time_seconds_bucket{task_type="my_worker",le="0.5"} 80 -task_execute_time_seconds_bucket{task_type="my_worker",le="1"} 110 -task_execute_time_seconds_bucket{task_type="my_worker",le="2.5"} 135 -task_execute_time_seconds_bucket{task_type="my_worker",le="+Inf"} 142 -task_execute_time_seconds_sum{task_type="my_worker"} 98.553 
-task_execute_time_seconds_count{task_type="my_worker"} 142 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.001"} 2 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.005"} 12 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.01"} 45 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.025"} 98 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.05"} 120 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.1"} 135 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.25"} 140 +task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="+Inf"} 140 +task_poll_time_seconds_sum{taskType="my_worker",task_type="my_worker",status="SUCCESS"} 3.842 +task_poll_time_seconds_count{taskType="my_worker",task_type="my_worker",status="SUCCESS"} 140 + +# HELP http_api_client_request_seconds HTTP API client request duration (seconds) +# TYPE http_api_client_request_seconds histogram +http_api_client_request_seconds_bucket{method="GET",uri="/api/tasks/poll/batch/my_worker",status="200",le="0.01"} 30 +http_api_client_request_seconds_bucket{method="GET",uri="/api/tasks/poll/batch/my_worker",status="200",le="+Inf"} 142 + +# HELP task_result_size_bytes Task result payload size (bytes) +# TYPE task_result_size_bytes gauge +task_result_size_bytes{taskType="my_worker",task_type="my_worker"} 2048 # HELP active_workers Workers currently executing tasks # TYPE active_workers gauge -active_workers{task_type="my_worker"} 5 +active_workers{taskType="my_worker",task_type="my_worker"} 5 # HELP task_execution_queue_full_total Polls returning zero capacity # TYPE task_execution_queue_full_total counter 
-task_execution_queue_full_total{task_type="my_worker"} 3 +task_execution_queue_full_total{taskType="my_worker",task_type="my_worker"} 3 ``` +## OTel Scope Label + +The C# metrics output includes an `otel_scope_name="Conductor.Client"` label on every metric. +This is injected by the OpenTelemetry .NET exporter (it identifies the `System.Diagnostics.Metrics.Meter` +that owns each instrument). Other SDKs using native Prometheus client libraries do not have this. +This is harmless — it is a standard OTel convention and does not interfere with canonical label queries. + ## Best Practices -1. **Use the OpenTelemetry Prometheus exporter for production.** It serves a standard `/metrics` - endpoint that Prometheus can scrape directly. +1. **Use `MetricsCollector.StartServer(port)` for production.** It serves a standard `/metrics` + endpoint that Prometheus can scrape directly, with canonical bucket boundaries pre-configured. -2. **Set histogram bucket boundaries via OpenTelemetry Views** if the defaults don't match your +2. **Override histogram buckets via OpenTelemetry Views** if the defaults don't match your workload. For example, if your workers are consistently fast (< 100ms), add more fine-grained lower buckets: ```csharp @@ -239,7 +303,7 @@ task_execution_queue_full_total{task_type="my_worker"} 3 5. **Use `rate()` on counters, not raw values.** For example: ```promql - rate(task_poll_total{task_type="my_worker"}[5m]) + rate(task_poll_total{taskType="my_worker"}[5m]) ``` 6. **Track p99 execution latency** using histogram quantiles: @@ -247,6 +311,10 @@ task_execution_queue_full_total{task_type="my_worker"} 3 histogram_quantile(0.99, rate(task_execute_time_seconds_bucket[5m])) ``` -7. **The `MetricsCollector` is available as a singleton via DI.** You can inject it into your - own services to record `workflow_start_error_total`, `external_payload_used_total`, or any - other metrics that occur outside the poll loop. +7. 
**Migrate dashboards to canonical labels.** Both `taskType` and `task_type` resolve to the + same value during Phase 1. Update your queries to use `taskType` before the deprecated labels + are removed in a future major release. + +8. **Use `WorkflowExecutor` for starting workflows.** It automatically records + `workflow_input_size_bytes` and `workflow_start_error_total` when a `MetricsCollector` + is injected. diff --git a/Tests/Telemetry/MetricsCollectorTests.cs b/Tests/Telemetry/MetricsCollectorTests.cs index 8f12d0ae..8a1483a0 100644 --- a/Tests/Telemetry/MetricsCollectorTests.cs +++ b/Tests/Telemetry/MetricsCollectorTests.cs @@ -45,173 +45,299 @@ public MetricsCollectorTests() _listener.Start(); } - public void Dispose() => _listener.Dispose(); + public void Dispose() + { + _sut.Dispose(); + _listener.Dispose(); + } // --------------------------------------------------------------- - // Counters + // Counters — dual-emit label checks // --------------------------------------------------------------- [Fact] - public void RecordTaskPoll_EmitsCounterWithTaskType() + public void RecordTaskPoll_EmitsDualLabels() { _sut.RecordTaskPoll("my_task"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_total", m.Name); Assert.Equal(1L, m.Value); + AssertTag(m, "taskType", "my_task"); + AssertTag(m, "task_type", "my_task"); + } + + [Fact] + public void RecordTaskExecutionStarted_EmitsDualLabels() + { + _sut.RecordTaskExecutionStarted("my_task"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_execution_started_total", m.Name); + AssertTag(m, "taskType", "my_task"); AssertTag(m, "task_type", "my_task"); } [Fact] - public void RecordTaskPollError_EmitsCounterWithBothTags() + public void RecordTaskPollError_EmitsDualLabels() { _sut.RecordTaskPollError("my_task", "TimeoutException"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_error_total", m.Name); + AssertTag(m, "taskType", "my_task"); AssertTag(m, "task_type", "my_task"); + AssertTag(m, 
"exception", "TimeoutException"); AssertTag(m, "error_type", "TimeoutException"); } [Fact] - public void RecordTaskExecuteError_EmitsCounterWithBothTags() + public void RecordTaskExecuteError_EmitsDualLabels() { _sut.RecordTaskExecuteError("task_a", "NullReferenceException"); var m = Assert.Single(_recorded); Assert.Equal("task_execute_error_total", m.Name); + AssertTag(m, "taskType", "task_a"); AssertTag(m, "task_type", "task_a"); + AssertTag(m, "exception", "NullReferenceException"); AssertTag(m, "error_type", "NullReferenceException"); } [Fact] - public void RecordTaskUpdateError_EmitsCounter() + public void RecordTaskUpdateError_EmitsExceptionAndDualLabels() { - _sut.RecordTaskUpdateError("task_b"); + _sut.RecordTaskUpdateError("task_b", "HttpRequestException"); var m = Assert.Single(_recorded); Assert.Equal("task_update_error_total", m.Name); + AssertTag(m, "taskType", "task_b"); AssertTag(m, "task_type", "task_b"); + AssertTag(m, "exception", "HttpRequestException"); + AssertTag(m, "error_type", "HttpRequestException"); } [Fact] - public void RecordTaskPaused_EmitsCounter() + public void RecordTaskPaused_EmitsDualLabels() { _sut.RecordTaskPaused("paused_task"); var m = Assert.Single(_recorded); Assert.Equal("task_paused_total", m.Name); + AssertTag(m, "taskType", "paused_task"); AssertTag(m, "task_type", "paused_task"); } [Fact] - public void RecordTaskExecutionQueueFull_EmitsCounter() + public void RecordTaskExecutionQueueFull_EmitsDualLabels() { _sut.RecordTaskExecutionQueueFull("busy_task"); var m = Assert.Single(_recorded); Assert.Equal("task_execution_queue_full_total", m.Name); + AssertTag(m, "taskType", "busy_task"); AssertTag(m, "task_type", "busy_task"); } [Fact] - public void RecordUncaughtException_EmitsCounterWithNoTags() + public void RecordUncaughtException_EmitsExceptionLabel() { - _sut.RecordUncaughtException(); + _sut.RecordUncaughtException("InvalidOperationException"); var m = Assert.Single(_recorded); 
Assert.Equal("thread_uncaught_exceptions_total", m.Name); Assert.Equal(1L, m.Value); + AssertTag(m, "exception", "InvalidOperationException"); } [Fact] - public void RecordWorkflowStartError_EmitsCounter() + public void RecordWorkflowStartError_EmitsExceptionAndDualLabels() { - _sut.RecordWorkflowStartError("my_workflow"); + _sut.RecordWorkflowStartError("my_workflow", "ApiException"); var m = Assert.Single(_recorded); Assert.Equal("workflow_start_error_total", m.Name); + AssertTag(m, "workflowType", "my_workflow"); AssertTag(m, "workflow_type", "my_workflow"); + AssertTag(m, "exception", "ApiException"); } [Fact] - public void RecordExternalPayloadUsed_EmitsCounterWithThreeTags() + public void RecordExternalPayloadUsed_EmitsDualLabels() { - _sut.RecordExternalPayloadUsed("entity", "read", "input"); + _sut.RecordExternalPayloadUsed("entity", "READ", "TASK_INPUT"); var m = Assert.Single(_recorded); Assert.Equal("external_payload_used_total", m.Name); + AssertTag(m, "entityName", "entity"); AssertTag(m, "entity_name", "entity"); - AssertTag(m, "operation", "read"); - AssertTag(m, "payload_type", "input"); + AssertTag(m, "operation", "READ"); + AssertTag(m, "payload_type", "TASK_INPUT"); + } + + [Fact] + public void RecordTaskAckError_EmitsDualLabels() + { + _sut.RecordTaskAckError("my_task", "HttpRequestException"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_ack_error_total", m.Name); + AssertTag(m, "taskType", "my_task"); + AssertTag(m, "task_type", "my_task"); + AssertTag(m, "exception", "HttpRequestException"); + } + + [Fact] + public void RecordTaskAckFailed_EmitsDualLabels() + { + _sut.RecordTaskAckFailed("my_task"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_ack_failed_total", m.Name); + AssertTag(m, "taskType", "my_task"); + AssertTag(m, "task_type", "my_task"); } // --------------------------------------------------------------- - // Histograms + // Histograms — status label // 
--------------------------------------------------------------- [Fact] - public void RecordTaskPollTime_RecordsDuration() + public void RecordTaskPollTime_IncludesStatusLabel() { - _sut.RecordTaskPollTime("fast_task", 0.123); + _sut.RecordTaskPollTime("fast_task", 0.123, "SUCCESS"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_time_seconds", m.Name); Assert.Equal(0.123, m.Value); + AssertTag(m, "taskType", "fast_task"); AssertTag(m, "task_type", "fast_task"); + AssertTag(m, "status", "SUCCESS"); } [Fact] - public void RecordTaskExecuteTime_RecordsDuration() + public void RecordTaskExecuteTime_IncludesStatusLabel() { - _sut.RecordTaskExecuteTime("slow_task", 5.5); + _sut.RecordTaskExecuteTime("slow_task", 5.5, "FAILURE"); var m = Assert.Single(_recorded); Assert.Equal("task_execute_time_seconds", m.Name); Assert.Equal(5.5, m.Value); + AssertTag(m, "taskType", "slow_task"); AssertTag(m, "task_type", "slow_task"); + AssertTag(m, "status", "FAILURE"); } [Fact] - public void RecordTaskUpdateTime_RecordsDuration() + public void RecordTaskUpdateTime_IncludesStatusLabel() { - _sut.RecordTaskUpdateTime("task_c", 0.05); + _sut.RecordTaskUpdateTime("task_c", 0.05, "SUCCESS"); var m = Assert.Single(_recorded); Assert.Equal("task_update_time_seconds", m.Name); Assert.Equal(0.05, m.Value); + AssertTag(m, "taskType", "task_c"); AssertTag(m, "task_type", "task_c"); + AssertTag(m, "status", "SUCCESS"); } [Fact] - public void RecordTaskResultSize_RecordsBytes() + public void RecordHttpApiClientRequest_RecordsDuration() + { + _sut.RecordHttpApiClientRequest("GET", "/api/tasks/poll/batch/my_task", "200", 0.042); + + var m = Assert.Single(_recorded); + Assert.Equal("http_api_client_request_seconds", m.Name); + Assert.Equal(0.042, m.Value); + AssertTag(m, "method", "GET"); + AssertTag(m, "uri", "/api/tasks/poll/batch/my_task"); + AssertTag(m, "status", "200"); + } + + // --------------------------------------------------------------- + // Size metrics — gauge + deprecated 
histogram + // --------------------------------------------------------------- + + [Fact] + public void RecordTaskResultSize_EmitsDeprecatedHistogram() { _sut.RecordTaskResultSize("task_d", 1024.0); var m = Assert.Single(_recorded); - Assert.Equal("task_result_size_bytes", m.Name); + Assert.Equal("task_result_size_bytes_histogram", m.Name); Assert.Equal(1024.0, m.Value); + AssertTag(m, "taskType", "task_d"); AssertTag(m, "task_type", "task_d"); } [Fact] - public void RecordWorkflowInputSize_RecordsBytesWithVersionTag() + public void RecordTaskResultSize_ExposedViaGauge() { - _sut.RecordWorkflowInputSize("wf_type", "v2", 2048.0); + _sut.RecordTaskResultSize("task_d", 2048.0); + + var gaugeValues = new List(); + using var gaugeListener = new MeterListener(); + gaugeListener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == MetricsCollector.MeterName + && instrument.Name == "task_result_size_bytes") + l.EnableMeasurementEvents(instrument); + }; + gaugeListener.SetMeasurementEventCallback((instrument, value, tags, _) => + gaugeValues.Add(new RecordedMeasurement(instrument.Name, value, tags.ToArray()))); + gaugeListener.Start(); + gaugeListener.RecordObservableInstruments(); + + var match = Assert.Single(gaugeValues); + Assert.Equal(2048.0, (double)match.Value); + AssertTag(match, "taskType", "task_d"); + AssertTag(match, "task_type", "task_d"); + } + + [Fact] + public void RecordWorkflowInputSize_EmitsDeprecatedHistogram() + { + _sut.RecordWorkflowInputSize("wf_type", "2", 2048.0); var m = Assert.Single(_recorded); - Assert.Equal("workflow_input_size_bytes", m.Name); + Assert.Equal("workflow_input_size_bytes_histogram", m.Name); Assert.Equal(2048.0, m.Value); + AssertTag(m, "workflowType", "wf_type"); AssertTag(m, "workflow_type", "wf_type"); - AssertTag(m, "version", "v2"); + AssertTag(m, "version", "2"); + } + + [Fact] + public void RecordWorkflowInputSize_ExposedViaGauge() + { + _sut.RecordWorkflowInputSize("wf_type", "v2", 4096.0); + + var 
gaugeValues = new List(); + using var gaugeListener = new MeterListener(); + gaugeListener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == MetricsCollector.MeterName + && instrument.Name == "workflow_input_size_bytes") + l.EnableMeasurementEvents(instrument); + }; + gaugeListener.SetMeasurementEventCallback((instrument, value, tags, _) => + gaugeValues.Add(new RecordedMeasurement(instrument.Name, value, tags.ToArray()))); + gaugeListener.Start(); + gaugeListener.RecordObservableInstruments(); + + var match = Assert.Single(gaugeValues); + Assert.Equal(4096.0, (double)match.Value); + AssertTag(match, "workflowType", "wf_type"); + AssertTag(match, "workflow_type", "wf_type"); + AssertTag(match, "version", "v2"); } // --------------------------------------------------------------- - // Observable gauge + // Observable gauge — active_workers // --------------------------------------------------------------- [Fact] - public void RecordActiveWorkers_ExposedViaObservableGauge() + public void RecordActiveWorkers_ExposedViaObservableGaugeWithDualLabels() { _sut.RecordActiveWorkers("task_x", 3); _sut.RecordActiveWorkers("task_y", 7); @@ -231,9 +357,13 @@ public void RecordActiveWorkers_ExposedViaObservableGauge() Assert.Equal(2, gaugeValues.Count); Assert.Contains(gaugeValues, g => - (int)g.Value == 3 && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_x")); + (int)g.Value == 3 + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_x") + && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_x")); Assert.Contains(gaugeValues, g => - (int)g.Value == 7 && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_y")); + (int)g.Value == 7 + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_y") + && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_y")); } [Fact] @@ -256,7 +386,7 @@ public void RecordActiveWorkers_OverwritesPreviousValue() 
gaugeListener.RecordObservableInstruments(); var match = gaugeValues.Where(g => - g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "overwrite_test_task")).ToList(); + g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "overwrite_test_task")).ToList(); var single = Assert.Single(match); Assert.Equal(10, (int)single.Value); } @@ -273,8 +403,26 @@ public void Counters_AccumulateAcrossMultipleCalls() _sut.RecordTaskPoll("task_b"); Assert.Equal(3, _recorded.Count); - Assert.Equal(2, _recorded.Count(r => r.Tags.Any(t => (string)t.Value == "task_a"))); - Assert.Equal(1, _recorded.Count(r => r.Tags.Any(t => (string)t.Value == "task_b"))); + Assert.Equal(2, _recorded.Count(r => r.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_a"))); + Assert.Equal(1, _recorded.Count(r => r.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_b"))); + } + + // --------------------------------------------------------------- + // Canonical bucket constants + // --------------------------------------------------------------- + + [Fact] + public void CanonicalTimeBuckets_MatchesSpec() + { + var expected = new double[] { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; + Assert.Equal(expected, MetricsCollector.CanonicalTimeBuckets); + } + + [Fact] + public void CanonicalSizeBuckets_MatchesSpec() + { + var expected = new double[] { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + Assert.Equal(expected, MetricsCollector.CanonicalSizeBuckets); } // --------------------------------------------------------------- From 15c85bccd7d87d2030c5edb1eef6fceef6110958 Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Fri, 24 Apr 2026 11:42:48 -0600 Subject: [PATCH 2/4] mostly fixing tests to not expect dual labels, we don't need to do dual labels for first release of metrics in cs sdk since there was not legacy ones to begin with --- .../Client/Telemetry/MetricsCollector.cs | 27 ++++++++ Tests/Telemetry/MetricsCollectorTests.cs | 61 
++++++------------- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/Conductor/Client/Telemetry/MetricsCollector.cs b/Conductor/Client/Telemetry/MetricsCollector.cs index 10f8920f..a46876f5 100644 --- a/Conductor/Client/Telemetry/MetricsCollector.cs +++ b/Conductor/Client/Telemetry/MetricsCollector.cs @@ -39,6 +39,12 @@ public sealed class MetricsCollector : IDisposable public static readonly double[] CanonicalTimeBuckets = { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; + /// + /// Proposed size histogram buckets (in bytes); if accepted, will be shared across all Conductor SDKs. + /// + public static readonly double[] CanonicalSizeBuckets = + { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + private readonly Meter _meter; private MeterProvider _meterProvider; @@ -61,6 +67,8 @@ public sealed class MetricsCollector : IDisposable private readonly Histogram _taskExecuteTimeSeconds; private readonly Histogram _taskUpdateTimeSeconds; private readonly Histogram _httpApiClientRequestSeconds; + private readonly Histogram _taskResultSizeBytesHistogram; + private readonly Histogram _workflowInputSizeBytesHistogram; // --- gauges --- private readonly ConcurrentDictionary _activeWorkerCounts = new ConcurrentDictionary(); @@ -143,6 +151,18 @@ public MetricsCollector() unit: "s", description: "HTTP API client request duration in seconds"); + // --- size histograms (non-canonical names while spec uses gauges) --- + + _taskResultSizeBytesHistogram = _meter.CreateHistogram( + "task_result_size_bytes_histogram", + unit: "By", + description: "Distribution of task result payload sizes in bytes"); + + _workflowInputSizeBytesHistogram = _meter.CreateHistogram( + "workflow_input_size_bytes_histogram", + unit: "By", + description: "Distribution of workflow input payload sizes in bytes"); + // --- canonical size gauges --- _meter.CreateObservableGauge( @@ -218,6 +238,8 @@ public void StartServer(int port) .AddView("task_execute_time_seconds", new 
ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) .AddView("task_update_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) .AddView("http_api_client_request_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_result_size_bytes_histogram", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalSizeBuckets }) + .AddView("workflow_input_size_bytes_histogram", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalSizeBuckets }) .AddPrometheusHttpListener(options => { options.UriPrefixes = new[] { $"http://*:{port}/" }; @@ -304,11 +326,16 @@ public void RecordTaskUpdateError(string taskType, string exceptionType) public void RecordTaskResultSize(string taskType, double sizeBytes) { _taskResultSizes[taskType] = sizeBytes; + _taskResultSizeBytesHistogram.Record(sizeBytes, + new KeyValuePair("taskType", taskType)); } public void RecordWorkflowInputSize(string workflowType, string version, double sizeBytes) { _workflowInputSizes[workflowType + "\0" + (version ?? "")] = sizeBytes; + _workflowInputSizeBytesHistogram.Record(sizeBytes, + new KeyValuePair("workflowType", workflowType), + new KeyValuePair("version", version ?? 
"")); } // --------------------------------------------------------------- diff --git a/Tests/Telemetry/MetricsCollectorTests.cs b/Tests/Telemetry/MetricsCollectorTests.cs index 8a1483a0..e5fc18eb 100644 --- a/Tests/Telemetry/MetricsCollectorTests.cs +++ b/Tests/Telemetry/MetricsCollectorTests.cs @@ -52,11 +52,11 @@ public void Dispose() } // --------------------------------------------------------------- - // Counters — dual-emit label checks + // Counters // --------------------------------------------------------------- [Fact] - public void RecordTaskPoll_EmitsDualLabels() + public void RecordTaskPoll_EmitsCanonicalLabels() { _sut.RecordTaskPoll("my_task"); @@ -64,79 +64,69 @@ public void RecordTaskPoll_EmitsDualLabels() Assert.Equal("task_poll_total", m.Name); Assert.Equal(1L, m.Value); AssertTag(m, "taskType", "my_task"); - AssertTag(m, "task_type", "my_task"); } [Fact] - public void RecordTaskExecutionStarted_EmitsDualLabels() + public void RecordTaskExecutionStarted_EmitsCanonicalLabels() { _sut.RecordTaskExecutionStarted("my_task"); var m = Assert.Single(_recorded); Assert.Equal("task_execution_started_total", m.Name); AssertTag(m, "taskType", "my_task"); - AssertTag(m, "task_type", "my_task"); } [Fact] - public void RecordTaskPollError_EmitsDualLabels() + public void RecordTaskPollError_EmitsCanonicalLabels() { _sut.RecordTaskPollError("my_task", "TimeoutException"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_error_total", m.Name); AssertTag(m, "taskType", "my_task"); - AssertTag(m, "task_type", "my_task"); AssertTag(m, "exception", "TimeoutException"); - AssertTag(m, "error_type", "TimeoutException"); } [Fact] - public void RecordTaskExecuteError_EmitsDualLabels() + public void RecordTaskExecuteError_EmitsCanonicalLabels() { _sut.RecordTaskExecuteError("task_a", "NullReferenceException"); var m = Assert.Single(_recorded); Assert.Equal("task_execute_error_total", m.Name); AssertTag(m, "taskType", "task_a"); - AssertTag(m, "task_type", 
"task_a"); AssertTag(m, "exception", "NullReferenceException"); - AssertTag(m, "error_type", "NullReferenceException"); } [Fact] - public void RecordTaskUpdateError_EmitsExceptionAndDualLabels() + public void RecordTaskUpdateError_EmitsCanonicalLabels() { _sut.RecordTaskUpdateError("task_b", "HttpRequestException"); var m = Assert.Single(_recorded); Assert.Equal("task_update_error_total", m.Name); AssertTag(m, "taskType", "task_b"); - AssertTag(m, "task_type", "task_b"); AssertTag(m, "exception", "HttpRequestException"); - AssertTag(m, "error_type", "HttpRequestException"); } [Fact] - public void RecordTaskPaused_EmitsDualLabels() + public void RecordTaskPaused_EmitsCanonicalLabels() { _sut.RecordTaskPaused("paused_task"); var m = Assert.Single(_recorded); Assert.Equal("task_paused_total", m.Name); AssertTag(m, "taskType", "paused_task"); - AssertTag(m, "task_type", "paused_task"); } [Fact] - public void RecordTaskExecutionQueueFull_EmitsDualLabels() + public void RecordTaskExecutionQueueFull_EmitsCanonicalLabels() { _sut.RecordTaskExecutionQueueFull("busy_task"); var m = Assert.Single(_recorded); Assert.Equal("task_execution_queue_full_total", m.Name); AssertTag(m, "taskType", "busy_task"); - AssertTag(m, "task_type", "busy_task"); } [Fact] @@ -151,51 +141,47 @@ public void RecordUncaughtException_EmitsExceptionLabel() } [Fact] - public void RecordWorkflowStartError_EmitsExceptionAndDualLabels() + public void RecordWorkflowStartError_EmitsCanonicalLabels() { _sut.RecordWorkflowStartError("my_workflow", "ApiException"); var m = Assert.Single(_recorded); Assert.Equal("workflow_start_error_total", m.Name); AssertTag(m, "workflowType", "my_workflow"); - AssertTag(m, "workflow_type", "my_workflow"); AssertTag(m, "exception", "ApiException"); } [Fact] - public void RecordExternalPayloadUsed_EmitsDualLabels() + public void RecordExternalPayloadUsed_EmitsCanonicalLabels() { _sut.RecordExternalPayloadUsed("entity", "READ", "TASK_INPUT"); var m = Assert.Single(_recorded); 
Assert.Equal("external_payload_used_total", m.Name); AssertTag(m, "entityName", "entity"); - AssertTag(m, "entity_name", "entity"); AssertTag(m, "operation", "READ"); - AssertTag(m, "payload_type", "TASK_INPUT"); + AssertTag(m, "payloadType", "TASK_INPUT"); } [Fact] - public void RecordTaskAckError_EmitsDualLabels() + public void RecordTaskAckError_EmitsCanonicalLabels() { _sut.RecordTaskAckError("my_task", "HttpRequestException"); var m = Assert.Single(_recorded); Assert.Equal("task_ack_error_total", m.Name); AssertTag(m, "taskType", "my_task"); - AssertTag(m, "task_type", "my_task"); AssertTag(m, "exception", "HttpRequestException"); } [Fact] - public void RecordTaskAckFailed_EmitsDualLabels() + public void RecordTaskAckFailed_EmitsCanonicalLabels() { _sut.RecordTaskAckFailed("my_task"); var m = Assert.Single(_recorded); Assert.Equal("task_ack_failed_total", m.Name); AssertTag(m, "taskType", "my_task"); - AssertTag(m, "task_type", "my_task"); } // --------------------------------------------------------------- @@ -211,7 +197,6 @@ public void RecordTaskPollTime_IncludesStatusLabel() Assert.Equal("task_poll_time_seconds", m.Name); Assert.Equal(0.123, m.Value); AssertTag(m, "taskType", "fast_task"); - AssertTag(m, "task_type", "fast_task"); AssertTag(m, "status", "SUCCESS"); } @@ -224,7 +209,6 @@ public void RecordTaskExecuteTime_IncludesStatusLabel() Assert.Equal("task_execute_time_seconds", m.Name); Assert.Equal(5.5, m.Value); AssertTag(m, "taskType", "slow_task"); - AssertTag(m, "task_type", "slow_task"); AssertTag(m, "status", "FAILURE"); } @@ -237,7 +221,6 @@ public void RecordTaskUpdateTime_IncludesStatusLabel() Assert.Equal("task_update_time_seconds", m.Name); Assert.Equal(0.05, m.Value); AssertTag(m, "taskType", "task_c"); - AssertTag(m, "task_type", "task_c"); AssertTag(m, "status", "SUCCESS"); } @@ -255,11 +238,11 @@ public void RecordHttpApiClientRequest_RecordsDuration() } // --------------------------------------------------------------- - // Size 
metrics — gauge + deprecated histogram + // Size metrics — gauge + histogram // --------------------------------------------------------------- [Fact] - public void RecordTaskResultSize_EmitsDeprecatedHistogram() + public void RecordTaskResultSize_EmitsHistogram() { _sut.RecordTaskResultSize("task_d", 1024.0); @@ -267,7 +250,6 @@ public void RecordTaskResultSize_EmitsDeprecatedHistogram() Assert.Equal("task_result_size_bytes_histogram", m.Name); Assert.Equal(1024.0, m.Value); AssertTag(m, "taskType", "task_d"); - AssertTag(m, "task_type", "task_d"); } [Fact] @@ -291,11 +273,10 @@ public void RecordTaskResultSize_ExposedViaGauge() var match = Assert.Single(gaugeValues); Assert.Equal(2048.0, (double)match.Value); AssertTag(match, "taskType", "task_d"); - AssertTag(match, "task_type", "task_d"); } [Fact] - public void RecordWorkflowInputSize_EmitsDeprecatedHistogram() + public void RecordWorkflowInputSize_EmitsHistogram() { _sut.RecordWorkflowInputSize("wf_type", "2", 2048.0); @@ -303,7 +284,6 @@ public void RecordWorkflowInputSize_EmitsDeprecatedHistogram() Assert.Equal("workflow_input_size_bytes_histogram", m.Name); Assert.Equal(2048.0, m.Value); AssertTag(m, "workflowType", "wf_type"); - AssertTag(m, "workflow_type", "wf_type"); AssertTag(m, "version", "2"); } @@ -328,7 +308,6 @@ public void RecordWorkflowInputSize_ExposedViaGauge() var match = Assert.Single(gaugeValues); Assert.Equal(4096.0, (double)match.Value); AssertTag(match, "workflowType", "wf_type"); - AssertTag(match, "workflow_type", "wf_type"); AssertTag(match, "version", "v2"); } @@ -337,7 +316,7 @@ public void RecordWorkflowInputSize_ExposedViaGauge() // --------------------------------------------------------------- [Fact] - public void RecordActiveWorkers_ExposedViaObservableGaugeWithDualLabels() + public void RecordActiveWorkers_ExposedViaObservableGauge() { _sut.RecordActiveWorkers("task_x", 3); _sut.RecordActiveWorkers("task_y", 7); @@ -358,12 +337,10 @@ public void 
RecordActiveWorkers_ExposedViaObservableGaugeWithDualLabels() Assert.Equal(2, gaugeValues.Count); Assert.Contains(gaugeValues, g => (int)g.Value == 3 - && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_x") - && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_x")); + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_x")); Assert.Contains(gaugeValues, g => (int)g.Value == 7 - && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_y") - && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_y")); + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_y")); } [Fact] From 0163f5bf872e333b0fd5fab047a4b511f70f6a9a Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Fri, 24 Apr 2026 12:51:04 -0600 Subject: [PATCH 3/4] improve alignment of metrics docs with spec --- .../Client/Telemetry/MetricsCollector.cs | 2 +- Harness/README.md | 2 +- Tests/Telemetry/MetricsCollectorTests.cs | 2 +- METRICS.md => docs/metrics.md | 150 ++++++++++-------- docs/readme/workers.md | 17 +- 5 files changed, 89 insertions(+), 84 deletions(-) rename METRICS.md => docs/metrics.md (66%) diff --git a/Conductor/Client/Telemetry/MetricsCollector.cs b/Conductor/Client/Telemetry/MetricsCollector.cs index a46876f5..cd9e3ac9 100644 --- a/Conductor/Client/Telemetry/MetricsCollector.cs +++ b/Conductor/Client/Telemetry/MetricsCollector.cs @@ -405,7 +405,7 @@ public void RecordExternalPayloadUsed(string entityName, string operation, strin _externalPayloadUsedTotal.Add(1, new KeyValuePair("entityName", entityName), new KeyValuePair("operation", operation), - new KeyValuePair("payloadType", payloadType)); + new KeyValuePair("payload_type", payloadType)); } // --------------------------------------------------------------- diff --git a/Harness/README.md b/Harness/README.md index 8a7ae863..df48b5d1 100644 --- a/Harness/README.md +++ b/Harness/README.md @@ -58,7 +58,7 @@ All resource names use a `csharp_` prefix so multiple 
SDK harnesses (Python, Jav ### Metrics The harness exposes Prometheus metrics at `http://localhost:9991/metrics` via the OpenTelemetry -Prometheus exporter. See [METRICS.md](../METRICS.md) for the full list of metrics, labels, and +Prometheus exporter. See [metrics.md](../docs/metrics.md) for the full list of metrics, labels, and configuration options. ### Environment Variables diff --git a/Tests/Telemetry/MetricsCollectorTests.cs b/Tests/Telemetry/MetricsCollectorTests.cs index e5fc18eb..6aa222ff 100644 --- a/Tests/Telemetry/MetricsCollectorTests.cs +++ b/Tests/Telemetry/MetricsCollectorTests.cs @@ -160,7 +160,7 @@ public void RecordExternalPayloadUsed_EmitsCanonicalLabels() Assert.Equal("external_payload_used_total", m.Name); AssertTag(m, "entityName", "entity"); AssertTag(m, "operation", "READ"); - AssertTag(m, "payloadType", "TASK_INPUT"); + AssertTag(m, "payload_type", "TASK_INPUT"); } [Fact] diff --git a/METRICS.md b/docs/metrics.md similarity index 66% rename from METRICS.md rename to docs/metrics.md index 09fcea53..475ca135 100644 --- a/METRICS.md +++ b/docs/metrics.md @@ -17,8 +17,7 @@ compatible with any .NET metrics listener -- most notably the - [Histograms](#histograms) - [Gauges](#gauges) - [Labels](#labels) - - [Dual-Emit Strategy (Phase 1)](#dual-emit-strategy-phase-1) - - [Deprecated Labels](#deprecated-labels) +- [Nonstandard Metrics](#nonstandard-metrics) - [Histogram Bucket Boundaries](#histogram-bucket-boundaries) - [Example Metrics Output](#example-metrics-output) - [OTel Scope Label](#otel-scope-label) @@ -28,29 +27,29 @@ compatible with any .NET metrics listener -- most notably the All metrics are registered under the meter named `Conductor.Client`. 
-| Name | Type | Canonical Labels | Deprecated Labels | Description | -|---|---|---|---|---| -| `task_poll_total` | Counter | `taskType` | `task_type` | Total task poll attempts | -| `task_execution_started_total` | Counter | `taskType` | `task_type` | Tasks dispatched to the worker function | -| `task_poll_error_total` | Counter | `taskType`, `exception` | `task_type`, `error_type` | Total task poll errors | -| `task_execute_error_total` | Counter | `taskType`, `exception` | `task_type`, `error_type` | Total task execution errors | -| `task_update_error_total` | Counter | `taskType`, `exception` | `task_type`, `error_type` | Total task update errors (after all retries) | -| `task_ack_error_total` | Counter | `taskType`, `exception` | `task_type` | Task ack client-side errors (surface-only) | -| `task_ack_failed_total` | Counter | `taskType` | `task_type` | Task ack declined by server (surface-only) | -| `task_paused_total` | Counter | `taskType` | `task_type` | Polls skipped because the worker is paused | -| `task_execution_queue_full_total` | Counter | `taskType` | `task_type` | Polls returning zero capacity (all workers busy) | -| `thread_uncaught_exceptions_total` | Counter | `exception` | -- | Uncaught exceptions in worker threads | -| `workflow_start_error_total` | Counter | `workflowType`, `exception` | `workflow_type` | Errors starting workflows | -| `external_payload_used_total` | Counter | `entityName`, `operation`, `payload_type` | `entity_name` | External payload storage usage | -| `task_poll_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task poll round-trip duration (seconds) | -| `task_execute_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task execution duration (seconds) | -| `task_update_time_seconds` | Histogram | `taskType`, `status` | `task_type` | Task result update duration (seconds) | -| `http_api_client_request_seconds` | Histogram | `method`, `uri`, `status` | -- | HTTP API client request duration 
(seconds) | -| `task_result_size_bytes` | **Gauge** | `taskType` | `task_type` | Task result payload size (bytes) — last value | -| `task_result_size_bytes_histogram` | Histogram | `taskType` | `task_type` | **[DEPRECATED]** Task result payload size (bytes) | -| `workflow_input_size_bytes` | **Gauge** | `workflowType`, `version` | `workflow_type` | Workflow input payload size (bytes) — last value | -| `workflow_input_size_bytes_histogram` | Histogram | `workflowType`, `version` | `workflow_type` | **[DEPRECATED]** Workflow input payload size (bytes) | -| `active_workers` | Gauge | `taskType` | `task_type` | Workers currently executing tasks | +| Name | Type | Labels | Description | +|---|---|---|---| +| `task_poll_total` | Counter | `taskType` | Total task poll attempts | +| `task_execution_started_total` | Counter | `taskType` | Tasks dispatched to the worker function | +| `task_poll_error_total` | Counter | `taskType`, `exception` | Total task poll errors | +| `task_execute_error_total` | Counter | `taskType`, `exception` | Total task execution errors | +| `task_update_error_total` | Counter | `taskType`, `exception` | Total task update errors (after all retries) | +| `task_ack_error_total` | Counter | `taskType`, `exception` | Task ack client-side errors (surface-only) | +| `task_ack_failed_total` | Counter | `taskType` | Task ack declined by server (surface-only) | +| `task_paused_total` | Counter | `taskType` | Polls skipped because the worker is paused | +| `task_execution_queue_full_total` | Counter | `taskType` | Polls returning zero capacity (all workers busy) | +| `thread_uncaught_exceptions_total` | Counter | `exception` | Uncaught exceptions in worker threads | +| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Errors starting workflows | +| `external_payload_used_total` | Counter | `entityName`, `operation`, `payload_type` | External payload storage usage | +| `task_poll_time_seconds` | Histogram | `taskType`, `status` | Task 
poll round-trip duration (seconds) | +| `task_execute_time_seconds` | Histogram | `taskType`, `status` | Task execution duration (seconds) | +| `task_update_time_seconds` | Histogram | `taskType`, `status` | Task result update duration (seconds) | +| `http_api_client_request_seconds` | Histogram | `method`, `uri`, `status` | HTTP API client request duration (seconds) | +| `task_result_size_bytes` | Gauge | `taskType` | Task result payload size (bytes) — last value | +| `workflow_input_size_bytes` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) — last value | +| `active_workers` | Gauge | `taskType` | Workers currently executing tasks | +| `task_result_size_bytes_histogram` | Histogram | `taskType` | Task result payload size (bytes) — **nonstandard**, see [below](#nonstandard-metrics) | +| `workflow_input_size_bytes_histogram` | Histogram | `workflowType`, `version` | Workflow input payload size (bytes) — **nonstandard**, see [below](#nonstandard-metrics) | ## Configuration @@ -168,8 +167,6 @@ Distribution metrics with sum, count, and bucket breakdowns. All time values are | `task_execute_time_seconds` | `taskType`, `status` | seconds | Wall-clock time inside `worker.Execute()`. `status` is `SUCCESS` or `FAILURE`. | | `task_update_time_seconds` | `taskType`, `status` | seconds | Wall-clock time for the update call (including retries). `status` is `SUCCESS` or `FAILURE`. | | `http_api_client_request_seconds` | `method`, `uri`, `status` | seconds | Every HTTP request made by the API client. `method` is the HTTP verb, `uri` is the request path, `status` is the HTTP status code (or `"0"` on transport failure). | -| `task_result_size_bytes_histogram` | `taskType` | bytes | **[DEPRECATED]** JSON-serialized size of `TaskResult.OutputData`. Use the `task_result_size_bytes` Gauge instead. | -| `workflow_input_size_bytes_histogram` | `workflowType`, `version` | bytes | **[DEPRECATED]** Workflow input payload size. 
Use the `workflow_input_size_bytes` Gauge instead. | ### Gauges @@ -183,30 +180,53 @@ Point-in-time values sampled by the metrics listener. ## Labels -### Dual-Emit Strategy (Phase 1) - -As part of the cross-SDK metrics harmonization, every metric now emits **both** the canonical -camelCase label and the legacy snake_case label with the same value. This ensures existing -dashboards and alerts continue to work during migration. +The C# SDK uses the canonical label names defined by the cross-SDK metrics harmonization spec. -| Canonical (new) | Legacy (deprecated) | Used By | +| Label | Used by | Values | |---|---|---| -| `taskType` | `task_type` | Most task metrics | -| `exception` | `error_type` | Error counters | -| `workflowType` | `workflow_type` | Workflow metrics | -| `entityName` | `entity_name` | `external_payload_used_total` | +| `taskType` | Most task metrics | The task definition name (e.g. `my_worker`). | +| `workflowType` | Workflow metrics | The workflow definition name. | +| `entityName` | `external_payload_used_total` | The entity identifier. | +| `exception` | Error counters, `thread_uncaught_exceptions_total` | The .NET exception class name (e.g. `HttpRequestException`). | +| `status` | Time histograms | `SUCCESS` or `FAILURE`. | +| `method` | `http_api_client_request_seconds` | HTTP verb (`GET`, `POST`, etc.). | +| `uri` | `http_api_client_request_seconds` | Request path (interpolated, not templated). | +| `operation` | `external_payload_used_total` | `READ` or `WRITE`. | +| `payload_type` | `external_payload_used_total` | `TASK_INPUT`, `TASK_OUTPUT`, `WORKFLOW_INPUT`, or `WORKFLOW_OUTPUT`. | +| `version` | `workflow_input_size_bytes` | Workflow version string. | + +**Label naming convention.** Labels referencing Conductor domain entities (`taskType`, +`workflowType`, `entityName`) use camelCase to match the Conductor API's JSON field names. 
+Generic observability labels (`status`, `method`, `uri`, `exception`, `operation`, +`payload_type`, `version`) follow the standard Prometheus snake_case / lowercase convention. + +> **Note:** Unlike some other Conductor SDKs that are undergoing a migration from legacy +> label names, the C# SDK's metrics surface is entirely new and emits only canonical labels. +> There are no legacy/deprecated label aliases. + +## Nonstandard Metrics + +The C# SDK emits two additional **histogram** instruments for payload sizes that are not part +of the cross-SDK canonical catalog: -Labels that are new (no legacy equivalent): `status` on time histograms, `method`/`uri`/`status` -on `http_api_client_request_seconds`, `version` on `workflow_input_size_bytes`. +| Name | Labels | Unit | Description | +|---|---|---|---| +| `task_result_size_bytes_histogram` | `taskType` | bytes | Distribution of task result payload sizes. | +| `workflow_input_size_bytes_histogram` | `workflowType`, `version` | bytes | Distribution of workflow input payload sizes. | -### Deprecated Labels +The canonical catalog defines size metrics as **last-value Gauges** (`task_result_size_bytes`, +`workflow_input_size_bytes`), which is what the SDK emits as its primary size instruments. +These histogram variants provide distribution information (bucket breakdowns, count, sum) that +the gauge shape cannot offer -- for example, computing p99 result sizes via +`histogram_quantile()`. -The following labels are deprecated and will be removed in a future major release: +These histograms are **proposed** for inclusion in a future revision of the canonical catalog +(see the harmonization spec's §5.1 "Future catalog considerations"). They use the proposed +size bucket set `(100, 1000, 10000, 100000, 1000000, 10000000)` bytes. Until the catalog +formally adopts them, they should be considered nonstandard and may change. 
-- `task_type` — use `taskType` -- `error_type` — use `exception` -- `workflow_type` — use `workflowType` -- `entity_name` — use `entityName` +Both are emitted automatically whenever the corresponding gauge is recorded (i.e. calling +`RecordTaskResultSize` writes to both the gauge and the histogram). ## Histogram Bucket Boundaries @@ -218,7 +238,7 @@ The SDK defines canonical bucket boundaries that match all other Conductor SDKs: 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 ``` -**Size histograms** (deprecated `*_histogram`): +**Size histograms** (nonstandard `*_histogram`, proposed): ``` 100, 1000, 10000, 100000, 1000000, 10000000 @@ -235,24 +255,24 @@ When scraped by Prometheus (via the built-in server or OpenTelemetry exporter): ```prometheus # HELP task_poll_total Total task poll attempts # TYPE task_poll_total counter -task_poll_total{taskType="my_worker",task_type="my_worker"} 142 +task_poll_total{taskType="my_worker"} 142 # HELP task_execution_started_total Tasks dispatched to the worker function # TYPE task_execution_started_total counter -task_execution_started_total{taskType="my_worker",task_type="my_worker"} 140 +task_execution_started_total{taskType="my_worker"} 140 # HELP task_poll_time_seconds Task poll round-trip duration (seconds) # TYPE task_poll_time_seconds histogram -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.001"} 2 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.005"} 12 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.01"} 45 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.025"} 98 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.05"} 120 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.1"} 135 
-task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="0.25"} 140 -task_poll_time_seconds_bucket{taskType="my_worker",task_type="my_worker",status="SUCCESS",le="+Inf"} 140 -task_poll_time_seconds_sum{taskType="my_worker",task_type="my_worker",status="SUCCESS"} 3.842 -task_poll_time_seconds_count{taskType="my_worker",task_type="my_worker",status="SUCCESS"} 140 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.001"} 2 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.005"} 12 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.01"} 45 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.025"} 98 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.05"} 120 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.1"} 135 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.25"} 140 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="+Inf"} 140 +task_poll_time_seconds_sum{taskType="my_worker",status="SUCCESS"} 3.842 +task_poll_time_seconds_count{taskType="my_worker",status="SUCCESS"} 140 # HELP http_api_client_request_seconds HTTP API client request duration (seconds) # TYPE http_api_client_request_seconds histogram @@ -261,15 +281,15 @@ http_api_client_request_seconds_bucket{method="GET",uri="/api/tasks/poll/batch/m # HELP task_result_size_bytes Task result payload size (bytes) # TYPE task_result_size_bytes gauge -task_result_size_bytes{taskType="my_worker",task_type="my_worker"} 2048 +task_result_size_bytes{taskType="my_worker"} 2048 # HELP active_workers Workers currently executing tasks # TYPE active_workers gauge -active_workers{taskType="my_worker",task_type="my_worker"} 5 +active_workers{taskType="my_worker"} 5 # HELP task_execution_queue_full_total Polls returning zero capacity # TYPE task_execution_queue_full_total counter 
-task_execution_queue_full_total{taskType="my_worker",task_type="my_worker"} 3 +task_execution_queue_full_total{taskType="my_worker"} 3 ``` ## OTel Scope Label @@ -277,7 +297,7 @@ task_execution_queue_full_total{taskType="my_worker",task_type="my_worker"} 3 The C# metrics output includes an `otel_scope_name="Conductor.Client"` label on every metric. This is injected by the OpenTelemetry .NET exporter (it identifies the `System.Diagnostics.Metrics.Meter` that owns each instrument). Other SDKs using native Prometheus client libraries do not have this. -This is harmless — it is a standard OTel convention and does not interfere with canonical label queries. +This is harmless — it is a standard OTel convention and does not interfere with label queries. ## Best Practices @@ -311,10 +331,6 @@ This is harmless — it is a standard OTel convention and does not interfere wit histogram_quantile(0.99, rate(task_execute_time_seconds_bucket[5m])) ``` -7. **Migrate dashboards to canonical labels.** Both `taskType` and `task_type` resolve to the - same value during Phase 1. Update your queries to use `taskType` before the deprecated labels - are removed in a future major release. - -8. **Use `WorkflowExecutor` for starting workflows.** It automatically records +7. **Use `WorkflowExecutor` for starting workflows.** It automatically records `workflow_input_size_bytes` and `workflow_start_error_total` when a `MetricsCollector` is injected. 
diff --git a/docs/readme/workers.md b/docs/readme/workers.md index ca3a3ca0..719e5f57 100644 --- a/docs/readme/workers.md +++ b/docs/readme/workers.md @@ -50,19 +50,8 @@ Thread.Sleep(TimeSpan.FromSeconds(100)); Check out our [integration tests](https://github.com/conductor-sdk/conductor-csharp/blob/92c7580156a89322717c94aeaea9e5201fe577eb/Tests/Worker/WorkerTests.cs#L37) for more examples -Worker SDK collects the following metrics: - - -| Name | Purpose | Tags | -| ------------------ | :------------------------------------------- | -------------------------------- | -| task_poll_error | Client error when polling for a task queue | taskType, includeRetries, status | -| task_execute_error | Execution error | taskType | -| task_update_error | Task status cannot be updated back to server | taskType | -| task_poll_counter | Incremented each time polling is done | taskType | -| task_poll_time | Time to poll for a batch of tasks | taskType | -| task_execute_time | Time to execute a task | taskType | -| task_result_size | Records output payload size of a task | taskType | - -Metrics on client side supplements the one collected from server in identifying the network as well as client side issues. +The worker SDK collects metrics for poll latency, execution time, update time, error rates, queue +utilization, and more. See [metrics.md](../metrics.md) for the full list of metrics, labels, +configuration options, and example Prometheus output. 
### Next: [Create and Execute Workflows](https://github.com/conductor-sdk/conductor-csharp/blob/main/docs/readme/workflow.md) From a105770f3b9a7f22312e36294593d591723ff3ee Mon Sep 17 00:00:00 2001 From: Chris Hagglund Date: Tue, 28 Apr 2026 10:46:10 -0600 Subject: [PATCH 4/4] use payloadType instead of payload_type --- Conductor/Client/Telemetry/MetricsCollector.cs | 2 +- Tests/Telemetry/MetricsCollectorTests.cs | 2 +- docs/metrics.md | 9 +++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/Conductor/Client/Telemetry/MetricsCollector.cs b/Conductor/Client/Telemetry/MetricsCollector.cs index cd9e3ac9..a46876f5 100644 --- a/Conductor/Client/Telemetry/MetricsCollector.cs +++ b/Conductor/Client/Telemetry/MetricsCollector.cs @@ -405,7 +405,7 @@ public void RecordExternalPayloadUsed(string entityName, string operation, strin _externalPayloadUsedTotal.Add(1, new KeyValuePair("entityName", entityName), new KeyValuePair("operation", operation), - new KeyValuePair("payload_type", payloadType)); + new KeyValuePair("payloadType", payloadType)); } // --------------------------------------------------------------- diff --git a/Tests/Telemetry/MetricsCollectorTests.cs b/Tests/Telemetry/MetricsCollectorTests.cs index 6aa222ff..e5fc18eb 100644 --- a/Tests/Telemetry/MetricsCollectorTests.cs +++ b/Tests/Telemetry/MetricsCollectorTests.cs @@ -160,7 +160,7 @@ public void RecordExternalPayloadUsed_EmitsCanonicalLabels() Assert.Equal("external_payload_used_total", m.Name); AssertTag(m, "entityName", "entity"); AssertTag(m, "operation", "READ"); - AssertTag(m, "payload_type", "TASK_INPUT"); + AssertTag(m, "payloadType", "TASK_INPUT"); } [Fact] diff --git a/docs/metrics.md b/docs/metrics.md index 475ca135..d01dd0fd 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -40,7 +40,7 @@ All metrics are registered under the meter named `Conductor.Client`. 
| `task_execution_queue_full_total` | Counter | `taskType` | Polls returning zero capacity (all workers busy) | | `thread_uncaught_exceptions_total` | Counter | `exception` | Uncaught exceptions in worker threads | | `workflow_start_error_total` | Counter | `workflowType`, `exception` | Errors starting workflows | -| `external_payload_used_total` | Counter | `entityName`, `operation`, `payload_type` | External payload storage usage | +| `external_payload_used_total` | Counter | `entityName`, `operation`, `payloadType` | External payload storage usage | | `task_poll_time_seconds` | Histogram | `taskType`, `status` | Task poll round-trip duration (seconds) | | `task_execute_time_seconds` | Histogram | `taskType`, `status` | Task execution duration (seconds) | | `task_update_time_seconds` | Histogram | `taskType`, `status` | Task result update duration (seconds) | @@ -155,7 +155,7 @@ Monotonically increasing values. Prometheus exposes them with a `_total` suffix. | `task_execution_queue_full_total` | `taskType` | Incremented when a poll is skipped because all workers are busy (batch size reached). | | `thread_uncaught_exceptions_total` | `exception` | Incremented on any exception in the top-level poll loop that is not an `OperationCanceledException`. | | `workflow_start_error_total` | `workflowType`, `exception` | Incremented when a `WorkflowExecutor.StartWorkflow()` call fails. | -| `external_payload_used_total` | `entityName`, `operation`, `payload_type` | Incremented when external payload storage is used. | +| `external_payload_used_total` | `entityName`, `operation`, `payloadType` | Incremented when external payload storage is used. | ### Histograms @@ -192,13 +192,14 @@ The C# SDK uses the canonical label names defined by the cross-SDK metrics harmo | `method` | `http_api_client_request_seconds` | HTTP verb (`GET`, `POST`, etc.). | | `uri` | `http_api_client_request_seconds` | Request path (interpolated, not templated). 
| | `operation` | `external_payload_used_total` | `READ` or `WRITE`. | -| `payload_type` | `external_payload_used_total` | `TASK_INPUT`, `TASK_OUTPUT`, `WORKFLOW_INPUT`, or `WORKFLOW_OUTPUT`. | +| `payloadType` | `external_payload_used_total` | `TASK_INPUT`, `TASK_OUTPUT`, `WORKFLOW_INPUT`, or `WORKFLOW_OUTPUT`. | | `version` | `workflow_input_size_bytes` | Workflow version string. | **Label naming convention.** Labels referencing Conductor domain entities (`taskType`, `workflowType`, `entityName`) use camelCase to match the Conductor API's JSON field names. Generic observability labels (`status`, `method`, `uri`, `exception`, `operation`, -`payload_type`, `version`) follow the standard Prometheus snake_case / lowercase convention. +`version`) follow the standard Prometheus snake_case / lowercase convention. +`payloadType` uses camelCase to match other Conductor domain-entity labels. > **Note:** Unlike some other Conductor SDKs that are undergoing a migration from legacy > label names, the C# SDK's metrics surface is entirely new and emits only canonical labels.