Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,10 @@ public String getColumnName() {
return columnName;
}

public String getColumnIndexAndName() {
return (columnIndex + 1) + " (`" + columnName + "`)";
}

public String getOriginalTypeName() {
return originalTypeName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.clickhouse.client.api.ClientConfigProperties;
import com.clickhouse.client.api.ClientException;
import com.clickhouse.client.api.DataTransferException;

Check warning on line 5 in client-v2/src/main/java/com/clickhouse/client/api/data_formats/internal/AbstractBinaryFormatReader.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'com.clickhouse.client.api.DataTransferException'.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGGIqNXeWr2gTBn&open=AZ0m9TGGIqNXeWr2gTBn&pullRequest=2804
import com.clickhouse.client.api.DataTypeUtils;
Comment thread
chernser marked this conversation as resolved.
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.internal.DataTypeConverter;
Expand Down Expand Up @@ -73,6 +74,8 @@
private Map[] convertions;
private boolean hasNext = true;
private boolean initialState = true; // reader is in initial state, no records have been read yet
private long row = -1; // before first row
private long lastNextCallTs; // for exception to detect slow reader

protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings querySettings, TableSchema schema,BinaryStreamReader.ByteBufferAllocator byteBufferAllocator, Map<ClickHouseDataType, Class<?>> defaultTypeHintMap) {
this.input = inputStream;
Expand All @@ -92,6 +95,7 @@
setSchema(schema);
}
this.dataTypeConverter = DataTypeConverter.INSTANCE; // singleton while no need to customize conversion
this.lastNextCallTs = System.currentTimeMillis();
}

protected Object[] currentRecord;
Expand Down Expand Up @@ -181,6 +185,7 @@
return false;
}

row++;
boolean firstColumn = true;
for (int i = 0; i < columns.length; i++) {
try {
Expand All @@ -191,12 +196,12 @@
record[i] = null;
}
firstColumn = false;
} catch (EOFException e) {
if (firstColumn) {
} catch (IOException e) {
if (e instanceof EOFException && firstColumn) {
endReached();
return false;
}
throw e;
throw new IOException(recordReadExceptionMsg(columns[i].getColumnIndexAndName()), e);
}
}
return true;
Expand Down Expand Up @@ -238,35 +243,52 @@
}
} catch (IOException e) {
endReached();
throw new ClientException("Failed to read next row", e);
throw new ClientException(recordReadExceptionMsg(), e);
}
}

private long timeSinceLastNext() {
return System.currentTimeMillis() - lastNextCallTs;
}

private String recordReadExceptionMsg() {
return recordReadExceptionMsg(null);
}

private String recordReadExceptionMsg(String column) {
return "Reading " + (column != null ? "column " + column + " in " : "")
+ " row " + row + " (time since last next call " + timeSinceLastNext() + ")";
Comment thread
chernser marked this conversation as resolved.
Outdated
}

@Override
public Map<String, Object> next() {
if (!hasNext) {
return null;
}

if (!nextRecordEmpty) {
Object[] tmp = currentRecord;
currentRecord = nextRecord;
nextRecord = tmp;
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
try {
if (readRecord(currentRecord)) {
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
currentRecord = null;
return null;
try {
if (!nextRecordEmpty) {
Object[] tmp = currentRecord;
currentRecord = nextRecord;
nextRecord = tmp;
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
try {
if (readRecord(currentRecord)) {
readNextRecord();
return new RecordWrapper(currentRecord, schema);
} else {
currentRecord = null;
return null;
}
} catch (IOException e) {
endReached();
throw new ClientException(recordReadExceptionMsg(), e);
}
} catch (IOException e) {
endReached();
throw new ClientException("Failed to read row", e);
}
} finally {
lastNextCallTs = System.currentTimeMillis();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.charset.Charset;

Check warning on line 87 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'java.nio.charset.Charset'.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBq&open=AZ0m9TGxIqNXeWr2gTBq&pullRequest=2804
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;

Check warning on line 90 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'java.util.ArrayList'.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBr&open=AZ0m9TGxIqNXeWr2gTBr&pullRequest=2804
Comment thread
chernser marked this conversation as resolved.
Outdated
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -104,6 +106,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class HttpAPIClientHelper {

Expand Down Expand Up @@ -351,75 +354,100 @@
* @param httpResponse - HTTP response
* @return exception object with server code
*/
public Exception readError(ClassicHttpResponse httpResponse) {
final Header qIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final String queryId = qIdHeader == null ? "" : qIdHeader.getValue();
public Exception readError(HttpPost req, ClassicHttpResponse httpResponse) {
final Header serverQueryIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final Header clientQueryIdHeader = req.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final Header queryHeader = Stream.of(serverQueryIdHeader, clientQueryIdHeader).filter(Objects::nonNull).findFirst().orElse(null);
final String queryId = queryHeader == null ? "" : queryHeader.getValue();
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
InputStream body = null;
try {
body = httpResponse.getEntity().getContent();
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
StringBuilder msgBuilder = new StringBuilder();
boolean found = false;
while (true) {
int rBytes = -1;
try {
rBytes = body.read(buffer);
} catch (ClientException e) {
// Invalid LZ4 Magic
if (body instanceof ClickHouseLZ4InputStream) {
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
body = stream.getInputStream();
byte[] headerBuffer = stream.getHeaderBuffer();
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
rBytes = headerBuffer.length;
}
}
if (rBytes == -1) {
break;


return serverCode > 0 ? readClickHouseError(httpResponse.getEntity(), serverCode, queryId, httpResponse.getCode()) :
readNotClickHouseError(httpResponse.getEntity(), queryId, httpResponse.getCode());
} catch (Exception e) {
LOG.error("Failed to read error message", e);
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
Comment thread
chernser marked this conversation as resolved.
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
}
}

private ServerException readNotClickHouseError(HttpEntity httpEntity, String queryId, int httpCode) {

byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];

String msg = null;
try {
InputStream body = httpEntity.getContent();
int msgLen = body.read(buffer, 0, buffer.length - 2);
msg = new String(buffer, 0, msgLen, StandardCharsets.UTF_8);
Comment thread
chernser marked this conversation as resolved.
Outdated
} catch (Exception e) {
LOG.warn("Failed to read error message (queryId = " + queryId + ")", e);
Comment thread
chernser marked this conversation as resolved.
Outdated
}

String errormsg = msg == null ? "unknown server error" : msg;
return new ServerException(ServerException.CODE_UNKNOWN, errormsg + " (transport error: " + httpCode +")", httpCode, queryId);
}
Comment thread
chernser marked this conversation as resolved.

private static ServerException readClickHouseError(HttpEntity httpEntity, int serverCode, String queryId, int httpCode) throws Exception {

Check failure on line 392 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 32 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBp&open=AZ0m9TGxIqNXeWr2gTBp&pullRequest=2804

Check warning on line 392 in client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace generic exceptions with specific library exceptions or a custom exception.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZ0m9TGxIqNXeWr2gTBo&open=AZ0m9TGxIqNXeWr2gTBo&pullRequest=2804
InputStream body = httpEntity.getContent();
byte[] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
byte[] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
StringBuilder msgBuilder = new StringBuilder();
boolean found = false;
while (true) {
int rBytes = -1;
try {
rBytes = body.read(buffer);
} catch (ClientException e) {
// Invalid LZ4 Magic
if (body instanceof ClickHouseLZ4InputStream) {
ClickHouseLZ4InputStream stream = (ClickHouseLZ4InputStream) body;
body = stream.getInputStream();
byte[] headerBuffer = stream.getHeaderBuffer();
System.arraycopy(headerBuffer, 0, buffer, 0, headerBuffer.length);
rBytes = headerBuffer.length;
}
}
if (rBytes == -1) {
break;
}

for (int i = 0; i < rBytes; i++) {
if (buffer[i] == lookUpStr[0]) {
found = true;
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
if (buffer[i + j] != lookUpStr[j]) {
found = false;
break;
}
}
if (found) {
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
for (int i = 0; i < rBytes; i++) {
if (buffer[i] == lookUpStr[0]) {
found = true;
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
if (buffer[i + j] != lookUpStr[j]) {
found = false;
break;
}
}
}

if (found) {
break;
if (found) {
msgBuilder.append(new String(buffer, i, rBytes - i, StandardCharsets.UTF_8));
break;
}
}
}

while (true) {
int rBytes = body.read(buffer);
if (rBytes == -1) {
break;
}
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
if (found) {
break;
}
}

String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
.replaceAll("\\\\/", "/");
if (msg.trim().isEmpty()) {
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
while (true) {
int rBytes = body.read(buffer);
if (rBytes == -1) {
break;
}
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
} catch (Exception e) {
LOG.error("Failed to read error message", e);
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
msgBuilder.append(new String(buffer, 0, rBytes, StandardCharsets.UTF_8));
}

String msg = msgBuilder.toString().replaceAll("\\s+", " ").replaceAll("\\\\n", " ")
.replaceAll("\\\\/", "/");
if (msg.trim().isEmpty()) {
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpCode + ")";
}
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpCode, queryId);
}

private static final long POOL_VENT_TIMEOUT = 10000L;
Expand Down Expand Up @@ -536,7 +564,7 @@
throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
try {
throw readError(httpResponse);
throw readError(req, httpResponse);
} finally {
httpResponse.close();
}
Expand Down Expand Up @@ -742,7 +770,7 @@
}

// data compression
if (serverCompression && !(httpStatus == HttpStatus.SC_FORBIDDEN || httpStatus == HttpStatus.SC_UNAUTHORIZED)) {
if (serverCompression && !(httpStatus == HttpStatus.SC_FORBIDDEN || httpStatus == HttpStatus.SC_UNAUTHORIZED || httpStatus == HttpStatus.SC_NOT_FOUND)) {
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory);
}
Expand Down
Loading
Loading