diff --git a/Conductor/Client/ApiClient.cs b/Conductor/Client/ApiClient.cs index 72210312..ce875402 100644 --- a/Conductor/Client/ApiClient.cs +++ b/Conductor/Client/ApiClient.cs @@ -10,12 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +using Conductor.Client.Telemetry; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using RestSharp; using System; using System.Collections; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Text; @@ -29,6 +31,12 @@ namespace Conductor.Client /// public partial class ApiClient { + /// + /// Optional metrics collector for recording http_api_client_request_seconds. + /// Set once via DI or startup; safe to leave null. + /// + public static MetricsCollector Metrics { get; set; } + public JsonSerializerSettings serializerSettings = new JsonSerializerSettings { ConstructorHandling = ConstructorHandling.AllowNonPublicDefaultConstructor @@ -179,11 +187,30 @@ public Object CallApi( Dictionary fileParams, Dictionary pathParams, String contentType, Configuration configuration) { - int retryCount = 0; - RestResponse response = RetryRestClientCallApi(path, method, queryParams, postBody, headerParams, - formParams, fileParams, pathParams, contentType, configuration, ref retryCount); - - return (Object)response; + var sw = Stopwatch.StartNew(); + string statusCode = "0"; + try + { + int retryCount = 0; + RestResponse response = RetryRestClientCallApi(path, method, queryParams, postBody, headerParams, + formParams, fileParams, pathParams, contentType, configuration, ref retryCount); + statusCode = ((int)response.StatusCode).ToString(); + return (Object)response; + } + catch + { + statusCode = "0"; + throw; + } + finally + { + sw.Stop(); + var resolvedUri = path; + foreach (var param in pathParams) + resolvedUri = 
resolvedUri.Replace("{" + param.Key + "}", param.Value); + var basePath = RestClient.Options.BaseUrl?.AbsolutePath?.TrimEnd('/') ?? ""; + Metrics?.RecordHttpApiClientRequest(method.ToString().ToUpperInvariant(), basePath + resolvedUri, statusCode, sw.Elapsed.TotalSeconds); + } } private RestResponse RetryRestClientCallApi(String path, Method method, List> queryParams, Object postBody, @@ -226,15 +253,35 @@ public async Task CallApiAsync( Dictionary fileParams, Dictionary pathParams, String contentType) { - var request = PrepareRequest( - path, method, queryParams, postBody, headerParams, formParams, fileParams, - pathParams, contentType); + var sw = Stopwatch.StartNew(); + string statusCode = "0"; + try + { + var request = PrepareRequest( + path, method, queryParams, postBody, headerParams, formParams, fileParams, + pathParams, contentType); - InterceptRequest(request); - var response = await RestClient.ExecuteAsync(request, method); - InterceptResponse(request, response); - FormatHeaders(response); - return (object)response; + InterceptRequest(request); + var response = await RestClient.ExecuteAsync(request, method); + InterceptResponse(request, response); + FormatHeaders(response); + statusCode = ((int)response.StatusCode).ToString(); + return (object)response; + } + catch + { + statusCode = "0"; + throw; + } + finally + { + sw.Stop(); + var resolvedUri = path; + foreach (var param in pathParams) + resolvedUri = resolvedUri.Replace("{" + param.Key + "}", param.Value); + var basePath = RestClient.Options.BaseUrl?.AbsolutePath?.TrimEnd('/') ?? 
""; + Metrics?.RecordHttpApiClientRequest(method.ToString().ToUpperInvariant(), basePath + resolvedUri, statusCode, sw.Elapsed.TotalSeconds); + } } /// diff --git a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs index 7dd719d3..f883ce8a 100644 --- a/Conductor/Client/Extensions/DependencyInjectionExtensions.cs +++ b/Conductor/Client/Extensions/DependencyInjectionExtensions.cs @@ -39,7 +39,12 @@ public static IServiceCollection AddConductorWorker(this IServiceCollection serv } services.AddSingleton(configuration); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(sp => + { + var collector = new MetricsCollector(); + ApiClient.Metrics = collector; + return collector; + }); services.AddTransient(); return services; } diff --git a/Conductor/Client/Telemetry/MetricsCollector.cs b/Conductor/Client/Telemetry/MetricsCollector.cs index a7ed40fb..a46876f5 100644 --- a/Conductor/Client/Telemetry/MetricsCollector.cs +++ b/Conductor/Client/Telemetry/MetricsCollector.cs @@ -10,9 +10,12 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.Metrics; +using OpenTelemetry; +using OpenTelemetry.Metrics; namespace Conductor.Client.Telemetry { @@ -20,18 +23,34 @@ namespace Conductor.Client.Telemetry /// Instruments the Conductor worker poll-execute-update loop with /// counters, histograms, and gauges. /// - /// The is named Conductor.Client. To expose these - /// metrics, attach a listener such as the OpenTelemetry Prometheus exporter - /// (see METRICS.md for examples). + /// The is named Conductor.Client. 
Call + /// to expose a Prometheus-compatible + /// /metrics endpoint with the canonical bucket configuration, + /// or attach your own using + /// . /// - public sealed class MetricsCollector + public sealed class MetricsCollector : IDisposable { public const string MeterName = "Conductor.Client"; + /// + /// Canonical time histogram buckets shared across all Conductor SDKs. + /// + public static readonly double[] CanonicalTimeBuckets = + { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; + + /// + /// Proposed size histogram buckets (in bytes); if accepted, will be shared across all Conductor SDKs. + /// + public static readonly double[] CanonicalSizeBuckets = + { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + private readonly Meter _meter; + private MeterProvider _meterProvider; // --- counters --- private readonly Counter _taskPollTotal; + private readonly Counter _taskExecutionStartedTotal; private readonly Counter _taskPollErrorTotal; private readonly Counter _taskExecuteErrorTotal; private readonly Counter _taskUpdateErrorTotal; @@ -40,25 +59,36 @@ public sealed class MetricsCollector private readonly Counter _threadUncaughtExceptionsTotal; private readonly Counter _workflowStartErrorTotal; private readonly Counter _externalPayloadUsedTotal; + private readonly Counter _taskAckErrorTotal; + private readonly Counter _taskAckFailedTotal; // --- histograms --- private readonly Histogram _taskPollTimeSeconds; private readonly Histogram _taskExecuteTimeSeconds; private readonly Histogram _taskUpdateTimeSeconds; - private readonly Histogram _taskResultSizeBytes; - private readonly Histogram _workflowInputSizeBytes; + private readonly Histogram _httpApiClientRequestSeconds; + private readonly Histogram _taskResultSizeBytesHistogram; + private readonly Histogram _workflowInputSizeBytesHistogram; // --- gauges --- private readonly ConcurrentDictionary _activeWorkerCounts = new ConcurrentDictionary(); + private readonly ConcurrentDictionary 
_taskResultSizes = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _workflowInputSizes = new ConcurrentDictionary(); public MetricsCollector() { _meter = new Meter(MeterName); + // --- counters --- + _taskPollTotal = _meter.CreateCounter( "task_poll_total", description: "Total number of task poll attempts"); + _taskExecutionStartedTotal = _meter.CreateCounter( + "task_execution_started_total", + description: "Tasks dispatched to the worker function"); + _taskPollErrorTotal = _meter.CreateCounter( "task_poll_error_total", description: "Total number of task poll errors"); @@ -91,6 +121,16 @@ public MetricsCollector() "external_payload_used_total", description: "External payload storage usage"); + _taskAckErrorTotal = _meter.CreateCounter( + "task_ack_error_total", + description: "Task ack client-side errors"); + + _taskAckFailedTotal = _meter.CreateCounter( + "task_ack_failed_total", + description: "Task ack declined by server"); + + // --- time histograms --- + _taskPollTimeSeconds = _meter.CreateHistogram( "task_poll_time_seconds", unit: "s", @@ -106,16 +146,63 @@ public MetricsCollector() unit: "s", description: "Task result update duration in seconds"); - _taskResultSizeBytes = _meter.CreateHistogram( + _httpApiClientRequestSeconds = _meter.CreateHistogram( + "http_api_client_request_seconds", + unit: "s", + description: "HTTP API client request duration in seconds"); + + // --- size histograms (non-canonical names while spec uses gauges) --- + + _taskResultSizeBytesHistogram = _meter.CreateHistogram( + "task_result_size_bytes_histogram", + unit: "By", + description: "Distribution of task result payload sizes in bytes"); + + _workflowInputSizeBytesHistogram = _meter.CreateHistogram( + "workflow_input_size_bytes_histogram", + unit: "By", + description: "Distribution of workflow input payload sizes in bytes"); + + // --- canonical size gauges --- + + _meter.CreateObservableGauge( "task_result_size_bytes", + observeValues: () => + { + var 
measurements = new List>(); + foreach (var kvp in _taskResultSizes) + { + measurements.Add(new Measurement( + kvp.Value, + new KeyValuePair("taskType", kvp.Key))); + } + return measurements; + }, unit: "By", description: "Size of task result payload in bytes"); - _workflowInputSizeBytes = _meter.CreateHistogram( + _meter.CreateObservableGauge( "workflow_input_size_bytes", + observeValues: () => + { + var measurements = new List>(); + foreach (var kvp in _workflowInputSizes) + { + var parts = kvp.Key.Split('\0'); + var wfType = parts[0]; + var version = parts.Length > 1 ? parts[1] : ""; + measurements.Add(new Measurement( + kvp.Value, + new KeyValuePair("workflowType", wfType), + new KeyValuePair("version", version))); + } + return measurements; + }, unit: "By", description: "Size of workflow input payload in bytes"); + // --- utilization gauge --- + _meter.CreateObservableGauge( "active_workers", observeValues: () => @@ -125,66 +212,111 @@ public MetricsCollector() { measurements.Add(new Measurement( kvp.Value, - new KeyValuePair("task_type", kvp.Key))); + new KeyValuePair("taskType", kvp.Key))); } return measurements; }, description: "Number of workers currently executing tasks"); } + // --------------------------------------------------------------- + // Built-in Prometheus server + // --------------------------------------------------------------- + + /// + /// Starts an OpenTelemetry Prometheus HTTP listener on the given port + /// with the canonical bucket configuration for all histograms. 
+ /// + public void StartServer(int port) + { + if (_meterProvider != null) + throw new InvalidOperationException("Metrics server already started"); + + _meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(MeterName) + .AddView("task_poll_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_execute_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_update_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("http_api_client_request_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalTimeBuckets }) + .AddView("task_result_size_bytes_histogram", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalSizeBuckets }) + .AddView("workflow_input_size_bytes_histogram", new ExplicitBucketHistogramConfiguration { Boundaries = CanonicalSizeBuckets }) + .AddPrometheusHttpListener(options => + { + options.UriPrefixes = new[] { $"http://*:{port}/" }; + }) + .Build(); + } + + public void Dispose() + { + _meterProvider?.Dispose(); + _meter?.Dispose(); + } + // --------------------------------------------------------------- // Poll // --------------------------------------------------------------- public void RecordTaskPoll(string taskType) { - _taskPollTotal.Add(1, new KeyValuePair("task_type", taskType)); + _taskPollTotal.Add(1, + new KeyValuePair("taskType", taskType)); } - public void RecordTaskPollTime(string taskType, double durationSeconds) + public void RecordTaskPollTime(string taskType, double durationSeconds, string status) { _taskPollTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskPollError(string taskType, string errorType) + public void RecordTaskPollError(string taskType, string exceptionType) { 
_taskPollErrorTotal.Add(1, - new KeyValuePair("task_type", taskType), - new KeyValuePair("error_type", errorType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // --------------------------------------------------------------- // Execution // --------------------------------------------------------------- - public void RecordTaskExecuteTime(string taskType, double durationSeconds) + public void RecordTaskExecutionStarted(string taskType) + { + _taskExecutionStartedTotal.Add(1, + new KeyValuePair("taskType", taskType)); + } + + public void RecordTaskExecuteTime(string taskType, double durationSeconds, string status) { _taskExecuteTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskExecuteError(string taskType, string errorType) + public void RecordTaskExecuteError(string taskType, string exceptionType) { _taskExecuteErrorTotal.Add(1, - new KeyValuePair("task_type", taskType), - new KeyValuePair("error_type", errorType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // --------------------------------------------------------------- // Update // --------------------------------------------------------------- - public void RecordTaskUpdateTime(string taskType, double durationSeconds) + public void RecordTaskUpdateTime(string taskType, double durationSeconds, string status) { _taskUpdateTimeSeconds.Record(durationSeconds, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("status", status)); } - public void RecordTaskUpdateError(string taskType) + public void RecordTaskUpdateError(string taskType, string exceptionType) { _taskUpdateErrorTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); } // 
--------------------------------------------------------------- @@ -193,15 +325,29 @@ public void RecordTaskUpdateError(string taskType) public void RecordTaskResultSize(string taskType, double sizeBytes) { - _taskResultSizeBytes.Record(sizeBytes, - new KeyValuePair("task_type", taskType)); + _taskResultSizes[taskType] = sizeBytes; + _taskResultSizeBytesHistogram.Record(sizeBytes, + new KeyValuePair("taskType", taskType)); } public void RecordWorkflowInputSize(string workflowType, string version, double sizeBytes) { - _workflowInputSizeBytes.Record(sizeBytes, - new KeyValuePair("workflow_type", workflowType), - new KeyValuePair("version", version)); + _workflowInputSizes[workflowType + "\0" + (version ?? "")] = sizeBytes; + _workflowInputSizeBytesHistogram.Record(sizeBytes, + new KeyValuePair("workflowType", workflowType), + new KeyValuePair("version", version ?? "")); + } + + // --------------------------------------------------------------- + // HTTP API client + // --------------------------------------------------------------- + + public void RecordHttpApiClientRequest(string method, string uri, string status, double durationSeconds) + { + _httpApiClientRequestSeconds.Record(durationSeconds, + new KeyValuePair("method", method), + new KeyValuePair("uri", uri), + new KeyValuePair("status", status)); } // --------------------------------------------------------------- @@ -211,13 +357,13 @@ public void RecordWorkflowInputSize(string workflowType, string version, double public void RecordTaskExecutionQueueFull(string taskType) { _taskExecutionQueueFullTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType)); } public void RecordTaskPaused(string taskType) { _taskPausedTotal.Add(1, - new KeyValuePair("task_type", taskType)); + new KeyValuePair("taskType", taskType)); } // --------------------------------------------------------------- @@ -230,26 +376,53 @@ public void RecordActiveWorkers(string taskType, int count) } // 
--------------------------------------------------------------- - // Uncategorised + // Uncaught exceptions // --------------------------------------------------------------- - public void RecordUncaughtException() + public void RecordUncaughtException(string exceptionType) { - _threadUncaughtExceptionsTotal.Add(1); + _threadUncaughtExceptionsTotal.Add(1, + new KeyValuePair("exception", exceptionType)); } - public void RecordWorkflowStartError(string workflowType) + // --------------------------------------------------------------- + // Workflow + // --------------------------------------------------------------- + + public void RecordWorkflowStartError(string workflowType, string exceptionType) { _workflowStartErrorTotal.Add(1, - new KeyValuePair("workflow_type", workflowType)); + new KeyValuePair("workflowType", workflowType), + new KeyValuePair("exception", exceptionType)); } + // --------------------------------------------------------------- + // External payload + // --------------------------------------------------------------- + public void RecordExternalPayloadUsed(string entityName, string operation, string payloadType) { _externalPayloadUsedTotal.Add(1, - new KeyValuePair("entity_name", entityName), + new KeyValuePair("entityName", entityName), new KeyValuePair("operation", operation), - new KeyValuePair("payload_type", payloadType)); + new KeyValuePair("payloadType", payloadType)); + } + + // --------------------------------------------------------------- + // Surface-only ack counters (internal runner never increments) + // --------------------------------------------------------------- + + public void RecordTaskAckError(string taskType, string exceptionType) + { + _taskAckErrorTotal.Add(1, + new KeyValuePair("taskType", taskType), + new KeyValuePair("exception", exceptionType)); + } + + public void RecordTaskAckFailed(string taskType) + { + _taskAckFailedTotal.Add(1, + new KeyValuePair("taskType", taskType)); } } } diff --git 
a/Conductor/Client/Worker/WorkflowTaskExecutor.cs b/Conductor/Client/Worker/WorkflowTaskExecutor.cs index 180a4c35..9a7c89e1 100644 --- a/Conductor/Client/Worker/WorkflowTaskExecutor.cs +++ b/Conductor/Client/Worker/WorkflowTaskExecutor.cs @@ -108,7 +108,7 @@ private void Work4Ever(CancellationToken token) } catch (Exception e) { - _metrics?.RecordUncaughtException(); + _metrics?.RecordUncaughtException(e.GetType().Name); _logger.LogError( $"[{_workerSettings.WorkerId}] worker error: {e.Message}" + $", taskName: {_worker.TaskType}" @@ -171,7 +171,7 @@ private async void WorkOnce(CancellationToken token) var tasks = _taskClient.PollTask(_worker.TaskType, _workerSettings.WorkerId, _workerSettings.Domain, availableWorkerCounter); pollStopwatch.Stop(); - _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds, "SUCCESS"); if (tasks == null) { @@ -189,7 +189,7 @@ private async void WorkOnce(CancellationToken token) catch (Exception e) { pollStopwatch.Stop(); - _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskPollTime(_worker.TaskType, pollStopwatch.Elapsed.TotalSeconds, "FAILURE"); _metrics?.RecordTaskPollError(_worker.TaskType, e.GetType().Name); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Polling error: {e.Message} " @@ -235,6 +235,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) + $", CancelToken: {token}" ); + _metrics?.RecordTaskExecutionStarted(_worker.TaskType); var executeStopwatch = Stopwatch.StartNew(); try { @@ -244,7 +245,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) taskResult = await _worker.Execute(task, token); executeStopwatch.Stop(); - _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds, 
"SUCCESS"); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done processing task for worker" @@ -259,7 +260,7 @@ private async void ProcessTask(Models.Task task, CancellationToken token) catch (Exception e) { executeStopwatch.Stop(); - _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskExecuteTime(_worker.TaskType, executeStopwatch.Elapsed.TotalSeconds, "FAILURE"); _metrics?.RecordTaskExecuteError(_worker.TaskType, e.GetType().Name); _logger.LogError( $"[{_workerSettings.WorkerId}] Failed to process task for worker, reason: {e.Message}" @@ -297,7 +298,7 @@ private void UpdateTask(Models.TaskResult taskResult) _taskClient.UpdateTask(taskResult); updateStopwatch.Stop(); - _metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds); + _metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds, "SUCCESS"); _logger.LogTrace( $"[{_workerSettings.WorkerId}] Done updating task" + $", taskType: {_worker.TaskType}" @@ -320,7 +321,8 @@ private void UpdateTask(Models.TaskResult taskResult) } updateStopwatch.Stop(); - _metrics?.RecordTaskUpdateError(_worker.TaskType); + _metrics?.RecordTaskUpdateTime(_worker.TaskType, updateStopwatch.Elapsed.TotalSeconds, "FAILURE"); + _metrics?.RecordTaskUpdateError(_worker.TaskType, "RetryExhaustedException"); throw new Exception("Failed to update task after retries"); } diff --git a/Conductor/Executor/WorkflowExecutor.cs b/Conductor/Executor/WorkflowExecutor.cs index 0ab17ccb..1e6c8432 100644 --- a/Conductor/Executor/WorkflowExecutor.cs +++ b/Conductor/Executor/WorkflowExecutor.cs @@ -13,26 +13,32 @@ using Conductor.Api; using Conductor.Client; using Conductor.Client.Models; +using Conductor.Client.Telemetry; using Conductor.Definition; +using Newtonsoft.Json; +using System; using System.Collections.Generic; namespace Conductor.Executor { public class WorkflowExecutor { - private WorkflowResourceApi _workflowClient; - private 
MetadataResourceApi _metadataClient; + private readonly WorkflowResourceApi _workflowClient; + private readonly MetadataResourceApi _metadataClient; + private readonly MetricsCollector _metrics; - public WorkflowExecutor(Configuration configuration) + public WorkflowExecutor(Configuration configuration, MetricsCollector metrics = null) { _workflowClient = configuration.GetClient(); _metadataClient = configuration.GetClient(); + _metrics = metrics; } - public WorkflowExecutor(WorkflowResourceApi workflowClient, MetadataResourceApi metadataClient) + public WorkflowExecutor(WorkflowResourceApi workflowClient, MetadataResourceApi metadataClient, MetricsCollector metrics = null) { _workflowClient = workflowClient; _metadataClient = metadataClient; + _metrics = metrics; } public void RegisterWorkflow(WorkflowDef workflow, bool overwrite) @@ -54,7 +60,41 @@ public string StartWorkflow(ConductorWorkflow conductorWorkflow) public string StartWorkflow(StartWorkflowRequest startWorkflowRequest) { - return _workflowClient.StartWorkflow(startWorkflowRequest); + RecordInputSize(startWorkflowRequest); + try + { + return _workflowClient.StartWorkflow(startWorkflowRequest); + } + catch (Exception ex) + { + _metrics?.RecordWorkflowStartError( + startWorkflowRequest?.Name ?? "", + ex.GetType().Name); + throw; + } + } + + private void RecordInputSize(StartWorkflowRequest request) + { + if (_metrics == null) + return; + try + { + double size = 0; + if (request.Input != null) + { + var json = JsonConvert.SerializeObject(request.Input); + size = System.Text.Encoding.UTF8.GetByteCount(json); // metric unit is bytes: count UTF-8 bytes, not UTF-16 chars + } + _metrics.RecordWorkflowInputSize( + request.Name ?? "", + request.Version?.ToString() ?? "", + size); + } + catch + { + // Don't let metrics serialization failures disrupt workflow start. 
+ } } } } diff --git a/Conductor/conductor-csharp.csproj b/Conductor/conductor-csharp.csproj index 0fb0055c..2963fe99 100644 --- a/Conductor/conductor-csharp.csproj +++ b/Conductor/conductor-csharp.csproj @@ -11,14 +11,16 @@ - + - + + + \ No newline at end of file diff --git a/Harness/Harness.csproj b/Harness/Harness.csproj index ea444b8a..71dd8154 100644 --- a/Harness/Harness.csproj +++ b/Harness/Harness.csproj @@ -3,10 +3,6 @@ Exe net8.0 - - - - diff --git a/Harness/Program.cs b/Harness/Program.cs index 14223b1e..6d76fd3b 100644 --- a/Harness/Program.cs +++ b/Harness/Program.cs @@ -1,4 +1,3 @@ -using Conductor.Api; using Conductor.Client; using Conductor.Client.Extensions; using Conductor.Client.Telemetry; @@ -6,8 +5,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using OpenTelemetry; -using OpenTelemetry.Metrics; using System; using System.Collections.Generic; using System.Threading.Tasks; @@ -43,24 +40,6 @@ public static async Task Main(string[] args) var metricsPort = int.TryParse( Environment.GetEnvironmentVariable("HARNESS_METRICS_PORT"), out var mp) ? 
mp : DefaultMetricsPort; - var timeBuckets = new double[] { 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; - var sizeBuckets = new double[] { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; - - var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(MetricsCollector.MeterName) - .AddView("task_poll_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_execute_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_update_time_seconds", new ExplicitBucketHistogramConfiguration { Boundaries = timeBuckets }) - .AddView("task_result_size_bytes", new ExplicitBucketHistogramConfiguration { Boundaries = sizeBuckets }) - .AddView("workflow_input_size_bytes", new ExplicitBucketHistogramConfiguration { Boundaries = sizeBuckets }) - .AddPrometheusHttpListener(options => - { - options.UriPrefixes = new[] { $"http://*:{metricsPort}/" }; - }) - .Build(); - - Console.WriteLine($"Prometheus metrics server started on port {metricsPort}"); - var host = new HostBuilder() .ConfigureServices(services => { @@ -71,12 +50,18 @@ public static async Task Main(string[] args) services.WithHostedService(); services.AddSingleton(config); - services.AddHostedService(sp => new WorkflowGovernor( - config, - sp.GetRequiredService>(), - WorkflowName, - workflowsPerSec, - sp.GetRequiredService())); + services.AddHostedService(sp => + { + var metrics = sp.GetRequiredService(); + metrics.StartServer(metricsPort); + Console.WriteLine($"Prometheus metrics server started on port {metricsPort}"); + return new WorkflowGovernor( + config, + sp.GetRequiredService>(), + WorkflowName, + workflowsPerSec, + metrics); + }); }) .ConfigureLogging(logging => { @@ -85,19 +70,12 @@ public static async Task Main(string[] args) }) .Build(); - try - { - await host.RunAsync(); - } - finally - { - meterProvider?.Dispose(); - } + await host.RunAsync(); } private static void RegisterMetadata(Configuration 
config) { - var metadataClient = config.GetClient(); + var metadataClient = config.GetClient(); var taskDefs = new List(); foreach (var (taskName, codename, sleepSeconds) in SimulatedWorkers) diff --git a/Harness/README.md b/Harness/README.md index 8a7ae863..df48b5d1 100644 --- a/Harness/README.md +++ b/Harness/README.md @@ -58,7 +58,7 @@ All resource names use a `csharp_` prefix so multiple SDK harnesses (Python, Jav ### Metrics The harness exposes Prometheus metrics at `http://localhost:9991/metrics` via the OpenTelemetry -Prometheus exporter. See [METRICS.md](../METRICS.md) for the full list of metrics, labels, and +Prometheus exporter. See [metrics.md](../docs/metrics.md) for the full list of metrics, labels, and configuration options. ### Environment Variables diff --git a/Harness/WorkflowGovernor.cs b/Harness/WorkflowGovernor.cs index 4cf092a6..e821e8f8 100644 --- a/Harness/WorkflowGovernor.cs +++ b/Harness/WorkflowGovernor.cs @@ -1,6 +1,6 @@ -using Conductor.Api; using Conductor.Client; using Conductor.Client.Telemetry; +using Conductor.Executor; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; @@ -11,11 +11,10 @@ namespace Harness { public class WorkflowGovernor : BackgroundService { - private readonly WorkflowResourceApi _workflowClient; + private readonly WorkflowExecutor _executor; private readonly ILogger _logger; private readonly string _workflowName; private readonly int _workflowsPerSecond; - private readonly MetricsCollector _metrics; public WorkflowGovernor( Configuration config, @@ -24,11 +23,10 @@ public WorkflowGovernor( int workflowsPerSecond, MetricsCollector metrics = null) { - _workflowClient = config.GetClient(); + _executor = new WorkflowExecutor(config, metrics); _logger = logger; _workflowName = workflowName; _workflowsPerSecond = workflowsPerSecond; - _metrics = metrics; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -43,15 +41,14 @@ protected override async Task 
ExecuteAsync(CancellationToken stoppingToken) { for (var i = 0; i < _workflowsPerSecond; i++) { - _workflowClient.StartWorkflow( - new Conductor.Client.Models.StartWorkflowRequest(name: _workflowName)); + _executor.StartWorkflow( + new Conductor.Client.Models.StartWorkflowRequest(name: _workflowName, version: 1)); } _logger.LogInformation("Governor: started {Count} workflow(s)", _workflowsPerSecond); } catch (Exception ex) { - _metrics?.RecordWorkflowStartError(_workflowName); _logger.LogError(ex, "Governor: error starting workflows"); } diff --git a/METRICS.md b/METRICS.md deleted file mode 100644 index 74716798..00000000 --- a/METRICS.md +++ /dev/null @@ -1,252 +0,0 @@ -# Metrics Documentation - -The C# SDK exposes worker metrics via the standard `System.Diagnostics.Metrics` API, making them -compatible with any .NET metrics listener -- most notably the -[OpenTelemetry .NET SDK](https://github.com/open-telemetry/opentelemetry-dotnet). - -## Table of Contents - -- [Quick Reference](#quick-reference) -- [Configuration](#configuration) - - [DI-Based Workers](#di-based-workers) - - [WorkflowTaskHost Convenience API](#workflowtaskhost-convenience-api) - - [Prometheus via OpenTelemetry](#prometheus-via-opentelemetry) - - [Console Exporter (Development)](#console-exporter-development) -- [Metric Types](#metric-types) - - [Counters](#counters) - - [Histograms](#histograms) - - [Gauges](#gauges) -- [Labels](#labels) -- [Example Metrics Output](#example-metrics-output) -- [Best Practices](#best-practices) - -## Quick Reference - -All metrics are registered under the meter named `Conductor.Client`. 
- -| Name | Type | Labels | Description | -|---|---|---|---| -| `task_poll_total` | Counter | `task_type` | Total task poll attempts | -| `task_poll_error_total` | Counter | `task_type`, `error_type` | Total task poll errors | -| `task_execute_error_total` | Counter | `task_type`, `error_type` | Total task execution errors | -| `task_update_error_total` | Counter | `task_type` | Total task update errors (after all retries) | -| `task_paused_total` | Counter | `task_type` | Polls skipped because the worker is paused | -| `task_execution_queue_full_total` | Counter | `task_type` | Polls returning zero capacity (all workers busy) | -| `thread_uncaught_exceptions_total` | Counter | -- | Uncaught exceptions in worker threads | -| `workflow_start_error_total` | Counter | `workflow_type` | Errors starting workflows | -| `external_payload_used_total` | Counter | `entity_name`, `operation`, `payload_type` | External payload storage usage | -| `task_poll_time_seconds` | Histogram | `task_type` | Task poll round-trip duration (seconds) | -| `task_execute_time_seconds` | Histogram | `task_type` | Task execution duration (seconds) | -| `task_update_time_seconds` | Histogram | `task_type` | Task result update duration (seconds) | -| `task_result_size_bytes` | Histogram | `task_type` | Task result payload size (bytes) | -| `workflow_input_size_bytes` | Histogram | `workflow_type`, `version` | Workflow input payload size (bytes) | -| `active_workers` | Gauge | `task_type` | Workers currently executing tasks | - -## Configuration - -### DI-Based Workers - -`MetricsCollector` is automatically registered as a singleton when you call `AddConductorWorker()`. -No additional setup is needed for the SDK to start recording -- metrics are written to -`System.Diagnostics.Metrics` instruments immediately. - -```csharp -var host = new HostBuilder() - .ConfigureServices(services => - { - // MetricsCollector is registered automatically here. 
- services.AddConductorWorker(config); - services.AddConductorWorkflowTask(new MyWorker()); - services.WithHostedService(); - }) - .Build(); -``` - -To **expose** the metrics externally (e.g. Prometheus scraping), attach a metrics listener -or exporter as shown below. - -### WorkflowTaskHost Convenience API - -If you use the one-liner `WorkflowTaskHost.CreateWorkerHost(...)`, metrics are registered -automatically via the same `AddConductorWorker()` call: - -```csharp -var host = WorkflowTaskHost.CreateWorkerHost(config, workers: new MyWorker()); -await host.RunAsync(); -``` - -### Prometheus via OpenTelemetry - -Add the following NuGet packages to your project: - -``` -dotnet add package OpenTelemetry -dotnet add package OpenTelemetry.Exporter.Prometheus.HttpListener --prerelease -``` - -Then configure a `MeterProvider` before starting the host: - -```csharp -using OpenTelemetry; -using OpenTelemetry.Metrics; -using Conductor.Client.Telemetry; - -var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(MetricsCollector.MeterName) // "Conductor.Client" - .AddPrometheusHttpListener(options => - { - options.UriPrefixes = new[] { "http://*:9090/" }; - }) - .Build(); - -// ... start the host ... - -// Dispose when shutting down. -meterProvider?.Dispose(); -``` - -Metrics are now available at `http://localhost:9090/metrics` in Prometheus text format. - -### Console Exporter (Development) - -For quick debugging, the OpenTelemetry console exporter prints metrics to stdout: - -``` -dotnet add package OpenTelemetry.Exporter.Console -``` - -```csharp -var meterProvider = Sdk.CreateMeterProviderBuilder() - .AddMeter(MetricsCollector.MeterName) - .AddConsoleExporter() - .Build(); -``` - -## Metric Types - -### Counters - -Monotonically increasing values. Prometheus exposes them with a `_total` suffix. - -| Name | Labels | Description | -|---|---|---| -| `task_poll_total` | `task_type` | Incremented once per poll round (regardless of how many tasks are returned). 
| -| `task_poll_error_total` | `task_type`, `error_type` | Incremented when a poll HTTP call fails. `error_type` is the exception class name. | -| `task_execute_error_total` | `task_type`, `error_type` | Incremented when `Execute()` throws. `error_type` is the exception class name. | -| `task_update_error_total` | `task_type` | Incremented when all update retries are exhausted. | -| `task_paused_total` | `task_type` | Incremented when a poll is skipped because the worker is paused. | -| `task_execution_queue_full_total` | `task_type` | Incremented when a poll is skipped because all workers are busy (batch size reached). | -| `thread_uncaught_exceptions_total` | -- | Incremented on any exception in the top-level poll loop that is not an `OperationCanceledException`. | -| `workflow_start_error_total` | `workflow_type` | Incremented when a workflow start call fails. | -| `external_payload_used_total` | `entity_name`, `operation`, `payload_type` | Incremented when external payload storage is used. | - -### Histograms - -Distribution metrics with sum, count, and bucket breakdowns. All time values are in **seconds**. -All size values are in **bytes**. - -| Name | Labels | Unit | Description | -|---|---|---|---| -| `task_poll_time_seconds` | `task_type` | seconds | Wall-clock time for the poll HTTP call. | -| `task_execute_time_seconds` | `task_type` | seconds | Wall-clock time inside `worker.Execute()`. | -| `task_update_time_seconds` | `task_type` | seconds | Wall-clock time for the update call (including retries). | -| `task_result_size_bytes` | `task_type` | bytes | JSON-serialized size of `TaskResult.OutputData`. | -| `workflow_input_size_bytes` | `workflow_type`, `version` | bytes | Workflow input payload size. | - -### Gauges - -Point-in-time values sampled by the metrics listener. - -| Name | Labels | Description | -|---|---|---| -| `active_workers` | `task_type` | Number of concurrent task executions in progress. Updated on every poll cycle. 
| - -## Labels - -| Label | Used By | Values | -|---|---|---| -| `task_type` | Most metrics | Task definition name (e.g. `"my_worker"`) | -| `error_type` | `task_poll_error_total`, `task_execute_error_total` | Exception class name (e.g. `"HttpRequestException"`) | -| `workflow_type` | `workflow_start_error_total`, `workflow_input_size_bytes` | Workflow definition name | -| `version` | `workflow_input_size_bytes` | Workflow version string | -| `entity_name` | `external_payload_used_total` | Entity name | -| `operation` | `external_payload_used_total` | Operation name | -| `payload_type` | `external_payload_used_total` | Payload type (e.g. `"TASK_INPUT"`, `"TASK_OUTPUT"`) | - -## Example Metrics Output - -When scraped by Prometheus (via the OpenTelemetry exporter), the output looks like: - -```prometheus -# HELP task_poll_total Total task poll attempts -# TYPE task_poll_total counter -task_poll_total{task_type="my_worker"} 142 - -# HELP task_poll_time_seconds Task poll round-trip duration (seconds) -# TYPE task_poll_time_seconds histogram -task_poll_time_seconds_bucket{task_type="my_worker",le="0.005"} 12 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.01"} 45 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.025"} 98 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.05"} 120 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.1"} 135 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.25"} 140 -task_poll_time_seconds_bucket{task_type="my_worker",le="0.5"} 142 -task_poll_time_seconds_bucket{task_type="my_worker",le="1"} 142 -task_poll_time_seconds_bucket{task_type="my_worker",le="+Inf"} 142 -task_poll_time_seconds_sum{task_type="my_worker"} 3.842 -task_poll_time_seconds_count{task_type="my_worker"} 142 - -# HELP task_execute_time_seconds Task execution duration (seconds) -# TYPE task_execute_time_seconds histogram -task_execute_time_seconds_bucket{task_type="my_worker",le="0.25"} 50 
-task_execute_time_seconds_bucket{task_type="my_worker",le="0.5"} 80 -task_execute_time_seconds_bucket{task_type="my_worker",le="1"} 110 -task_execute_time_seconds_bucket{task_type="my_worker",le="2.5"} 135 -task_execute_time_seconds_bucket{task_type="my_worker",le="+Inf"} 142 -task_execute_time_seconds_sum{task_type="my_worker"} 98.553 -task_execute_time_seconds_count{task_type="my_worker"} 142 - -# HELP active_workers Workers currently executing tasks -# TYPE active_workers gauge -active_workers{task_type="my_worker"} 5 - -# HELP task_execution_queue_full_total Polls returning zero capacity -# TYPE task_execution_queue_full_total counter -task_execution_queue_full_total{task_type="my_worker"} 3 -``` - -## Best Practices - -1. **Use the OpenTelemetry Prometheus exporter for production.** It serves a standard `/metrics` - endpoint that Prometheus can scrape directly. - -2. **Set histogram bucket boundaries via OpenTelemetry Views** if the defaults don't match your - workload. For example, if your workers are consistently fast (< 100ms), add more fine-grained - lower buckets: - ```csharp - builder.AddView("task_execute_time_seconds", - new ExplicitBucketHistogramConfiguration - { - Boundaries = new double[] { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5 } - }); - ``` - -3. **Alert on `task_update_error_total`.** A non-zero rate means task results are being lost - after all retries are exhausted -- this is a critical failure. - -4. **Monitor `task_execution_queue_full_total`.** A sustained rate indicates the worker needs - more capacity (increase `BatchSize` or add replicas). - -5. **Use `rate()` on counters, not raw values.** For example: - ```promql - rate(task_poll_total{task_type="my_worker"}[5m]) - ``` - -6. **Track p99 execution latency** using histogram quantiles: - ```promql - histogram_quantile(0.99, rate(task_execute_time_seconds_bucket[5m])) - ``` - -7. 
**The `MetricsCollector` is available as a singleton via DI.** You can inject it into your - own services to record `workflow_start_error_total`, `external_payload_used_total`, or any - other metrics that occur outside the poll loop. diff --git a/Tests/Telemetry/MetricsCollectorTests.cs b/Tests/Telemetry/MetricsCollectorTests.cs index 8f12d0ae..e5fc18eb 100644 --- a/Tests/Telemetry/MetricsCollectorTests.cs +++ b/Tests/Telemetry/MetricsCollectorTests.cs @@ -45,169 +45,274 @@ public MetricsCollectorTests() _listener.Start(); } - public void Dispose() => _listener.Dispose(); + public void Dispose() + { + _sut.Dispose(); + _listener.Dispose(); + } // --------------------------------------------------------------- // Counters // --------------------------------------------------------------- [Fact] - public void RecordTaskPoll_EmitsCounterWithTaskType() + public void RecordTaskPoll_EmitsCanonicalLabels() { _sut.RecordTaskPoll("my_task"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_total", m.Name); Assert.Equal(1L, m.Value); - AssertTag(m, "task_type", "my_task"); + AssertTag(m, "taskType", "my_task"); } [Fact] - public void RecordTaskPollError_EmitsCounterWithBothTags() + public void RecordTaskExecutionStarted_EmitsCanonicalLabels() + { + _sut.RecordTaskExecutionStarted("my_task"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_execution_started_total", m.Name); + AssertTag(m, "taskType", "my_task"); + } + + [Fact] + public void RecordTaskPollError_EmitsCanonicalLabels() { _sut.RecordTaskPollError("my_task", "TimeoutException"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_error_total", m.Name); - AssertTag(m, "task_type", "my_task"); - AssertTag(m, "error_type", "TimeoutException"); + AssertTag(m, "taskType", "my_task"); + AssertTag(m, "exception", "TimeoutException"); } [Fact] - public void RecordTaskExecuteError_EmitsCounterWithBothTags() + public void RecordTaskExecuteError_EmitsCanonicalLabels() { 
_sut.RecordTaskExecuteError("task_a", "NullReferenceException"); var m = Assert.Single(_recorded); Assert.Equal("task_execute_error_total", m.Name); - AssertTag(m, "task_type", "task_a"); - AssertTag(m, "error_type", "NullReferenceException"); + AssertTag(m, "taskType", "task_a"); + AssertTag(m, "exception", "NullReferenceException"); } [Fact] - public void RecordTaskUpdateError_EmitsCounter() + public void RecordTaskUpdateError_EmitsCanonicalLabels() { - _sut.RecordTaskUpdateError("task_b"); + _sut.RecordTaskUpdateError("task_b", "HttpRequestException"); var m = Assert.Single(_recorded); Assert.Equal("task_update_error_total", m.Name); - AssertTag(m, "task_type", "task_b"); + AssertTag(m, "taskType", "task_b"); + AssertTag(m, "exception", "HttpRequestException"); } [Fact] - public void RecordTaskPaused_EmitsCounter() + public void RecordTaskPaused_EmitsCanonicalLabels() { _sut.RecordTaskPaused("paused_task"); var m = Assert.Single(_recorded); Assert.Equal("task_paused_total", m.Name); - AssertTag(m, "task_type", "paused_task"); + AssertTag(m, "taskType", "paused_task"); } [Fact] - public void RecordTaskExecutionQueueFull_EmitsCounter() + public void RecordTaskExecutionQueueFull_EmitsCanonicalLabels() { _sut.RecordTaskExecutionQueueFull("busy_task"); var m = Assert.Single(_recorded); Assert.Equal("task_execution_queue_full_total", m.Name); - AssertTag(m, "task_type", "busy_task"); + AssertTag(m, "taskType", "busy_task"); } [Fact] - public void RecordUncaughtException_EmitsCounterWithNoTags() + public void RecordUncaughtException_EmitsExceptionLabel() { - _sut.RecordUncaughtException(); + _sut.RecordUncaughtException("InvalidOperationException"); var m = Assert.Single(_recorded); Assert.Equal("thread_uncaught_exceptions_total", m.Name); Assert.Equal(1L, m.Value); + AssertTag(m, "exception", "InvalidOperationException"); } [Fact] - public void RecordWorkflowStartError_EmitsCounter() + public void RecordWorkflowStartError_EmitsCanonicalLabels() { - 
_sut.RecordWorkflowStartError("my_workflow"); + _sut.RecordWorkflowStartError("my_workflow", "ApiException"); var m = Assert.Single(_recorded); Assert.Equal("workflow_start_error_total", m.Name); - AssertTag(m, "workflow_type", "my_workflow"); + AssertTag(m, "workflowType", "my_workflow"); + AssertTag(m, "exception", "ApiException"); } [Fact] - public void RecordExternalPayloadUsed_EmitsCounterWithThreeTags() + public void RecordExternalPayloadUsed_EmitsCanonicalLabels() { - _sut.RecordExternalPayloadUsed("entity", "read", "input"); + _sut.RecordExternalPayloadUsed("entity", "READ", "TASK_INPUT"); var m = Assert.Single(_recorded); Assert.Equal("external_payload_used_total", m.Name); - AssertTag(m, "entity_name", "entity"); - AssertTag(m, "operation", "read"); - AssertTag(m, "payload_type", "input"); + AssertTag(m, "entityName", "entity"); + AssertTag(m, "operation", "READ"); + AssertTag(m, "payloadType", "TASK_INPUT"); + } + + [Fact] + public void RecordTaskAckError_EmitsCanonicalLabels() + { + _sut.RecordTaskAckError("my_task", "HttpRequestException"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_ack_error_total", m.Name); + AssertTag(m, "taskType", "my_task"); + AssertTag(m, "exception", "HttpRequestException"); + } + + [Fact] + public void RecordTaskAckFailed_EmitsCanonicalLabels() + { + _sut.RecordTaskAckFailed("my_task"); + + var m = Assert.Single(_recorded); + Assert.Equal("task_ack_failed_total", m.Name); + AssertTag(m, "taskType", "my_task"); } // --------------------------------------------------------------- - // Histograms + // Histograms — status label // --------------------------------------------------------------- [Fact] - public void RecordTaskPollTime_RecordsDuration() + public void RecordTaskPollTime_IncludesStatusLabel() { - _sut.RecordTaskPollTime("fast_task", 0.123); + _sut.RecordTaskPollTime("fast_task", 0.123, "SUCCESS"); var m = Assert.Single(_recorded); Assert.Equal("task_poll_time_seconds", m.Name); Assert.Equal(0.123, 
m.Value); - AssertTag(m, "task_type", "fast_task"); + AssertTag(m, "taskType", "fast_task"); + AssertTag(m, "status", "SUCCESS"); } [Fact] - public void RecordTaskExecuteTime_RecordsDuration() + public void RecordTaskExecuteTime_IncludesStatusLabel() { - _sut.RecordTaskExecuteTime("slow_task", 5.5); + _sut.RecordTaskExecuteTime("slow_task", 5.5, "FAILURE"); var m = Assert.Single(_recorded); Assert.Equal("task_execute_time_seconds", m.Name); Assert.Equal(5.5, m.Value); - AssertTag(m, "task_type", "slow_task"); + AssertTag(m, "taskType", "slow_task"); + AssertTag(m, "status", "FAILURE"); } [Fact] - public void RecordTaskUpdateTime_RecordsDuration() + public void RecordTaskUpdateTime_IncludesStatusLabel() { - _sut.RecordTaskUpdateTime("task_c", 0.05); + _sut.RecordTaskUpdateTime("task_c", 0.05, "SUCCESS"); var m = Assert.Single(_recorded); Assert.Equal("task_update_time_seconds", m.Name); Assert.Equal(0.05, m.Value); - AssertTag(m, "task_type", "task_c"); + AssertTag(m, "taskType", "task_c"); + AssertTag(m, "status", "SUCCESS"); } [Fact] - public void RecordTaskResultSize_RecordsBytes() + public void RecordHttpApiClientRequest_RecordsDuration() + { + _sut.RecordHttpApiClientRequest("GET", "/api/tasks/poll/batch/my_task", "200", 0.042); + + var m = Assert.Single(_recorded); + Assert.Equal("http_api_client_request_seconds", m.Name); + Assert.Equal(0.042, m.Value); + AssertTag(m, "method", "GET"); + AssertTag(m, "uri", "/api/tasks/poll/batch/my_task"); + AssertTag(m, "status", "200"); + } + + // --------------------------------------------------------------- + // Size metrics — gauge + histogram + // --------------------------------------------------------------- + + [Fact] + public void RecordTaskResultSize_EmitsHistogram() { _sut.RecordTaskResultSize("task_d", 1024.0); var m = Assert.Single(_recorded); - Assert.Equal("task_result_size_bytes", m.Name); + Assert.Equal("task_result_size_bytes_histogram", m.Name); Assert.Equal(1024.0, m.Value); - AssertTag(m, "task_type", 
"task_d"); + AssertTag(m, "taskType", "task_d"); + } + + [Fact] + public void RecordTaskResultSize_ExposedViaGauge() + { + _sut.RecordTaskResultSize("task_d", 2048.0); + + var gaugeValues = new List(); + using var gaugeListener = new MeterListener(); + gaugeListener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == MetricsCollector.MeterName + && instrument.Name == "task_result_size_bytes") + l.EnableMeasurementEvents(instrument); + }; + gaugeListener.SetMeasurementEventCallback((instrument, value, tags, _) => + gaugeValues.Add(new RecordedMeasurement(instrument.Name, value, tags.ToArray()))); + gaugeListener.Start(); + gaugeListener.RecordObservableInstruments(); + + var match = Assert.Single(gaugeValues); + Assert.Equal(2048.0, (double)match.Value); + AssertTag(match, "taskType", "task_d"); } [Fact] - public void RecordWorkflowInputSize_RecordsBytesWithVersionTag() + public void RecordWorkflowInputSize_EmitsHistogram() { - _sut.RecordWorkflowInputSize("wf_type", "v2", 2048.0); + _sut.RecordWorkflowInputSize("wf_type", "2", 2048.0); var m = Assert.Single(_recorded); - Assert.Equal("workflow_input_size_bytes", m.Name); + Assert.Equal("workflow_input_size_bytes_histogram", m.Name); Assert.Equal(2048.0, m.Value); - AssertTag(m, "workflow_type", "wf_type"); - AssertTag(m, "version", "v2"); + AssertTag(m, "workflowType", "wf_type"); + AssertTag(m, "version", "2"); + } + + [Fact] + public void RecordWorkflowInputSize_ExposedViaGauge() + { + _sut.RecordWorkflowInputSize("wf_type", "v2", 4096.0); + + var gaugeValues = new List(); + using var gaugeListener = new MeterListener(); + gaugeListener.InstrumentPublished = (instrument, l) => + { + if (instrument.Meter.Name == MetricsCollector.MeterName + && instrument.Name == "workflow_input_size_bytes") + l.EnableMeasurementEvents(instrument); + }; + gaugeListener.SetMeasurementEventCallback((instrument, value, tags, _) => + gaugeValues.Add(new RecordedMeasurement(instrument.Name, value, 
tags.ToArray()))); + gaugeListener.Start(); + gaugeListener.RecordObservableInstruments(); + + var match = Assert.Single(gaugeValues); + Assert.Equal(4096.0, (double)match.Value); + AssertTag(match, "workflowType", "wf_type"); + AssertTag(match, "version", "v2"); } // --------------------------------------------------------------- - // Observable gauge + // Observable gauge — active_workers // --------------------------------------------------------------- [Fact] @@ -231,9 +336,11 @@ public void RecordActiveWorkers_ExposedViaObservableGauge() Assert.Equal(2, gaugeValues.Count); Assert.Contains(gaugeValues, g => - (int)g.Value == 3 && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_x")); + (int)g.Value == 3 + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_x")); Assert.Contains(gaugeValues, g => - (int)g.Value == 7 && g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "task_y")); + (int)g.Value == 7 + && g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_y")); } [Fact] @@ -256,7 +363,7 @@ public void RecordActiveWorkers_OverwritesPreviousValue() gaugeListener.RecordObservableInstruments(); var match = gaugeValues.Where(g => - g.Tags.Any(t => t.Key == "task_type" && (string)t.Value == "overwrite_test_task")).ToList(); + g.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "overwrite_test_task")).ToList(); var single = Assert.Single(match); Assert.Equal(10, (int)single.Value); } @@ -273,8 +380,26 @@ public void Counters_AccumulateAcrossMultipleCalls() _sut.RecordTaskPoll("task_b"); Assert.Equal(3, _recorded.Count); - Assert.Equal(2, _recorded.Count(r => r.Tags.Any(t => (string)t.Value == "task_a"))); - Assert.Equal(1, _recorded.Count(r => r.Tags.Any(t => (string)t.Value == "task_b"))); + Assert.Equal(2, _recorded.Count(r => r.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_a"))); + Assert.Equal(1, _recorded.Count(r => r.Tags.Any(t => t.Key == "taskType" && (string)t.Value == "task_b"))); + } + 
+ // --------------------------------------------------------------- + // Canonical bucket constants + // --------------------------------------------------------------- + + [Fact] + public void CanonicalTimeBuckets_MatchesSpec() + { + var expected = new double[] { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 }; + Assert.Equal(expected, MetricsCollector.CanonicalTimeBuckets); + } + + [Fact] + public void CanonicalSizeBuckets_MatchesSpec() + { + var expected = new double[] { 100, 1_000, 10_000, 100_000, 1_000_000, 10_000_000 }; + Assert.Equal(expected, MetricsCollector.CanonicalSizeBuckets); } // --------------------------------------------------------------- diff --git a/docs/metrics.md b/docs/metrics.md new file mode 100644 index 00000000..d01dd0fd --- /dev/null +++ b/docs/metrics.md @@ -0,0 +1,337 @@ +# Metrics Documentation + +The C# SDK exposes worker metrics via the standard `System.Diagnostics.Metrics` API, making them +compatible with any .NET metrics listener -- most notably the +[OpenTelemetry .NET SDK](https://github.com/open-telemetry/opentelemetry-dotnet). + +## Table of Contents + +- [Quick Reference](#quick-reference) +- [Configuration](#configuration) + - [DI-Based Workers](#di-based-workers) + - [Built-in Prometheus Server (Recommended)](#built-in-prometheus-server-recommended) + - [Custom MeterProvider](#custom-meterprovider) + - [Console Exporter (Development)](#console-exporter-development) +- [Metric Types](#metric-types) + - [Counters](#counters) + - [Histograms](#histograms) + - [Gauges](#gauges) +- [Labels](#labels) +- [Nonstandard Metrics](#nonstandard-metrics) +- [Histogram Bucket Boundaries](#histogram-bucket-boundaries) +- [Example Metrics Output](#example-metrics-output) +- [OTel Scope Label](#otel-scope-label) +- [Best Practices](#best-practices) + +## Quick Reference + +All metrics are registered under the meter named `Conductor.Client`. 
+ +| Name | Type | Labels | Description | +|---|---|---|---| +| `task_poll_total` | Counter | `taskType` | Total task poll attempts | +| `task_execution_started_total` | Counter | `taskType` | Tasks dispatched to the worker function | +| `task_poll_error_total` | Counter | `taskType`, `exception` | Total task poll errors | +| `task_execute_error_total` | Counter | `taskType`, `exception` | Total task execution errors | +| `task_update_error_total` | Counter | `taskType`, `exception` | Total task update errors (after all retries) | +| `task_ack_error_total` | Counter | `taskType`, `exception` | Task ack client-side errors (surface-only) | +| `task_ack_failed_total` | Counter | `taskType` | Task ack declined by server (surface-only) | +| `task_paused_total` | Counter | `taskType` | Polls skipped because the worker is paused | +| `task_execution_queue_full_total` | Counter | `taskType` | Polls returning zero capacity (all workers busy) | +| `thread_uncaught_exceptions_total` | Counter | `exception` | Uncaught exceptions in worker threads | +| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Errors starting workflows | +| `external_payload_used_total` | Counter | `entityName`, `operation`, `payloadType` | External payload storage usage | +| `task_poll_time_seconds` | Histogram | `taskType`, `status` | Task poll round-trip duration (seconds) | +| `task_execute_time_seconds` | Histogram | `taskType`, `status` | Task execution duration (seconds) | +| `task_update_time_seconds` | Histogram | `taskType`, `status` | Task result update duration (seconds) | +| `http_api_client_request_seconds` | Histogram | `method`, `uri`, `status` | HTTP API client request duration (seconds) | +| `task_result_size_bytes` | Gauge | `taskType` | Task result payload size (bytes) — last value | +| `workflow_input_size_bytes` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) — last value | +| `active_workers` | Gauge | `taskType` | Workers currently 
executing tasks | +| `task_result_size_bytes_histogram` | Histogram | `taskType` | Task result payload size (bytes) — **nonstandard**, see [below](#nonstandard-metrics) | +| `workflow_input_size_bytes_histogram` | Histogram | `workflowType`, `version` | Workflow input payload size (bytes) — **nonstandard**, see [below](#nonstandard-metrics) | + +## Configuration + +### DI-Based Workers + +`MetricsCollector` is automatically registered as a singleton when you call `AddConductorWorker()`. +No additional setup is needed for the SDK to start recording -- metrics are written to +`System.Diagnostics.Metrics` instruments immediately. + +```csharp +var host = new HostBuilder() + .ConfigureServices(services => + { + // MetricsCollector is registered automatically here. + services.AddConductorWorker(config); + services.AddConductorWorkflowTask(new MyWorker()); + services.WithHostedService(); + }) + .Build(); +``` + +To **expose** the metrics externally (e.g. Prometheus scraping), use the built-in server +or attach your own exporter as shown below. + +### Built-in Prometheus Server (Recommended) + +The simplest way to expose a `/metrics` endpoint is to call `MetricsCollector.StartServer(port)`. +This configures the OpenTelemetry `MeterProvider` with canonical histogram bucket boundaries +and starts a Prometheus HTTP listener on the given port: + +```csharp +var metrics = serviceProvider.GetRequiredService<MetricsCollector>(); +metrics.StartServer(9991); +// Prometheus scrape endpoint now at http://localhost:9991/metrics +``` + +This is equivalent to what every other Conductor SDK provides (Python, Go, Java, Ruby, Rust) +and is the recommended setup for production use. + +### Custom MeterProvider + +If you need full control (e.g. adding additional exporters, custom Views, or extra meters), +you can configure your own `MeterProvider`.
Use the SDK's canonical bucket constants +to stay aligned with the fleet: + +```csharp +using OpenTelemetry; +using OpenTelemetry.Metrics; +using Conductor.Client.Telemetry; + +var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(MetricsCollector.MeterName) + .AddView("task_poll_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("task_execute_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("task_update_time_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddView("http_api_client_request_seconds", + new ExplicitBucketHistogramConfiguration + { Boundaries = MetricsCollector.CanonicalTimeBuckets }) + .AddPrometheusHttpListener(options => + { + options.UriPrefixes = new[] { "http://*:9090/" }; + }) + .Build(); +``` + +### Console Exporter (Development) + +For quick debugging, the OpenTelemetry console exporter prints metrics to stdout: + +``` +dotnet add package OpenTelemetry.Exporter.Console +``` + +```csharp +var meterProvider = Sdk.CreateMeterProviderBuilder() + .AddMeter(MetricsCollector.MeterName) + .AddConsoleExporter() + .Build(); +``` + +## Metric Types + +### Counters + +Monotonically increasing values. Prometheus exposes them with a `_total` suffix. + +| Name | Labels | Description | +|---|---|---| +| `task_poll_total` | `taskType` | Incremented once per poll round (regardless of how many tasks are returned). | +| `task_execution_started_total` | `taskType` | Incremented when a polled task is dispatched to the worker's `Execute()` method. | +| `task_poll_error_total` | `taskType`, `exception` | Incremented when a poll HTTP call fails. `exception` is the exception class name. | +| `task_execute_error_total` | `taskType`, `exception` | Incremented when `Execute()` throws. `exception` is the exception class name. 
| +| `task_update_error_total` | `taskType`, `exception` | Incremented when all update retries are exhausted. `exception` is the exception class name. | +| `task_ack_error_total` | `taskType`, `exception` | Surface-only: C# has no separate ack call (batch-poll response is the ack). Available for user code. | +| `task_ack_failed_total` | `taskType` | Surface-only: same rationale as `task_ack_error_total`. | +| `task_paused_total` | `taskType` | Incremented when a poll is skipped because the worker is paused. | +| `task_execution_queue_full_total` | `taskType` | Incremented when a poll is skipped because all workers are busy (batch size reached). | +| `thread_uncaught_exceptions_total` | `exception` | Incremented on any exception in the top-level poll loop that is not an `OperationCanceledException`. | +| `workflow_start_error_total` | `workflowType`, `exception` | Incremented when a `WorkflowExecutor.StartWorkflow()` call fails. | +| `external_payload_used_total` | `entityName`, `operation`, `payloadType` | Incremented when external payload storage is used. | + +### Histograms + +Distribution metrics with sum, count, and bucket breakdowns. All time values are in **seconds**. + +| Name | Labels | Unit | Description | +|---|---|---|---| +| `task_poll_time_seconds` | `taskType`, `status` | seconds | Wall-clock time for the poll HTTP call. `status` is `SUCCESS` or `FAILURE`. | +| `task_execute_time_seconds` | `taskType`, `status` | seconds | Wall-clock time inside `worker.Execute()`. `status` is `SUCCESS` or `FAILURE`. | +| `task_update_time_seconds` | `taskType`, `status` | seconds | Wall-clock time for the update call (including retries). `status` is `SUCCESS` or `FAILURE`. | +| `http_api_client_request_seconds` | `method`, `uri`, `status` | seconds | Every HTTP request made by the API client. `method` is the HTTP verb, `uri` is the request path, `status` is the HTTP status code (or `"0"` on transport failure). 
| + +### Gauges + +Point-in-time values sampled by the metrics listener. + +| Name | Labels | Description | +|---|---|---| +| `task_result_size_bytes` | `taskType` | Serialized byte size of the most recent task result output. Last-value gauge. | +| `workflow_input_size_bytes` | `workflowType`, `version` | Serialized byte size of the most recent workflow input. Last-value gauge. | +| `active_workers` | `taskType` | Number of concurrent task executions in progress. Updated on every poll cycle. | + +## Labels + +The C# SDK uses the canonical label names defined by the cross-SDK metrics harmonization spec. + +| Label | Used by | Values | +|---|---|---| +| `taskType` | Most task metrics | The task definition name (e.g. `my_worker`). | +| `workflowType` | Workflow metrics | The workflow definition name. | +| `entityName` | `external_payload_used_total` | The entity identifier. | +| `exception` | Error counters, `thread_uncaught_exceptions_total` | The .NET exception class name (e.g. `HttpRequestException`). | +| `status` | Time histograms | `SUCCESS` or `FAILURE` for the task time histograms; the HTTP status code (or `"0"` on transport failure) for `http_api_client_request_seconds`. | +| `method` | `http_api_client_request_seconds` | HTTP verb (`GET`, `POST`, etc.). | +| `uri` | `http_api_client_request_seconds` | Request path (interpolated, not templated). | +| `operation` | `external_payload_used_total` | `READ` or `WRITE`. | +| `payloadType` | `external_payload_used_total` | `TASK_INPUT`, `TASK_OUTPUT`, `WORKFLOW_INPUT`, or `WORKFLOW_OUTPUT`. | +| `version` | `workflow_input_size_bytes`, `workflow_input_size_bytes_histogram` | Workflow version string. | + +**Label naming convention.** Labels referencing Conductor domain entities (`taskType`, +`workflowType`, `entityName`) use camelCase to match the Conductor API's JSON field names. +Generic observability labels (`status`, `method`, `uri`, `exception`, `operation`, +`version`) follow the standard Prometheus snake_case / lowercase convention. +`payloadType` uses camelCase to match other Conductor domain-entity labels.
+ +> **Note:** Unlike some other Conductor SDKs that are undergoing a migration from legacy +> label names, the C# SDK's metrics surface is entirely new and emits only canonical labels. +> There are no legacy/deprecated label aliases. + +## Nonstandard Metrics + +The C# SDK emits two additional **histogram** instruments for payload sizes that are not part +of the cross-SDK canonical catalog: + +| Name | Labels | Unit | Description | +|---|---|---|---| +| `task_result_size_bytes_histogram` | `taskType` | bytes | Distribution of task result payload sizes. | +| `workflow_input_size_bytes_histogram` | `workflowType`, `version` | bytes | Distribution of workflow input payload sizes. | + +The canonical catalog defines size metrics as **last-value Gauges** (`task_result_size_bytes`, +`workflow_input_size_bytes`), which is what the SDK emits as its primary size instruments. +These histogram variants provide distribution information (bucket breakdowns, count, sum) that +the gauge shape cannot offer -- for example, computing p99 result sizes via +`histogram_quantile()`. + +These histograms are **proposed** for inclusion in a future revision of the canonical catalog +(see the harmonization spec's §5.1 "Future catalog considerations"). They use the proposed +size bucket set `(100, 1000, 10000, 100000, 1000000, 10000000)` bytes. Until the catalog +formally adopts them, they should be considered nonstandard and may change. + +Both are emitted automatically whenever the corresponding gauge is recorded (i.e. calling +`RecordTaskResultSize` writes to both the gauge and the histogram). 
+ +## Histogram Bucket Boundaries + +The SDK defines canonical bucket boundaries that match all other Conductor SDKs: + +**Time histograms** (`*_seconds`): + +``` +0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10 +``` + +**Size histograms** (nonstandard `*_histogram`, proposed): + +``` +100, 1000, 10000, 100000, 1000000, 10000000 +``` + +These are applied automatically when using `MetricsCollector.StartServer(port)`. If you build +your own `MeterProvider`, reference `MetricsCollector.CanonicalTimeBuckets` and +`MetricsCollector.CanonicalSizeBuckets`. + +## Example Metrics Output + +When scraped by Prometheus (via the built-in server or OpenTelemetry exporter), the output looks like the following. The sample is abbreviated for readability: some histogram buckets and the `otel_scope_name` label are omitted. + +```prometheus +# HELP task_poll_total Total task poll attempts +# TYPE task_poll_total counter +task_poll_total{taskType="my_worker"} 142 + +# HELP task_execution_started_total Tasks dispatched to the worker function +# TYPE task_execution_started_total counter +task_execution_started_total{taskType="my_worker"} 140 + +# HELP task_poll_time_seconds Task poll round-trip duration (seconds) +# TYPE task_poll_time_seconds histogram +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.001"} 2 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.005"} 12 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.01"} 45 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.025"} 98 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.05"} 120 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.1"} 135 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="0.25"} 140 +task_poll_time_seconds_bucket{taskType="my_worker",status="SUCCESS",le="+Inf"} 140 +task_poll_time_seconds_sum{taskType="my_worker",status="SUCCESS"} 3.842 +task_poll_time_seconds_count{taskType="my_worker",status="SUCCESS"} 140 + +# HELP http_api_client_request_seconds HTTP API client request 
duration (seconds) +# TYPE http_api_client_request_seconds histogram +http_api_client_request_seconds_bucket{method="GET",uri="/api/tasks/poll/batch/my_worker",status="200",le="0.01"} 30 +http_api_client_request_seconds_bucket{method="GET",uri="/api/tasks/poll/batch/my_worker",status="200",le="+Inf"} 142 + +# HELP task_result_size_bytes Task result payload size (bytes) +# TYPE task_result_size_bytes gauge +task_result_size_bytes{taskType="my_worker"} 2048 + +# HELP active_workers Workers currently executing tasks +# TYPE active_workers gauge +active_workers{taskType="my_worker"} 5 + +# HELP task_execution_queue_full_total Polls returning zero capacity +# TYPE task_execution_queue_full_total counter +task_execution_queue_full_total{taskType="my_worker"} 3 +``` + +## OTel Scope Label + +The C# metrics output includes an `otel_scope_name="Conductor.Client"` label on every metric. +This label is injected by the OpenTelemetry .NET exporter (it identifies the `System.Diagnostics.Metrics.Meter` +that owns each instrument). Other SDKs that use native Prometheus client libraries do not emit it. +The extra label is harmless — it is a standard OTel convention and does not interfere with label queries. + +## Best Practices + +1. **Use `MetricsCollector.StartServer(port)` for production.** It serves a standard `/metrics` + endpoint that Prometheus can scrape directly, with canonical bucket boundaries pre-configured. + +2. **Override histogram buckets via OpenTelemetry Views** if the defaults don't match your + workload. For example, if your workers are consistently fast (< 100ms), drop the upper buckets + that never fill (and insert finer-grained lower boundaries in the same array if you need more resolution): + ```csharp + builder.AddView("task_execute_time_seconds", + new ExplicitBucketHistogramConfiguration + { + Boundaries = new double[] { 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5 } + }); + ``` + +3. **Alert on `task_update_error_total`.** A non-zero rate means task results are being lost + after all retries are exhausted -- this is a critical failure. + +4. 
**Monitor `task_execution_queue_full_total`.** A sustained rate indicates the worker needs + more capacity (increase `BatchSize` or add replicas). + +5. **Use `rate()` on counters, not raw values.** For example: + ```promql + rate(task_poll_total{taskType="my_worker"}[5m]) + ``` + +6. **Track p99 execution latency** using histogram quantiles: + ```promql + histogram_quantile(0.99, rate(task_execute_time_seconds_bucket[5m])) + ``` + +7. **Use `WorkflowExecutor` for starting workflows.** It automatically records + `workflow_input_size_bytes` and `workflow_start_error_total` when a `MetricsCollector` + is injected. diff --git a/docs/readme/workers.md b/docs/readme/workers.md index ca3a3ca0..719e5f57 100644 --- a/docs/readme/workers.md +++ b/docs/readme/workers.md @@ -50,19 +50,8 @@ Thread.Sleep(TimeSpan.FromSeconds(100)); Check out our [integration tests](https://github.com/conductor-sdk/conductor-csharp/blob/92c7580156a89322717c94aeaea9e5201fe577eb/Tests/Worker/WorkerTests.cs#L37) for more examples -Worker SDK collects the following metrics: - - -| Name | Purpose | Tags | -| ------------------ | :------------------------------------------- | -------------------------------- | -| task_poll_error | Client error when polling for a task queue | taskType, includeRetries, status | -| task_execute_error | Execution error | taskType | -| task_update_error | Task status cannot be updated back to server | taskType | -| task_poll_counter | Incremented each time polling is done | taskType | -| task_poll_time | Time to poll for a batch of tasks | taskType | -| task_execute_time | Time to execute a task | taskType | -| task_result_size | Records output payload size of a task | taskType | - -Metrics on client side supplements the one collected from server in identifying the network as well as client side issues. +The worker SDK collects metrics for poll latency, execution time, update time, error rates, queue +utilization, and more. 
See [metrics.md](../metrics.md) for the full list of metrics, labels, +configuration options, and example Prometheus output. ### Next: [Create and Execute Workflows](https://github.com/conductor-sdk/conductor-csharp/blob/main/docs/readme/workflow.md)