Skip to content
4 changes: 2 additions & 2 deletions autodoc/src/main/java/com/bakdata/conquery/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.bakdata.conquery.models.config.CSVConfig;
import com.bakdata.conquery.models.config.ClusterConfig;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.config.DatabaseConnectionConfig;
import com.bakdata.conquery.models.config.Dialect;
import com.bakdata.conquery.models.config.FrontendConfig;
import com.bakdata.conquery.models.config.LocaleConfig;
Expand Down Expand Up @@ -129,7 +129,7 @@ public class Constants {
.otherClass(MinaConfig.class)
.otherClass(FrontendConfig.CurrencyConfig.class)
.otherClass(XodusConfig.class)
.otherClasses(List.of(SqlConnectorConfig.class, DatabaseConfig.class, Dialect.class))
.otherClasses(List.of(SqlConnectorConfig.class, DatabaseConnectionConfig.class, Dialect.class))
.hide(Charset.class)
.hide(Currency.class)
.hide(InetAddress.class)
Expand Down
7 changes: 6 additions & 1 deletion backend/src/main/java/com/bakdata/conquery/Conquery.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery;

import java.time.Clock;
import jakarta.validation.Validator;

import ch.qos.logback.classic.Level;
Expand Down Expand Up @@ -39,6 +40,10 @@ public class Conquery extends Application<ConqueryConfig> {

private final String name;

protected Clock getQueryClock() {
return Clock.systemDefaultZone();
}

public Conquery() {
this("Conquery");
}
Expand Down Expand Up @@ -103,7 +108,7 @@ protected Level bootstrapLogLevel() {
@Override
public void run(ConqueryConfig configuration, Environment environment) throws Exception {
ManagerProvider provider = configuration.getSqlConnectorConfig().isEnabled() ?
new LocalManagerProvider() : new ClusterManagerProvider();
new LocalManagerProvider(getQueryClock()) : new ClusterManagerProvider();
Manager manager = provider.provideManager(configuration, environment);

ManagerNode managerNode = new ManagerNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.bakdata.conquery.commands;

import java.nio.file.Path;
import java.time.Clock;
import java.util.List;
import java.util.Vector;

Expand All @@ -13,6 +14,7 @@
import io.dropwizard.core.cli.ServerCommand;
import io.dropwizard.core.setup.Environment;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.argparse4j.inf.Namespace;

Expand All @@ -24,6 +26,9 @@ public class DistributedStandaloneCommand extends ServerCommand<ConqueryConfig>
private final List<ShardNode> shardNodes = new Vector<>();
private ClusterManager manager;

@Setter
private Clock clock = Clock.systemDefaultZone();

public DistributedStandaloneCommand() {
super(new NoOpConquery(), "standalone", "starts a manager node and shard node(s) at the same time in a single JVM.");
}
Expand All @@ -46,6 +51,7 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig
for (int id = 0; id < configuration.getStandalone().getNumberOfShardNodes(); id++) {

ShardNode sc = new ShardNode(ShardNode.DEFAULT_NAME + id);
sc.setClock(clock);

shardNodes.add(sc);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.commands;

import java.time.Clock;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -36,6 +37,8 @@ public class ShardNode implements ConfiguredBundle<ConqueryConfig> {
@Setter
private ShardWorkers workers;
private ClusterConnectionShard clusterConnection;
@Setter
private Clock clock = Clock.systemDefaultZone();

public ShardNode() {
this(DEFAULT_NAME);
Expand All @@ -55,7 +58,8 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
workers = new ShardWorkers(
config.getQueries().getExecutionPool(),
internalMapperFactory,
config.getQueries().getSecondaryIdSubPlanRetention()
config.getQueries().getSecondaryIdSubPlanRetention(),
clock
);

lifecycle.manage(workers);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.commands;

import java.time.Clock;
import java.util.List;

import com.bakdata.conquery.mode.Manager;
Expand All @@ -15,4 +16,7 @@ public interface StandaloneCommand {

Environment getEnvironment();

void setClock(Clock clock);
Clock getClock();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.bakdata.conquery.mode.local;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import com.bakdata.conquery.models.datasets.Dataset;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import org.jooq.DSLContext;

@Data
public class ConnectionManager {
@Getter(AccessLevel.NONE)
private final Map<String, ManagedConnection> connections = new HashMap<>();

public void addConnection(String name, ManagedConnection connection) {
connections.put(name, connection);
}

public DSLContext connect(Dataset dataset) {
return getConnection(dataset).connect();
}

public ManagedConnection getConnection(Dataset dataset) {
return Objects.requireNonNull(connections.get(dataset.getDataSource()), () -> "No connection available for datset=%s, dataSource=%s".formatted(dataset.getName(), dataset.getDataSource()));
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.bakdata.conquery.mode.local;

import java.time.Clock;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand All @@ -14,31 +15,25 @@
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.models.worker.ShardNodeInformation;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory;
import io.dropwizard.core.setup.Environment;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class LocalManagerProvider implements ManagerProvider {

private static final Supplier<Collection<ShardNodeInformation>> EMPTY_NODE_PROVIDER = Collections::emptyList;
private final Clock clock;

private final SqlDialectFactory dialectFactory;

public LocalManagerProvider() {
this.dialectFactory = new SqlDialectFactory();
}

public LocalManagerProvider(SqlDialectFactory dialectFactory) {
this.dialectFactory = dialectFactory;
}

@Override
public DelegateManager<LocalNamespace> provideManager(ConqueryConfig config, Environment environment) {

final ConnectionManager connectionManager = config.getSqlConnectorConfig().toConnectionManager(environment);

final MetaStorage storage = new MetaStorage(config.getStorage());
final InternalMapperFactory internalMapperFactory = new InternalMapperFactory(config, environment.getValidator());
final NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, internalMapperFactory, dialectFactory);
final NamespaceHandler<LocalNamespace> namespaceHandler = new LocalNamespaceHandler(config, internalMapperFactory, connectionManager, clock);
final DatasetRegistry<LocalNamespace> datasetRegistry = ManagerProvider.createDatasetRegistry(namespaceHandler, config, internalMapperFactory);


return new DelegateManager<>(
config,
environment,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
package com.bakdata.conquery.mode.local;

import java.time.Clock;

import com.bakdata.conquery.io.storage.MetaStorage;
import com.bakdata.conquery.io.storage.NamespaceStorage;
import com.bakdata.conquery.mode.NamespaceHandler;
import com.bakdata.conquery.mode.NamespaceSetupData;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.config.DatabaseConfig;
import com.bakdata.conquery.models.config.IdColumnConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.query.ExecutionManager;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.LocalNamespace;
import com.bakdata.conquery.sql.DSLContextWrapper;
import com.bakdata.conquery.sql.DslContextFactory;
import com.bakdata.conquery.sql.conquery.SqlExecutionManager;
import com.bakdata.conquery.sql.conversion.NodeConversions;
import com.bakdata.conquery.sql.conversion.SqlConverter;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialectFactory;
import com.bakdata.conquery.sql.conversion.dialect.DialectBundle;
import com.bakdata.conquery.sql.execution.ResultSetProcessor;
import com.bakdata.conquery.sql.execution.ResultSetProcessorFactory;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
Expand All @@ -34,42 +30,37 @@ public class LocalNamespaceHandler implements NamespaceHandler<LocalNamespace> {

private final ConqueryConfig config;
private final InternalMapperFactory internalMapperFactory;
private final SqlDialectFactory dialectFactory;
private final ConnectionManager connectionManager;
private final Clock clock;

@Override
public LocalNamespace createNamespace(NamespaceStorage namespaceStorage, MetaStorage metaStorage, DatasetRegistry<LocalNamespace> datasetRegistry, Environment environment) {
public LocalNamespace createNamespace(
NamespaceStorage namespaceStorage,
MetaStorage metaStorage,
DatasetRegistry<LocalNamespace> datasetRegistry,
Environment environment) {

NamespaceSetupData namespaceData = NamespaceHandler.createNamespaceSetup(namespaceStorage, config, internalMapperFactory, datasetRegistry, environment);

IdColumnConfig idColumns = config.getIdColumns();
SqlConnectorConfig sqlConnectorConfig = config.getSqlConnectorConfig();
DatabaseConfig databaseConfig = sqlConnectorConfig.getDatabaseConfig(namespaceStorage.getDataset());

DSLContextWrapper dslContextWrapper = DslContextFactory.create(databaseConfig, sqlConnectorConfig, environment.healthChecks());
DSLContext dslContext = dslContextWrapper.getDslContext();
SqlDialect sqlDialect = dialectFactory.createSqlDialect(databaseConfig.getDialect());

boolean valid = dslContext.connectionResult(connection -> connection.isValid(1));
ManagedConnection connection = connectionManager.getConnection(namespaceStorage.getDataset());
DSLContext dslContext = connection.connect();
DialectBundle dialectBundle = connection.getConnection().getDialect().getDialectBundle();

if (!valid) {
throw new IllegalStateException("Unable to connect to %s".formatted(databaseConfig));
}

ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.create(config, sqlDialect);
ResultSetProcessor resultSetProcessor = ResultSetProcessorFactory.create(config, dialectBundle);
SqlExecutionService sqlExecutionService = new SqlExecutionService(dslContext, resultSetProcessor);
NodeConversions nodeConversions = new NodeConversions(idColumns, sqlDialect, dslContext, databaseConfig, sqlExecutionService);

NodeConversions nodeConversions = new NodeConversions(config.getIdColumns(), dialectBundle, dslContext, sqlExecutionService, clock);
SqlConverter sqlConverter = new SqlConverter(nodeConversions, config);
ExecutionManager executionManager = new SqlExecutionManager(sqlConverter, sqlExecutionService, metaStorage, datasetRegistry, config);
SqlStorageHandler sqlStorageHandler = new SqlStorageHandler(sqlExecutionService);
SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(idColumns, dslContext, sqlDialect, sqlExecutionService);
SqlEntityResolver sqlEntityResolver = new SqlEntityResolver(config.getIdColumns(), dslContext, dialectBundle, sqlExecutionService);

return new LocalNamespace(
sqlDialect,
dialectBundle,
namespaceData.preprocessMapper(),
namespaceStorage,
executionManager,
dslContextWrapper,
sqlStorageHandler,
dslContext, sqlStorageHandler,
namespaceData.jobManager(),
namespaceData.filterSearch(),
sqlEntityResolver
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.bakdata.conquery.mode.local;

import java.sql.SQLException;

import javax.annotation.CheckForNull;

import com.bakdata.conquery.models.config.DatabaseConnectionConfig;
import com.bakdata.conquery.models.config.SqlConnectorConfig;
import com.codahale.metrics.health.HealthCheckRegistry;
import com.google.common.base.Preconditions;
import com.zaxxer.hikari.HikariDataSource;
import io.dropwizard.lifecycle.Managed;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jooq.DSLContext;
import org.jooq.conf.RenderOptionalKeyword;
import org.jooq.conf.RenderQuotedNames;
import org.jooq.conf.Settings;
import org.jooq.impl.DSL;

@Data
@Slf4j
public class ManagedConnection implements Managed {
private final String name;
private final SqlConnectorConfig config;
private final DatabaseConnectionConfig connection;
@CheckForNull
private final HealthCheckRegistry healthCheckRegistry;

private HikariDataSource dataSource;

@Override
public void start() throws Exception {
dataSource = connection.createDataSource(healthCheckRegistry);

try {
log.debug("TEST connecting to {}", connection.getJdbcConnectionUrl());
if (dataSource.getConnection().isValid(100)) {
log.info("SUCCESS connecting to {}", connection.getJdbcConnectionUrl());
}
else {
log.error("FAILED connecting to {}. Connection did not become valid.", connection.getJdbcConnectionUrl());
}
}
catch (SQLException exception) {
log.error("FAILED connecting to {}", connection.getJdbcConnectionUrl(), exception);
}
}

public DSLContext connect() {
Preconditions.checkNotNull(this.dataSource, "dataSource has not been initialized yet.");

Settings settings = new Settings()
.withRenderFormatted(config.isWithPrettyPrinting())
// enforces all identifiers to be quoted if not explicitly unquoted via DSL.unquotedName()
// to prevent any lowercase/uppercase SQL dialect specific identifier naming issues
.withRenderQuotedNames(RenderQuotedNames.EXPLICIT_DEFAULT_QUOTED)
// always render "as" keyword for field aliases
.withRenderOptionalAsKeywordForFieldAliases(RenderOptionalKeyword.ON);

return DSL.using(
this.dataSource,
connection.getDialect().getDialectBundle().getJooqDialect(),
settings
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.bakdata.conquery.models.identifiable.mapping.EntityIdMap;
import com.bakdata.conquery.models.identifiable.mapping.ExternalId;
import com.bakdata.conquery.sql.conversion.SharedAliases;
import com.bakdata.conquery.sql.conversion.dialect.SqlDialect;
import com.bakdata.conquery.sql.conversion.dialect.DialectBundle;
import com.bakdata.conquery.sql.execution.SqlExecutionService;
import com.bakdata.conquery.util.DateReader;
import com.bakdata.conquery.util.io.IdColumnUtil;
Expand All @@ -49,7 +49,7 @@ public class SqlEntityResolver implements EntityResolver {

private final IdColumnConfig idColumns;
private final DSLContext context;
private final SqlDialect dialect;
private final DialectBundle dialect;
private final SqlExecutionService executionService;

@Override
Expand Down
Loading
Loading