Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ private InsightsStartupData createStartupData() {
.withDriverVersion(getDriverVersion(startupOptions))
.withContactPoints(
getResolvedContactPoints(
driverContext.getMetadataManager().getContactPoints().stream()
driverContext.getMetadataManager().getResolvedContactPoints().stream()
.map(n -> n.getEndPoint().resolve())
.filter(InetSocketAddress.class::isInstance)
.map(InetSocketAddress.class::cast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,11 @@ public enum DefaultDriverOption implements DriverOption {
* Whether to resolve the addresses passed to `basic.contact-points`.
*
* <p>Value-type: boolean
*
* @deprecated Contact points are now always kept as unresolved hostnames and expanded to all
* their DNS-mapped IPs lazily at connection time. Setting this option has no effect.
*/
@Deprecated
RESOLVE_CONTACT_POINTS("advanced.resolve-contact-points"),

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,13 @@ public String toString() {
/** The coalescer reschedule interval. */
public static final TypedDriverOption<Duration> COALESCER_INTERVAL =
new TypedDriverOption<>(DefaultDriverOption.COALESCER_INTERVAL, GenericType.DURATION);
/** Whether to resolve the addresses passed to `basic.contact-points`. */
/**
* Whether to resolve the addresses passed to `basic.contact-points`.
*
* @deprecated Contact points are now always kept as unresolved hostnames and expanded to all
* their DNS-mapped IPs lazily at connection time. Setting this option has no effect.
*/
@Deprecated
public static final TypedDriverOption<Boolean> RESOLVE_CONTACT_POINTS =
new TypedDriverOption<>(DefaultDriverOption.RESOLVE_CONTACT_POINTS, GenericType.BOOLEAN);
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,11 @@ protected DriverConfigLoader defaultConfigLoader(@Nullable ClassLoader classLoad
* <p>Contact points can also be provided statically in the configuration. If both are specified,
* they will be merged. If both are absent, the driver will default to 127.0.0.1:9042.
*
* <p>Contrary to the configuration, DNS names with multiple A-records will not be handled here.
* If you need that, extract them manually with {@link java.net.InetAddress#getAllByName(String)}
* before calling this method. Similarly, if you need connect addresses to stay unresolved, make
* sure you pass unresolved instances here (see {@code advanced.resolve-contact-points} in the
* configuration for more explanations).
* <p>Addresses passed here are used as-is. For config-based contact points specified as
* hostnames, the driver automatically expands each hostname to all its DNS-mapped IPs at query
* plan time (via {@code MetadataManager.getResolvedContactPoints()}), so passing a single
* hostname in the configuration is sufficient to try all its IPs on initial connect. The {@code
* advanced.resolve-contact-points} option is deprecated and has no effect.
*/
@NonNull
public SelfT addContactPoints(@NonNull Collection<InetSocketAddress> contactPoints) {
Expand Down Expand Up @@ -957,11 +957,11 @@ protected final CompletionStage<CqlSession> buildDefaultSessionAsync() {
programmaticArguments = programmaticArgumentsBuilder.build();
}

boolean resolveAddresses =
defaultConfig.getBoolean(DefaultDriverOption.RESOLVE_CONTACT_POINTS, false);

// RESOLVE_CONTACT_POINTS is deprecated: contact points are always kept as unresolved
// hostnames and expanded to all their DNS IPs at query plan time (before the first
// connection attempt) via MetadataManager.getResolvedContactPoints().
Set<EndPoint> contactPoints =
ContactPoints.merge(programmaticContactPoints, configContactPoints, resolveAddresses);
ContactPoints.merge(programmaticContactPoints, configContactPoints, false);
Comment thread
nikagra marked this conversation as resolved.

Comment thread
nikagra marked this conversation as resolved.
if (keyspace == null && defaultConfig.isDefined(DefaultDriverOption.SESSION_KEYSPACE)) {
keyspace =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -68,73 +67,34 @@ public OptionalLocalDcHelper(
@Override
@NonNull
public Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
String localDcStr = context.getLocalDatacenter(profile.getName());
Optional<String> localDc;
if (localDcStr != null) {
LOG.debug("[{}] Local DC set programmatically: {}", logPrefix, localDcStr);
localDc = Optional.of(localDcStr);
String localDc = context.getLocalDatacenter(profile.getName());
if (localDc != null) {
LOG.debug("[{}] Local DC set programmatically: {}", logPrefix, localDc);
} else if (profile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) {
localDcStr = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER);
LOG.debug("[{}] Local DC set from configuration: {}", logPrefix, localDcStr);
localDc = Optional.of(localDcStr);
} else {
localDc = Optional.empty();
}
if (localDc.isPresent()) {
checkLocalDatacenterCompatibility(
localDc.get(), context.getMetadataManager().getContactPoints());
// Also warn if the configured DC doesn't match any node in the cluster
if (!nodes.isEmpty()) {
boolean found = false;
for (Node node : nodes.values()) {
if (localDc.get().equals(node.getDatacenter())) {
found = true;
break;
}
}
if (!found) {
LOG.warn(
"[{}] Configured local DC '{}' does not match any node's datacenter"
+ " (available DCs: {}); please verify your configuration",
logPrefix,
localDc.get(),
formatDcs(nodes.values()));
}
}
localDc = profile.getString(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER);
LOG.debug("[{}] Local DC set from configuration: {}", logPrefix, localDc);
} else {
LOG.debug("[{}] Local DC not set, DC awareness will be disabled", logPrefix);
return Optional.empty();
}
return localDc;
}

/**
* Checks if the contact points are compatible with the local datacenter specified either through
* configuration, or programmatically.
*
* <p>The default implementation logs a warning when a contact point reports a datacenter
* different from the local one, and only for the default profile.
*
* @param localDc The local datacenter, as specified in the config, or programmatically.
* @param contactPoints The contact points provided when creating the session.
*/
protected void checkLocalDatacenterCompatibility(
@NonNull String localDc, Set<? extends Node> contactPoints) {
if (profile.getName().equals(DriverExecutionProfile.DEFAULT_NAME)) {
Set<Node> badContactPoints = new LinkedHashSet<>();
for (Node node : contactPoints) {
if (!Objects.equals(localDc, node.getDatacenter())) {
badContactPoints.add(node);
if (!nodes.isEmpty()) {
boolean found = false;
for (Node node : nodes.values()) {
if (localDc.equals(node.getDatacenter())) {
found = true;
break;
}
}
if (!badContactPoints.isEmpty()) {
if (!found) {
LOG.warn(
"[{}] You specified {} as the local DC, but some contact points are from a different DC: {}; "
+ "please provide the correct local DC, or check your contact points",
"[{}] Configured local DC '{}' does not match any node's datacenter"
+ " (available DCs: {}); please verify your configuration",
logPrefix,
localDc,
formatNodesAndDcs(badContactPoints));
formatDcs(nodes.values()));
}
}
return Optional.of(localDc);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ public Queue<Node> newQueryPlan(
switch (stateRef.get()) {
case BEFORE_INIT:
case DURING_INIT:
// The contact points are not stored in the metadata yet:
List<Node> nodes = new ArrayList<>(context.getMetadataManager().getContactPoints());
List<Node> nodes = new ArrayList<>(context.getMetadataManager().getResolvedContactPoints());
Collections.shuffle(nodes);
return new ConcurrentLinkedQueue<>(nodes);
case RUNNING:
Expand All @@ -170,9 +169,11 @@ public Queue<Node> newControlReconnectionQueryPlan() {
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) {
Set<DefaultNode> originalNodes = context.getMetadataManager().getContactPoints();
// Use DNS-expanded contact points so every resolved IP is tried as a fallback, not just
// the first IP the JVM happens to return for the hostname.
List<Node> resolvedNodes = context.getMetadataManager().getResolvedContactPoints();
List<Node> contactNodes = new ArrayList<>();
for (DefaultNode node : originalNodes) {
for (Node node : resolvedNodes) {
contactNodes.add(DefaultNode.newContactPoint(node.getEndPoint(), context));
}
Collections.shuffle(contactNodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -173,6 +176,60 @@ public Set<DefaultNode> getContactPoints() {
return contactPoints;
}

/**
* Returns the contact points expanded to all their DNS-resolved IPs.
*
* <p>Contact points are stored with unresolved hostnames (the driver always uses deferred DNS
* resolution). For each contact point whose underlying address is an unresolved {@link
* InetSocketAddress}, this method calls {@link InetAddress#getAllByName(String)} to obtain every
* IP the hostname maps to and creates a synthetic contact-point {@link DefaultNode} for each IP.
* This lets the load balancing policy iterate over all candidate IPs rather than only the first
* one, so that a non-responsive IP does not block initial connection or control-connection
* reconnection.
*
* <p>Already-resolved addresses and non-{@link InetSocketAddress} endpoints are returned as-is.
*/
public List<Node> getResolvedContactPoints() {
Set<DefaultNode> nodes = contactPoints;
if (nodes == null) {
return new ArrayList<>();
}
List<Node> result = new ArrayList<>();
for (DefaultNode node : nodes) {
EndPoint endPoint = node.getEndPoint();
if (endPoint instanceof DefaultEndPoint) {
InetSocketAddress address = ((DefaultEndPoint) endPoint).resolve();
if (address.isUnresolved()) {
// Expand hostname to all IPs so callers can try each one in turn.
try {
InetAddress[] all = InetAddress.getAllByName(address.getHostString());
if (all.length > 1) {
LOG.debug(
"[{}] Contact point {} expands to {} addresses",
logPrefix,
address.getHostString(),
all.length);
}
for (InetAddress ip : all) {
InetSocketAddress resolved = new InetSocketAddress(ip, address.getPort());
result.add(DefaultNode.newContactPoint(new DefaultEndPoint(resolved), context));
}
} catch (UnknownHostException e) {
LOG.warn(
"[{}] Could not resolve contact point hostname {}, skipping",
logPrefix,
address.getHostString(),
e);
}
continue;
}
}
// Already resolved or non-InetSocketAddress endpoint — use as-is.
result.add(node);
}
return result;
}

/** Whether the default contact point was used (because none were provided explicitly). */
public boolean wasImplicitContactPoint() {
return wasImplicitContactPoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ private DefaultDriverContext mockDefaultDriverContext() throws UnknownHostExcept
EndPoint contactEndPoint = mock(EndPoint.class);
when(contactEndPoint.resolve()).thenReturn(new InetSocketAddress("127.0.0.1", 9999));
when(contactPoint.getEndPoint()).thenReturn(contactEndPoint);
when(manager.getContactPoints()).thenReturn(ImmutableSet.of(contactPoint));
when(manager.getResolvedContactPoints()).thenReturn(ImmutableList.of(contactPoint));

DriverConfig driverConfig = mock(DriverConfig.class);
when(context.getConfig()).thenReturn(driverConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.driver.shaded.guava.common.collect.Lists;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void setup() {
Objects.requireNonNull(node3.getHostId()), node3);
when(metadataManager.getMetadata()).thenReturn(metadata);
when(metadata.getNodes()).thenReturn(allNodes);
when(metadataManager.getContactPoints()).thenReturn(contactPoints);
when(metadataManager.getResolvedContactPoints()).thenReturn(new ArrayList<>(contactPoints));
when(context.getMetadataManager()).thenReturn(metadataManager);

when(context.getConfig()).thenReturn(config);
Expand Down Expand Up @@ -130,26 +131,36 @@ public void setup() {

@Test
public void should_build_control_connection_query_plan_from_contact_points_before_init() {
// Given — simulate DNS expansion: node1's hostname resolves to two IPs (node1 + node4)
DefaultNode node4 = TestNodeFactory.newNode(4, context);
when(metadataManager.getResolvedContactPoints())
.thenReturn(ImmutableList.of(node1, node2, node4));

// When
Queue<Node> queryPlan = wrapper.newControlReconnectionQueryPlan();

// Then
// Then — query plan contains all DNS-expanded nodes, not just the original 2 contact points
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy, never()).newQueryPlan(null, null);
}
assertThat(queryPlan).hasSameElementsAs(contactPoints);
assertThat(queryPlan).containsExactlyInAnyOrder(node1, node2, node4);
}

@Test
public void should_build_query_plan_from_contact_points_before_init() {
// Given — simulate DNS expansion: node1's hostname resolves to two IPs (node1 + node4)
DefaultNode node4 = TestNodeFactory.newNode(4, context);
when(metadataManager.getResolvedContactPoints())
.thenReturn(ImmutableList.of(node1, node2, node4));

// When
Queue<Node> queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null);

// Then
// Then — query plan contains all DNS-expanded nodes, not just the original 2 contact points
for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) {
verify(policy, never()).newQueryPlan(null, null);
}
assertThat(queryPlan).hasSameElementsAs(contactPoints);
assertThat(queryPlan).containsExactlyInAnyOrder(node1, node2, node4);
}

@Test
Expand Down Expand Up @@ -204,8 +215,7 @@ public void should_fetch_control_connection_query_plan_from_policy_after_init()
assertThat(queryPlan.poll()).isEqualTo(node3);
assertThat(queryPlan.poll()).isEqualTo(node2);
assertThat(queryPlan.poll()).isEqualTo(node1);
// Remaining nodes are contact points appended at the end.
// They are new DefaultNode instances created via newContactPoint, so compare by endpoint.
// Remaining nodes are the DNS-expanded (resolved) contact points appended at the end.
Set<EndPoint> remainingEndpoints = new java.util.HashSet<>();
for (Node n : queryPlan) {
remainingEndpoints.add(n.getEndPoint());
Expand Down
Loading
Loading