Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 110 additions & 110 deletions src/integrationTest/java/com/uber/ugroup/ConsumerLagDetectionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
*/
package com.uber.ugroup;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -40,129 +48,121 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* Integration test that verifies consumer lag detection.
*/
/** Integration test that verifies consumer lag detection. */
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@org.springframework.test.annotation.DirtiesContext
class ConsumerLagDetectionIT {

private static final String TEST_TOPIC = "test-lag-topic";
private static final String TEST_GROUP = "test-consumer-group";
private static final String KAFKA_IMAGE = System.getProperty("kafka.test.image", "confluentinc/cp-kafka:7.5.0");
private static final String TEST_TOPIC = "test-lag-topic";
private static final String TEST_GROUP = "test-consumer-group";
private static final String KAFKA_IMAGE =
System.getProperty("kafka.test.image", "confluentinc/cp-kafka:7.5.0");

@Container
static GenericContainer<?> kafka = createKafkaContainer();
@Container static GenericContainer<?> kafka = createKafkaContainer();

private static GenericContainer<?> createKafkaContainer() {
if (KAFKA_IMAGE.startsWith("apache/kafka")) {
return new org.testcontainers.kafka.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}
return new org.testcontainers.containers.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
private static GenericContainer<?> createKafkaContainer() {
if (KAFKA_IMAGE.startsWith("apache/kafka")) {
return new org.testcontainers.kafka.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}
return new org.testcontainers.containers.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}

private static String getBootstrapServers() {
if (kafka instanceof org.testcontainers.kafka.KafkaContainer kc) {
return kc.getBootstrapServers();
}
return ((org.testcontainers.containers.KafkaContainer) kafka).getBootstrapServers();
private static String getBootstrapServers() {
if (kafka instanceof org.testcontainers.kafka.KafkaContainer kc) {
return kc.getBootstrapServers();
}

@LocalServerPort
private int port;

@Autowired
private TestRestTemplate restTemplate;

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("ugroup.kafka.bootstrap-servers", ConsumerLagDetectionIT::getBootstrapServers);
registry.add("ugroup.kafka.cluster-name", () -> "test-cluster");
registry.add("ugroup.watchlist.mode", () -> "all");
registry.add("ugroup.processing.lag-report-interval-ms", () -> "1000");
registry.add("ugroup.processing.compaction-interval-ms", () -> "1000");
return ((org.testcontainers.containers.KafkaContainer) kafka).getBootstrapServers();
}

@LocalServerPort private int port;

@Autowired private TestRestTemplate restTemplate;

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("ugroup.kafka.bootstrap-servers", ConsumerLagDetectionIT::getBootstrapServers);
registry.add("ugroup.kafka.cluster-name", () -> "test-cluster");
registry.add("ugroup.watchlist.mode", () -> "all");
registry.add("ugroup.processing.lag-report-interval-ms", () -> "1000");
registry.add("ugroup.processing.compaction-interval-ms", () -> "1000");
}

@BeforeAll
static void setupKafka() throws Exception {
// Create test topic
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", getBootstrapServers());

try (AdminClient admin = AdminClient.create(adminProps)) {
admin
.createTopics(List.of(new NewTopic(TEST_TOPIC, 3, (short) 1)))
.all()
.get(30, TimeUnit.SECONDS);
}

@BeforeAll
static void setupKafka() throws Exception {
// Create test topic
Properties adminProps = new Properties();
adminProps.put("bootstrap.servers", getBootstrapServers());

try (AdminClient admin = AdminClient.create(adminProps)) {
admin.createTopics(List.of(new NewTopic(TEST_TOPIC, 3, (short) 1)))
.all()
.get(30, TimeUnit.SECONDS);
}

// Produce some messages
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(TEST_TOPIC, "key-" + i, "value-" + i));
}
producer.flush();
}
// Produce some messages
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<>(TEST_TOPIC, "key-" + i, "value-" + i));
}
producer.flush();
}

@Test
void shouldDetectConsumerLag() throws Exception {
// Create a consumer and consume only partially
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, TEST_GROUP);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList(TEST_TOPIC));

// Consume only 10 messages (leaving 90 behind)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isGreaterThan(0);

// Commit the offsets
consumer.commitSync();
}

// Wait for uGroup to detect the lag
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:" + port + "/api/v1/lag/" + TEST_GROUP + "/" + TEST_TOPIC,
String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("totalLag");
// Should have some lag since we only consumed 10 of 100 messages
});
}

@Test
void shouldDetectConsumerLag() throws Exception {
// Create a consumer and consume only partially
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, TEST_GROUP);
consumerProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Collections.singletonList(TEST_TOPIC));

// Consume only 10 messages (leaving 90 behind)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
assertThat(records.count()).isGreaterThan(0);

// Commit the offsets
consumer.commitSync();
}

@Test
void shouldReturnNotFoundForUnknownGroup() {
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:" + port + "/api/v1/lag/nonexistent-group",
String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
// Wait for uGroup to detect the lag
await()
.atMost(Duration.ofSeconds(30))
.untilAsserted(
() -> {
ResponseEntity<String> response =
restTemplate.getForEntity(
"http://localhost:" + port + "/api/v1/lag/" + TEST_GROUP + "/" + TEST_TOPIC,
String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("totalLag");
// Should have some lag since we only consumed 10 of 100 messages
});
}

@Test
void shouldReturnNotFoundForUnknownGroup() {
ResponseEntity<String> response =
restTemplate.getForEntity(
"http://localhost:" + port + "/api/v1/lag/nonexistent-group", String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
}
125 changes: 59 additions & 66 deletions src/integrationTest/java/com/uber/ugroup/UGroupApplicationIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.uber.ugroup;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
Expand All @@ -29,80 +31,71 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration test that verifies uGroup can start and connect to Kafka.
*/
/** Integration test that verifies uGroup can start and connect to Kafka. */
@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@org.springframework.test.annotation.DirtiesContext
class UGroupApplicationIT {

private static final String KAFKA_IMAGE = System.getProperty("kafka.test.image", "confluentinc/cp-kafka:7.5.0");

@Container
static GenericContainer<?> kafka = createKafkaContainer();

private static GenericContainer<?> createKafkaContainer() {
if (KAFKA_IMAGE.startsWith("apache/kafka")) {
return new org.testcontainers.kafka.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}
return new org.testcontainers.containers.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}

private static String getBootstrapServers() {
if (kafka instanceof org.testcontainers.kafka.KafkaContainer kc) {
return kc.getBootstrapServers();
}
return ((org.testcontainers.containers.KafkaContainer) kafka).getBootstrapServers();
}

@LocalServerPort
private int port;

@Autowired
private TestRestTemplate restTemplate;

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("ugroup.kafka.bootstrap-servers", UGroupApplicationIT::getBootstrapServers);
registry.add("ugroup.kafka.cluster-name", () -> "test-cluster");
registry.add("ugroup.watchlist.mode", () -> "all");
}

@Test
void contextLoads() {
// Application should start successfully
}
private static final String KAFKA_IMAGE =
System.getProperty("kafka.test.image", "confluentinc/cp-kafka:7.5.0");

@Test
void healthEndpoint_returnsUp() {
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:" + port + "/actuator/health",
String.class);
@Container static GenericContainer<?> kafka = createKafkaContainer();

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("UP");
private static GenericContainer<?> createKafkaContainer() {
if (KAFKA_IMAGE.startsWith("apache/kafka")) {
return new org.testcontainers.kafka.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}
return new org.testcontainers.containers.KafkaContainer(DockerImageName.parse(KAFKA_IMAGE));
}

@Test
void statusEndpoint_returnsStatus() {
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:" + port + "/api/v1/status",
String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("running");
assertThat(response.getBody()).contains("test-cluster");
}

@Test
void metricsEndpoint_available() {
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:" + port + "/actuator/metrics",
String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
private static String getBootstrapServers() {
if (kafka instanceof org.testcontainers.kafka.KafkaContainer kc) {
return kc.getBootstrapServers();
}
return ((org.testcontainers.containers.KafkaContainer) kafka).getBootstrapServers();
}

@LocalServerPort private int port;

@Autowired private TestRestTemplate restTemplate;

@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("ugroup.kafka.bootstrap-servers", UGroupApplicationIT::getBootstrapServers);
registry.add("ugroup.kafka.cluster-name", () -> "test-cluster");
registry.add("ugroup.watchlist.mode", () -> "all");
}

@Test
void contextLoads() {
// Application should start successfully
}

@Test
void healthEndpoint_returnsUp() {
ResponseEntity<String> response =
restTemplate.getForEntity("http://localhost:" + port + "/actuator/health", String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("UP");
}

@Test
void statusEndpoint_returnsStatus() {
ResponseEntity<String> response =
restTemplate.getForEntity("http://localhost:" + port + "/api/v1/status", String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
assertThat(response.getBody()).contains("running");
assertThat(response.getBody()).contains("test-cluster");
}

@Test
void metricsEndpoint_available() {
ResponseEntity<String> response =
restTemplate.getForEntity("http://localhost:" + port + "/actuator/metrics", String.class);

assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
}
}
Loading