diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index bc87fe1f55b..8525dafb436 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -154,6 +154,22 @@ public interface GobblinTemporalConfigurationKeys { String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts"; int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4; + /** + * RPC retry options for gRPC calls to the Temporal service (e.g. worker status reporting). + * Defaults are tuned to tolerate short-lived throttling windows of 1-2 minutes: + * with initialInterval=500ms, coefficient=2.0, maximumInterval=30s, the cumulative wait across + * 10 attempts is ~151.5s, providing coverage beyond a 2-minute throttle burst. + */ + String TEMPORAL_RPC_RETRY_OPTIONS = PREFIX + "rpc.retry.options."; + String TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS = TEMPORAL_RPC_RETRY_OPTIONS + "initial.interval.millis"; + int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS = 500; + String TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = TEMPORAL_RPC_RETRY_OPTIONS + "maximum.interval.seconds"; + int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS = 30; + String TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT = TEMPORAL_RPC_RETRY_OPTIONS + "backoff.coefficient"; + double DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2.0; + String TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_RPC_RETRY_OPTIONS + "maximum.attempts"; + int DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 10; + /** * Memory allocation for execution worker containers. */ diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java index 821340c1624..9266c5641bf 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.KeyStore; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -37,6 +38,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.RpcRetryOptions; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsOptions; import javax.net.ssl.KeyManagerFactory; @@ -118,6 +120,7 @@ public static ManagedWorkflowServiceStubs createServiceInstance(String connectio .setEnableHttps(true) .setSslContext(sslContext) .setMetricsScope(metricsScope) + .setRpcRetryOptions(buildRpcRetryOptions(config)) .build(); return new ManagedWorkflowServiceStubs(WorkflowServiceStubs.newServiceStubs(options)); } @@ -129,6 +132,27 @@ public static WorkflowClient createClientInstance(WorkflowServiceStubs service, return WorkflowClient.newInstance(service, options); } + private static RpcRetryOptions buildRpcRetryOptions(Config config) { + int initialIntervalMillis = ConfigUtils.getInt(config, + GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_INITIAL_INTERVAL_MILLIS); + int maximumIntervalSeconds = ConfigUtils.getInt(config, + GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_INTERVAL_SECONDS); + double backoffCoefficient = ConfigUtils.getDouble(config, + GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_BACKOFF_COEFFICIENT); + int maximumAttempts = ConfigUtils.getInt(config, + GobblinTemporalConfigurationKeys.TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS, + GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_RPC_RETRY_OPTIONS_MAXIMUM_ATTEMPTS); + return RpcRetryOptions.newBuilder() + .setInitialInterval(Duration.ofMillis(initialIntervalMillis)) + .setBackoffCoefficient(backoffCoefficient) + .setMaximumInterval(Duration.ofSeconds(maximumIntervalSeconds)) + .setMaximumAttempts(maximumAttempts) + .build(); + } + private static InputStream toInputStream(File storeFile) throws IOException { byte[] data = FileUtils.readFileToByteArray(storeFile);