Skip to content

Commit 5dfef51

Browse files
authored
Merge branch 'apache:master' into ETL-18960
2 parents c8a4558 + dd659cd commit 5dfef51

15 files changed

Lines changed: 559 additions & 95 deletions

File tree

gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ public class ServiceConfigKeys {
159159
public static final String NUM_DAG_PROC_THREADS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "numThreads";
160160
public static final String DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY = GOBBLIN_SERVICE_DAG_PROCESSING_ENGINE_PREFIX + "nonRetryableExceptions";
161161
public static final Integer DEFAULT_NUM_DAG_PROC_THREADS = 3;
162+
public static final String NUM_SPEC_CATALOG_LISTENER_THREADS_KEY = GOBBLIN_SERVICE_PREFIX + "specCatalogListener.numThreads";
163+
public static final int DEFAULT_NUM_SPEC_CATALOG_LISTENER_THREADS = 3;
162164
public static final long DEFAULT_FLOW_FINISH_DEADLINE_MILLIS = TimeUnit.HOURS.toMillis(24);
163165

164166
public static final String ERROR_PATTERN_STORE_CLASS = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "errorPatternStore.class";

gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ public class GobblinClusterConfigurationKeys {
154154

155155
public static final String HELIX_JOB_STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "helix.job.stopTimeoutSeconds";
156156
public static final long DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS = 10L;
157+
public static final String TASK_STATE_MODEL_FACTORY_SHUTDOWN_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "taskStateModelFactory.shutdownTimeoutSeconds";
158+
public static final long DEFAULT_TASK_STATE_MODEL_FACTORY_SHUTDOWN_TIMEOUT_SECONDS = 300L;
157159
public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
158160

159161
public static final String HELIX_JOB_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobName";

gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
177177
protected final String applicationId;
178178
private final boolean isMetricReportingFailureFatal;
179179
private final boolean isEventReportingFailureFatal;
180+
private final long taskStateModelFactoryShutdownTimeoutSeconds;
180181

181182
public GobblinTaskRunner(String applicationName,
182183
String helixInstanceName,
@@ -213,6 +214,10 @@ public GobblinTaskRunner(String applicationName,
213214
ConfigurationKeys.GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL,
214215
ConfigurationKeys.DEFAULT_GOBBLIN_TASK_EVENT_REPORTING_FAILURE_FATAL);
215216

217+
this.taskStateModelFactoryShutdownTimeoutSeconds = ConfigUtils.getLong(this.clusterConfig,
218+
GobblinClusterConfigurationKeys.TASK_STATE_MODEL_FACTORY_SHUTDOWN_TIMEOUT_SECONDS,
219+
GobblinClusterConfigurationKeys.DEFAULT_TASK_STATE_MODEL_FACTORY_SHUTDOWN_TIMEOUT_SECONDS);
220+
216221
logger.info("Configured GobblinTaskRunner work dir to: {}", this.appWorkPath.toString());
217222

218223
this.isContainerExitOnHealthCheckFailureEnabled = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED,
@@ -415,20 +420,43 @@ public synchronized void stop() {
415420

416421
logger.info("Stopping the Gobblin Task runner");
417422

418-
// Stop metric reporting
419-
if (this.containerMetrics.isPresent()) {
420-
this.containerMetrics.get().stopMetricsReporting();
421-
}
422-
423423
try {
424-
stopServices();
424+
try {
425+
stopServices();
426+
} finally {
427+
logger.info("All services are stopped.");
428+
shutdownTaskStateModelFactory();
429+
disconnectHelixManager();
430+
}
425431
} finally {
426-
logger.info("All services are stopped.");
427-
this.taskStateModelFactory.shutdown();
428-
disconnectHelixManager();
432+
// Stop metric reporting only after tasks have completed and emitted their final GTEs.
433+
// Placed in an outer finally to guarantee execution even if shutdownTaskStateModelFactory()
434+
// or disconnectHelixManager() throws an unexpected runtime exception.
435+
if (this.containerMetrics.isPresent()) {
436+
this.containerMetrics.get().stopMetricsReporting();
437+
}
438+
this.isStopped = true;
429439
}
440+
}
430441

431-
this.isStopped = true;
442+
private void shutdownTaskStateModelFactory() {
443+
this.taskStateModelFactory.shutdown();
444+
long timeoutMs = TimeUnit.SECONDS.toMillis(this.taskStateModelFactoryShutdownTimeoutSeconds);
445+
long startTime = System.currentTimeMillis();
446+
while (!this.taskStateModelFactory.isTerminated()
447+
&& System.currentTimeMillis() - startTime < timeoutMs) {
448+
try {
449+
Thread.sleep(1000);
450+
} catch (InterruptedException e) {
451+
Thread.currentThread().interrupt();
452+
logger.warn("Interrupted while waiting for task state model factory to terminate");
453+
break;
454+
}
455+
}
456+
if (!this.taskStateModelFactory.isTerminated()) {
457+
logger.warn("Task state model factory did not terminate within {} seconds",
458+
this.taskStateModelFactoryShutdownTimeoutSeconds);
459+
}
432460
}
433461

434462
private void stopServices() {

0 commit comments

Comments
 (0)