Skip to content

Commit 4744903

Browse files
Add SyncService client from EGW for embedded calling from Data API (#2405)
1 parent fc2af1b commit 4744903

11 files changed

Lines changed: 763 additions & 8 deletions

File tree

pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@
102102
<groupId>io.quarkus</groupId>
103103
<artifactId>quarkus-arc</artifactId>
104104
</dependency>
105+
<dependency>
106+
<groupId>io.quarkus</groupId>
107+
<artifactId>quarkus-cache</artifactId>
108+
</dependency>
105109
<dependency>
106110
<groupId>io.quarkus</groupId>
107111
<artifactId>quarkus-container-image-docker</artifactId>

src/main/java/io/stargate/sgv2/jsonapi/service/embedding/DataVectorizer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,19 @@ public class DataVectorizer {
3838
/**
3939
* Constructor
4040
*
41-
* @param embeddingProvider - Service client based on embedding service configuration set for the
42-
* table
41+
* @param embeddingProviderWrapper - Service client based on embedding service configuration set
42+
* for the table
4343
* @param nodeFactory - Jackson node factory to create json nodes added to the document
4444
* @param embeddingCredentials - Credentials for the embedding service
4545
* @param schemaObject - The collection setting for vectorize call
4646
*/
4747
public DataVectorizer(
48-
MeteredEmbeddingProviderWrapper embeddingProvider,
48+
MeteredEmbeddingProviderWrapper embeddingProviderWrapper,
4949
JsonNodeFactory nodeFactory,
5050
EmbeddingCredentials embeddingCredentials,
5151
SchemaObject schemaObject) {
5252
// 16-Feb-2026, tatu: This can be null, apparently
53-
this.embeddingProviderWrapper = embeddingProvider;
53+
this.embeddingProviderWrapper = embeddingProviderWrapper;
5454
this.nodeFactory = nodeFactory;
5555
this.embeddingCredentials =
5656
Objects.requireNonNull(embeddingCredentials, "embeddingCredentials must not be null");

src/main/java/io/stargate/sgv2/jsonapi/service/embedding/operation/EmbeddingProviderFactory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.stargate.sgv2.jsonapi.service.embedding.configuration.ServiceConfigStore;
1010
import io.stargate.sgv2.jsonapi.service.embedding.gateway.EmbeddingGatewayClient;
1111
import io.stargate.sgv2.jsonapi.service.provider.ModelProvider;
12+
import io.stargate.sgv2.jsonapi.syncservice.SyncServiceClient;
1213
import jakarta.enterprise.context.ApplicationScoped;
1314
import jakarta.enterprise.inject.Instance;
1415
import jakarta.inject.Inject;
@@ -29,6 +30,8 @@ public class EmbeddingProviderFactory {
2930
@GrpcClient("embedding")
3031
EmbeddingService grpcGatewayClient;
3132

33+
@Inject SyncServiceClient syncServiceClient;
34+
3235
@FunctionalInterface
3336
interface ProviderConstructor {
3437
EmbeddingProvider create(
@@ -99,7 +102,7 @@ public EmbeddingProvider create(
99102
commandName);
100103
}
101104

102-
public EmbeddingProvider create(
105+
private EmbeddingProvider create(
103106
Tenant tenant,
104107
String authToken,
105108
ModelProvider modelProvider,
@@ -190,7 +193,17 @@ public EmbeddingProvider create(
190193
"ModelProvider does not have a constructor: " + modelProvider);
191194
}
192195

193-
return ctor.create(
194-
providerConfig, modelConfig, serviceConfig, dimension, vectorizeServiceParameters);
196+
var provider =
197+
ctor.create(
198+
providerConfig, modelConfig, serviceConfig, dimension, vectorizeServiceParameters);
199+
200+
// Wrap with credential resolver if shared-secret authentication is configured
201+
if (authentication != null && !authentication.isEmpty()) {
202+
provider =
203+
new SyncServiceCredentialResolvingProvider(
204+
provider, syncServiceClient, authentication, tenant, authToken);
205+
}
206+
207+
return provider;
195208
}
196209
}
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package io.stargate.sgv2.jsonapi.service.embedding.operation;
2+
3+
import io.smallrye.mutiny.Uni;
4+
import io.stargate.sgv2.jsonapi.api.request.EmbeddingCredentials;
5+
import io.stargate.sgv2.jsonapi.api.request.tenant.Tenant;
6+
import io.stargate.sgv2.jsonapi.syncservice.SyncServiceClient;
7+
import java.util.*;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/**
12+
* A decorator that wraps a direct {@link EmbeddingProvider} and resolves shared-secret credentials
13+
* via {@link SyncServiceClient} before each {@link #vectorize} call.
14+
*
15+
* <p>When the Embedding Gateway (EGW) is disabled (standalone mode), collections configured with
16+
* {@code SHARED_SECRET} authentication need their credential names resolved into actual secrets.
17+
* This provider handles that resolution by calling {@link SyncServiceClient#getCredential} for each
18+
* entry in the authentication map, then passing the resolved credentials to the delegate provider.
19+
*/
20+
public class SyncServiceCredentialResolvingProvider extends EmbeddingProvider {
21+
22+
private static final Logger LOGGER =
23+
LoggerFactory.getLogger(SyncServiceCredentialResolvingProvider.class);
24+
25+
private static final String PROVIDER_KEY = "providerKey";
26+
private static final String ACCESS_ID = "accessId";
27+
private static final String SECRET_KEY = "secretKey";
28+
private static final String SECRET_ID = "secretId";
29+
30+
private final EmbeddingProvider delegate;
31+
private final SyncServiceClient syncServiceClient;
32+
private final Map<String, String> authentication;
33+
private final Tenant tenant;
34+
private final String authToken;
35+
36+
public SyncServiceCredentialResolvingProvider(
37+
EmbeddingProvider delegate,
38+
SyncServiceClient syncServiceClient,
39+
Map<String, String> authentication,
40+
Tenant tenant,
41+
String authToken) {
42+
super(
43+
delegate.modelProvider(),
44+
delegate.providerConfig,
45+
delegate.modelConfig,
46+
delegate.serviceConfig,
47+
delegate.dimension,
48+
delegate.vectorizeServiceParameters);
49+
50+
this.delegate = delegate;
51+
this.syncServiceClient = syncServiceClient;
52+
this.authentication = authentication;
53+
this.tenant = tenant;
54+
this.authToken = authToken;
55+
}
56+
57+
@Override
58+
protected String errorMessageJsonPtr() {
59+
// Not used directly — this wrapper never makes HTTP calls itself
60+
return "";
61+
}
62+
63+
@Override
64+
public Uni<BatchedEmbeddingResponse> vectorize(
65+
int batchId,
66+
List<String> texts,
67+
EmbeddingCredentials embeddingCredentials,
68+
EmbeddingRequestType embeddingRequestType) {
69+
70+
// Match EGW behavior: if caller already provided credentials via headers, use those
71+
// directly and skip SyncService resolution
72+
if (hasHeaderCredentials(embeddingCredentials)) {
73+
return delegate.vectorize(batchId, texts, embeddingCredentials, embeddingRequestType);
74+
}
75+
76+
return resolveCredentials()
77+
.flatMap(resolved -> delegate.vectorize(batchId, texts, resolved, embeddingRequestType));
78+
}
79+
80+
private static boolean hasHeaderCredentials(EmbeddingCredentials creds) {
81+
return creds.apiKey().isPresent()
82+
|| creds.accessId().isPresent()
83+
|| creds.secretId().isPresent();
84+
}
85+
86+
@Override
87+
public int maxBatchSize() {
88+
return delegate.maxBatchSize();
89+
}
90+
91+
/**
92+
* Resolves credentials by calling SyncService for each entry in the authentication map. Each
93+
* entry's key is the accepted token name (e.g. "providerKey"), and the value is the credential
94+
* reference name stored in SyncService (e.g. "my-openai-cred").
95+
*
96+
* <p>The SyncService response map is keyed by the credential reference name, so we extract the
97+
* resolved secret using the credential name as key (matching EGW's EmbeddingServiceImpl
98+
* behavior).
99+
*/
100+
private Uni<EmbeddingCredentials> resolveCredentials() {
101+
String providerName = modelProvider().apiName();
102+
103+
// Build parallel SyncService calls and track which accepted token name each one is for
104+
List<String> acceptedNames = new ArrayList<>();
105+
List<String> credNames = new ArrayList<>();
106+
List<Uni<Map<String, String>>> resolveUnis = new ArrayList<>();
107+
108+
for (Map.Entry<String, String> entry : authentication.entrySet()) {
109+
acceptedNames.add(entry.getKey());
110+
credNames.add(entry.getValue());
111+
resolveUnis.add(
112+
syncServiceClient.getCredential(authToken, tenant, providerName, entry.getValue()));
113+
}
114+
115+
return Uni.join()
116+
.all(resolveUnis)
117+
.andFailFast()
118+
.map(
119+
results -> {
120+
// For each resolved result, extract the secret using the credential name as key
121+
// (SyncService response is keyed by credential reference name, not accepted name)
122+
Map<String, String> resolvedByAcceptedName = new HashMap<>();
123+
for (int i = 0; i < results.size(); i++) {
124+
Map<String, String> credMap = results.get(i);
125+
if (credMap != null) {
126+
String credName = credNames.get(i);
127+
String acceptedName = acceptedNames.get(i);
128+
String resolvedValue = credMap.get(credName);
129+
if (resolvedValue != null) {
130+
resolvedByAcceptedName.put(acceptedName, resolvedValue);
131+
} else {
132+
LOGGER.warn(
133+
"SyncService response for credential '{}' (provider '{}') did not contain"
134+
+ " expected key '{}'; available keys: {}",
135+
credName,
136+
providerName,
137+
credName,
138+
credMap.keySet());
139+
}
140+
}
141+
}
142+
return buildEmbeddingCredentials(resolvedByAcceptedName);
143+
});
144+
}
145+
146+
/**
147+
* Maps resolved credential values (keyed by accepted token name) to {@link EmbeddingCredentials}.
148+
* The accepted token names come from the provider's SHARED_SECRET config (e.g. "providerKey",
149+
* "accessId", "secretKey").
150+
*/
151+
private EmbeddingCredentials buildEmbeddingCredentials(
152+
Map<String, String> resolvedByAcceptedName) {
153+
var apiKey = Optional.ofNullable(resolvedByAcceptedName.get(PROVIDER_KEY));
154+
var accessId = Optional.ofNullable(resolvedByAcceptedName.get(ACCESS_ID));
155+
var secretId =
156+
Optional.ofNullable(
157+
resolvedByAcceptedName.containsKey(SECRET_KEY)
158+
? resolvedByAcceptedName.get(SECRET_KEY)
159+
: resolvedByAcceptedName.get(SECRET_ID));
160+
return new EmbeddingCredentials(tenant, apiKey, accessId, secretId, Optional.of(authToken));
161+
}
162+
}

src/main/java/io/stargate/sgv2/jsonapi/service/resolver/VectorizeConfigValidator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.stargate.sgv2.jsonapi.service.embedding.configuration.EmbeddingProvidersConfig;
88
import io.stargate.sgv2.jsonapi.service.provider.ApiModelSupport;
99
import io.stargate.sgv2.jsonapi.service.provider.ModelProvider;
10+
import io.stargate.sgv2.jsonapi.syncservice.SyncServiceClient;
1011
import jakarta.enterprise.context.ApplicationScoped;
1112
import jakarta.inject.Inject;
1213
import java.util.ArrayList;
@@ -26,15 +27,18 @@ public class VectorizeConfigValidator {
2627
private final OperationsConfig operationsConfig;
2728
private final EmbeddingProvidersConfig embeddingProvidersConfig;
2829
private final ValidateCredentials validateCredentials;
30+
private final SyncServiceClient syncServiceClient;
2931

3032
@Inject
3133
public VectorizeConfigValidator(
3234
OperationsConfig operationsConfig,
3335
EmbeddingProvidersConfig embeddingProvidersConfig,
34-
ValidateCredentials validateCredentials) {
36+
ValidateCredentials validateCredentials,
37+
SyncServiceClient syncServiceClient) {
3538
this.operationsConfig = operationsConfig;
3639
this.embeddingProvidersConfig = embeddingProvidersConfig;
3740
this.validateCredentials = validateCredentials;
41+
this.syncServiceClient = syncServiceClient;
3842
}
3943

4044
/**
@@ -189,8 +193,11 @@ private void validateAuthentication(
189193

190194
// Validate the credential name from secret service
191195
// already append the .providerKey to the value in CreateCollectionCommand
196+
// Both validate() and validateKey() are blocking and throw on invalid credentials
192197
if (operationsConfig.enableEmbeddingGateway()) {
193198
validateCredentials.validate(userConfig.provider(), userAuth.getValue());
199+
} else {
200+
syncServiceClient.validateKey(userConfig.provider(), userAuth.getValue());
194201
}
195202
}
196203
}

0 commit comments

Comments
 (0)