Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 91c65b6

Browse files
committed
[Schema] Return 401 error when no HTTP authentication is configured (#1802)
[Schema] Return 401 error when no HTTP authentication is configured ### Motivation When authentication is enabled, if the Schema REST requests were sent without HTTP authentication header, the Schema Registry will return 404, rather than 401. ### Modifications - When `SchemaStorageException` is thrown, build the response with the error code and the exception message. - Add `testSchemaNoAuth` to verify 401 unauthorized will be returned. (cherry picked from commit c41ad06)
1 parent 0de01fc commit 91c65b6

3 files changed

Lines changed: 37 additions & 8 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@
325325
<artifactId>maven-javadoc-plugin</artifactId>
326326
<version>${maven-javadoc-plugin.version}</version>
327327
<configuration>
328-
<source>8</source>
328+
<source>17</source>
329329
<doclint>none</doclint>
330330
</configuration>
331331
</plugin>

schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.handler.codec.http.FullHttpRequest;
2020
import io.netty.handler.codec.http.FullHttpResponse;
2121
import io.netty.handler.codec.http.HttpResponseStatus;
22+
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException;
2223
import java.io.DataInput;
2324
import java.io.IOException;
2425
import java.util.ArrayList;
@@ -87,8 +88,16 @@ public CompletableFuture<FullHttpResponse> processRequest(FullHttpRequest reques
8788
return buildJsonResponse(resp, RESPONSE_CONTENT_TYPE);
8889
}
8990
}).exceptionally(err -> {
90-
log.error("Error while processing request", err);
91-
return buildJsonErrorResponse(err);
91+
Throwable throwable = err;
92+
while (throwable.getCause() != null) {
93+
throwable = throwable.getCause();
94+
}
95+
if (throwable instanceof SchemaStorageException e) {
96+
return buildErrorResponse(e.getHttpStatusCode(), e.getMessage());
97+
} else {
98+
log.error("Error while processing request", err);
99+
return buildJsonErrorResponse(err);
100+
}
92101
});
93102
} catch (IOException err) {
94103
log.error("Cannot decode request", err);

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import static org.testng.AssertJUnit.fail;
2222

2323
import com.google.common.collect.Sets;
24+
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
2425
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
2526
import io.confluent.kafka.serializers.KafkaAvroSerializer;
2627
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
2728
import io.jsonwebtoken.SignatureAlgorithm;
29+
import io.netty.handler.codec.http.HttpResponseStatus;
2830
import java.time.Duration;
2931
import java.util.Collections;
3032
import java.util.List;
@@ -754,6 +756,20 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex
754756
consumer.close();
755757
}
756758

759+
@Test(timeOut = 30000)
760+
public void testSchemaNoAuth() {
761+
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, false);
762+
try {
763+
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
764+
fail();
765+
} catch (Exception e) {
766+
assertTrue(e.getCause() instanceof RestClientException);
767+
var restException = (RestClientException) e.getCause();
768+
assertEquals(restException.getErrorCode(), HttpResponseStatus.UNAUTHORIZED.code());
769+
assertTrue(restException.getMessage().contains("Missing AUTHORIZATION header"));
770+
}
771+
producer.close();
772+
}
757773

758774
private IndexedRecord createAvroRecord() {
759775
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
@@ -766,6 +782,10 @@ private IndexedRecord createAvroRecord() {
766782
}
767783

768784
private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
785+
return createAvroProducer(withTokenPrefix, true);
786+
}
787+
788+
private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
769789
Properties props = new Properties();
770790
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
771791
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -782,11 +802,11 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
782802
props.put("security.protocol", "SASL_PLAINTEXT");
783803
props.put("sasl.mechanism", "PLAIN");
784804

785-
786-
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
787-
788-
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
789-
username + ":" + (withTokenPrefix ? password : userToken));
805+
if (withSchemaToken) {
806+
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
807+
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
808+
username + ":" + (withTokenPrefix ? password : userToken));
809+
}
790810

791811
return new KafkaProducer<>(props);
792812
}

0 commit comments

Comments
 (0)