Skip to content

Commit ec9a40c

Browse files
author
Weiqing Yang
committed
Support Samza on K8s
Co-authored-by: Jian He <jianhe688@gmail.com>
1 parent 3e3713c commit ec9a40c

28 files changed

Lines changed: 1510 additions & 55 deletions

File tree

build.gradle

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,27 @@ project(":samza-yarn_$scalaSuffix") {
564564
jar.dependsOn("lesscss")
565565
}
566566

567+
project(":samza-kubernetes_$scalaSuffix") {
568+
apply plugin: 'java'
569+
570+
dependencies {
571+
compile project(':samza-api')
572+
compile project(":samza-core_$scalaSuffix")
573+
compile "org.codehaus.jackson:jackson-core-asl:1.9.7"
574+
compile group: 'io.fabric8', name: 'kubernetes-client', version: kubernetesJavaClientVersion
575+
testCompile "junit:junit:$junitVersion"
576+
testCompile "org.mockito:mockito-core:$mockitoVersion"
577+
}
578+
579+
tasks.create(name: "releaseKubeTarGz", dependsOn: configurations.archives.artifacts, type: Tar) {
580+
into "samza-kubernetes-${version}"
581+
compression = Compression.GZIP
582+
from(configurations.runtime) { into("lib/") }
583+
from(configurations.archives.artifacts.files) { into("lib/") }
584+
duplicatesStrategy 'exclude'
585+
}
586+
}
587+
567588
project(":samza-shell") {
568589
apply plugin: 'java'
569590

gradle/dependency-versions.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,5 @@
4949
failsafeVersion = "1.1.0"
5050
jlineVersion = "3.8.2"
5151
jnaVersion = "4.5.1"
52+
kubernetesJavaClientVersion = "4.1.3"
5253
}

samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.samza.clustermanager;
2020

2121
import com.google.common.annotations.VisibleForTesting;
22-
import java.io.IOException;
2322
import java.util.HashMap;
2423
import java.util.Map;
2524
import java.util.Optional;
@@ -380,19 +379,53 @@ StreamPartitionCountMonitor getPartitionMonitor() {
380379
* @param args args
381380
*/
382381
public static void main(String[] args) {
382+
// TODO: remove all added code used for debugging
383+
Thread thread = new Thread() {
384+
public void run() {
385+
log.info("Dummy Thread starts to sleep");
386+
System.out.println("Dummy Thread starts to sleep");
387+
while (true) {
388+
try {
389+
sleep(8 * 1000 * 60 * 60 * 60);
390+
} catch (Exception e) {
391+
log.info("Dummy Thread was interrupted");
392+
System.out.println("Dummy Thread was interrupted");
393+
}
394+
}
395+
}
396+
};
397+
thread.start();
398+
383399
Config coordinatorSystemConfig = null;
384400
final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG());
385401
try {
386402
//Read and parse the coordinator system config.
387403
log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
404+
System.out.println("Coordinator system config: " + coordinatorSystemEnv);
405+
String correctedCoordinatorSystemEnv = coordinatorSystemEnv.replace("\\\"", "\"");
406+
log.info("Corrected coordinator system config {}", correctedCoordinatorSystemEnv);
407+
System.out.println("Corrected coordinator system config: " + correctedCoordinatorSystemEnv);
408+
388409
coordinatorSystemConfig =
389-
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
390-
} catch (IOException e) {
410+
new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(correctedCoordinatorSystemEnv, Config.class));
411+
} catch (Exception e) {
391412
log.error("Exception while reading coordinator stream config {}", e);
392-
throw new SamzaException(e);
413+
414+
log.error("Exception ignored: ", e);
415+
System.out.println("Exception ignored: " + e);
416+
// throw new SamzaException(e);
393417
}
418+
394419
ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
395420
jc.run();
421+
422+
try {
423+
thread.join();
424+
} catch (Exception e) {
425+
log.error("new thread ended", e);
426+
System.out.println("new thread ended: " + e);
427+
}
428+
396429
log.info("Finished ClusterBasedJobCoordinator run");
397430
}
398431
}

samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
7676
* The Allocator matches requests to resources and executes processes.
7777
*/
7878
private final AbstractContainerAllocator containerAllocator;
79-
private final Thread allocatorThread;
79+
private Thread allocatorThread = null;
8080

8181
// The StandbyContainerManager manages standby-aware allocation and failover of containers
8282
private final Optional<StandbyContainerManager> standbyContainerManager;
@@ -146,8 +146,9 @@ public ContainerProcessManager(Config config,
146146
} else {
147147
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
148148
}
149-
150-
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
149+
if (shouldStartAllocateThread()) {
150+
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
151+
}
151152
log.info("finished initialization of samza task manager");
152153

153154
}
@@ -174,19 +175,31 @@ public ContainerProcessManager(Config config,
174175
this.containerAllocator = new ContainerAllocator(clusterResourceManager, config, state);
175176
}
176177

177-
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
178+
if (shouldStartAllocateThread()) {
179+
this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
180+
}
178181
log.info("finished initialization of samza task manager");
179182
}
180183

184+
// In Kubernetes, the pod requested will be started by kubelet automatically once it is assigned, it does not need a
185+
// separate thread to keep polling the allocated resources to start the container.
186+
public boolean shouldStartAllocateThread() {
187+
return !clusterResourceManager.getClass().getSimpleName().equals("KubeClusterResourceManager");
188+
}
189+
181190
public boolean shouldShutdown() {
182-
log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
183-
state.completedContainers.get(), state.containerCount, tooManyFailedContainers ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
191+
// TODO:
192+
// log.debug(" TaskManager state: Completed containers: {}, Configured containers: {}, Is there too many FailedContainers: {}, Is AllocatorThread alive: {} ",
193+
// state.completedContainers.get(), state.containerCount, tooManyFailedContainers ? "yes" : "no", allocatorThread.isAlive() ? "yes" : "no");
184194

185195
if (exceptionOccurred != null) {
186196
log.error("Exception in ContainerProcessManager", exceptionOccurred);
187197
throw new SamzaException(exceptionOccurred);
188198
}
189-
return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get() || !allocatorThread.isAlive();
199+
200+
// TODO:
201+
// return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get() || !allocatorThread.isAlive();
202+
return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get();
190203
}
191204

192205
public void start() {
@@ -206,20 +219,24 @@ public void start() {
206219

207220
// Start container allocator thread
208221
log.info("Starting the container allocator thread");
209-
allocatorThread.start();
222+
if (allocatorThread != null) {
223+
allocatorThread.start();
224+
}
210225
}
211226

212227
public void stop() {
213228
log.info("Invoked stop of the Samza container process manager");
214229

215230
// Shutdown allocator thread
216231
containerAllocator.stop();
217-
try {
218-
allocatorThread.join();
219-
log.info("Stopped container allocator");
220-
} catch (InterruptedException ie) {
221-
log.error("Allocator Thread join() threw an interrupted exception", ie);
222-
Thread.currentThread().interrupt();
232+
if (allocatorThread != null) {
233+
try {
234+
allocatorThread.join();
235+
log.info("Stopped container allocator");
236+
} catch (InterruptedException ie) {
237+
log.error("Allocator Thread join() threw an interrupted exception", ie);
238+
Thread.currentThread().interrupt();
239+
}
223240
}
224241

225242
if (metrics != null) {

samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.samza.runtime;
2121

22-
import org.apache.samza.SamzaException;
2322
import org.apache.samza.application.ApplicationUtil;
2423
import org.apache.samza.application.descriptors.ApplicationDescriptor;
2524
import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
@@ -41,37 +40,64 @@
4140
public class LocalContainerRunner {
4241
private static final Logger log = LoggerFactory.getLogger(LocalContainerRunner.class);
4342

43+
// TODO: remove all added code used for debugging
4444
public static void main(String[] args) throws Exception {
45+
Thread thread = new Thread() {
46+
public void run() {
47+
log.info("Dummy Thread starts to sleep");
48+
System.out.println("Dummy Thread starts to sleep");
49+
while (true) {
50+
try {
51+
sleep(8 * 1000 * 60 * 60 * 60);
52+
} catch (Exception e) {
53+
log.info("Dummy Thread was interrupted");
54+
System.out.println("Dummy Thread was interrupted");
55+
}
56+
}
57+
}
58+
};
59+
thread.start();
60+
4561
Thread.setDefaultUncaughtExceptionHandler(
4662
new SamzaUncaughtExceptionHandler(() -> {
4763
log.info("Exiting process now.");
4864
System.exit(1);
4965
}));
5066

51-
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
52-
log.info(String.format("Got container ID: %s", containerId));
53-
System.out.println(String.format("Container ID: %s", containerId));
67+
try {
68+
String containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID());
69+
log.info(String.format("Got container ID: %s", containerId));
70+
System.out.println(String.format("Container ID: %s", containerId));
5471

55-
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
56-
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
57-
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
72+
String coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
73+
log.info(String.format("Got coordinator URL: %s", coordinatorUrl));
74+
System.out.println(String.format("Coordinator URL: %s", coordinatorUrl));
5875

59-
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
60-
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
61-
Config config = jobModel.getConfig();
62-
JobConfig jobConfig = new JobConfig(config);
63-
if (jobConfig.getName().isEmpty()) {
64-
throw new SamzaException("can not find the job name");
65-
}
66-
String jobName = jobConfig.getName().get();
67-
String jobId = jobConfig.getJobId();
68-
MDC.put("containerName", "samza-container-" + containerId);
69-
MDC.put("jobName", jobName);
70-
MDC.put("jobId", jobId);
76+
int delay = new Random().nextInt(SamzaContainer.DEFAULT_READ_JOBMODEL_DELAY_MS()) + 1;
77+
JobModel jobModel = SamzaContainer.readJobModel(coordinatorUrl, delay);
78+
Config config = jobModel.getConfig();
79+
JobConfig jobConfig = new JobConfig(config);
80+
if (jobConfig.getName().isEmpty()) {
81+
// throw new SamzaException("can not find the job name");
82+
log.error("can not find the job name");
83+
System.out.println("can not find the job name");
84+
}
85+
String jobName = jobConfig.getName().get();
86+
String jobId = jobConfig.getJobId();
87+
MDC.put("containerName", "samza-container-" + containerId);
88+
MDC.put("jobName", jobName);
89+
MDC.put("jobId", jobId);
7190

72-
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
73-
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
91+
ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
92+
ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(config), config);
93+
94+
ContainerLaunchUtil.run(appDesc, containerId, jobModel);
95+
} catch (Exception ex) {
96+
// ignored.
97+
log.error("LocalContainerRunner throw exception: ", ex);
98+
System.out.println("LocalContainerRunner throw exception: " + ex);
99+
}
74100

75-
ContainerLaunchUtil.run(appDesc, containerId, jobModel);
101+
thread.join();
76102
}
77103
}

0 commit comments

Comments
 (0)