Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) {
.build();

LOGGER.info("Starting Spanner DDL creation...");
spannerClient.createDatabase();
spannerClient.validateOrInitializeDatabase();
LOGGER.info("Spanner DDL creation complete.");

Pipeline pipeline = Pipeline.create(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static void main(String[] args) {
.build();

LOGGER.info("Starting Spanner DDL creation...");
spannerClient.createDatabase();
spannerClient.validateOrInitializeDatabase();
LOGGER.info("Spanner DDL creation complete.");

Pipeline pipeline = Pipeline.create(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void setUp() throws Exception {
.numShards(1)
.emulatorHost(emulatorHost)
.build();
spannerClient.createDatabase();
spannerClient.validateOrInitializeDatabase();

// Clear tables
DatabaseId dbId = DatabaseId.of(projectId, instanceId, databaseId);
Expand Down
24 changes: 24 additions & 0 deletions pipeline/spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,29 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-spanner-admin-database-v1</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -104,13 +105,51 @@ public SpannerWriteResult writeMutations(
/**
 * Splits a DDL script into its individual statement blocks.
 *
 * <p>The full text is read from {@code reader} and split wherever two blocks are separated by at
 * least one blank (whitespace-only) line. Blocks that consist solely of whitespace -- for example
 * the empty leading block produced when the script starts with blank lines -- are dropped so that
 * no empty statement is handed to the caller.
 *
 * @param reader source of the DDL script text
 * @return a new, mutable list of the non-blank statement blocks in script order; callers may
 *     remove entries from it
 * @throws IOException kept in the signature for existing callers; read failures raised by {@link
 *     BufferedReader#lines()} surface as {@link java.io.UncheckedIOException}
 */
private List<String> parseDdlStatements(BufferedReader reader) throws IOException {
  String fullText = reader.lines().collect(Collectors.joining("\n"));
  String[] blocksArray = fullText.split("\\n\\s*\\n+", 0);
  // Copy into an ArrayList rather than returning the fixed-size Arrays.asList view, because
  // callers filter the result in place (removeIf).
  List<String> statements = new ArrayList<>(blocksArray.length);
  for (String block : blocksArray) {
    if (!block.trim().isEmpty()) {
      statements.add(block);
    }
  }
  return statements;
}

public void createDatabase() {
/**
 * Loads the proto descriptor file {@code descriptor.proto.bin} from the classpath resources.
 *
 * <p>The resource stream is opened in a try-with-resources statement so it is closed whether the
 * read succeeds or fails.
 *
 * @return the complete contents of the descriptor file
 * @throws IllegalStateException if the resource is missing or cannot be read
 */
private ByteString loadProtoDescriptors() {
  try (InputStream is = getClass().getClassLoader().getResourceAsStream("descriptor.proto.bin")) {
    // getResourceAsStream signals a missing resource with null; try-with-resources skips
    // closing a null resource, so this check is safe inside the try block.
    if (is == null) {
      throw new IllegalStateException(
          "Could not find proto descriptor file (descriptor.proto.bin) in resources.");
    }
    return ByteString.copyFrom(is.readAllBytes());
  } catch (IOException e) {
    throw new IllegalStateException("Failed to read proto descriptor file.", e);
  }
}

/**
 * Creates the Spanner database identified by {@code spannerDatabaseId} under the configured
 * project and instance, blocking until the create operation finishes.
 *
 * <p>Only a bare {@code CREATE DATABASE} statement is issued here; no extra DDL statements are
 * attached to the request.
 *
 * @param dbAdminClient admin client used to submit the create request
 */
private void initializeRemoteDatabase(DatabaseAdminClient dbAdminClient) {
  LOGGER.info("Spanner database {} not found. Creating it now.", spannerDatabaseId);

  String instancePath =
      String.format("projects/%s/instances/%s", gcpProjectId, spannerInstanceId);
  String createStatement = String.format("CREATE DATABASE `%s`", spannerDatabaseId);
  CreateDatabaseRequest createRequest =
      CreateDatabaseRequest.newBuilder()
          .setParent(instancePath)
          .setCreateStatement(createStatement)
          .build();

  try {
    // get() waits for the asynchronous create operation to complete.
    dbAdminClient.createDatabaseAsync(createRequest).get();
  } catch (InterruptedException interrupted) {
    LOGGER.error("Operation was interrupted while waiting for database creation.", interrupted);
    throw SpannerExceptionFactory.propagateInterrupt(interrupted);
  } catch (java.util.concurrent.ExecutionException failed) {
    throw SpannerExceptionFactory.newSpannerException(
        ErrorCode.UNKNOWN, "An error occurred during database creation.", failed);
  }

  LOGGER.info("Successfully created Spanner database {}.", spannerDatabaseId);
}

/**
* Validates that the Spanner database and tables exist. Initializes the database if it doesn't.
*/
public void validateOrInitializeDatabase() {
DatabaseAdminClient dbAdminClient;
try {
if (emulatorHost != null) {
DatabaseAdminSettings.Builder settingsBuilder = DatabaseAdminSettings.newBuilder();
settingsBuilder.setCredentialsProvider(NoCredentialsProvider.create());
Expand All @@ -124,69 +163,101 @@ public void createDatabase() {
} else {
dbAdminClient = DatabaseAdminClient.create();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to create DatabaseAdminClient.", e);
}

try {
dbAdminClient.getDatabase(
String.format(
"projects/%s/instances/%s/databases/%s",
gcpProjectId, spannerInstanceId, spannerDatabaseId));
LOGGER.info("Spanner database {} already exists.", spannerDatabaseId);
} catch (com.google.api.gax.rpc.NotFoundException notFoundE) {
LOGGER.info("Spanner database {} not found. Creating it now.", spannerDatabaseId);

List<String> ddlStatements;
try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("spanner_schema.sql")) {
if (inputStream == null) {
throw new java.io.FileNotFoundException(
"Could not find spanner_schema.sql in resources.");
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
ddlStatements = parseDdlStatements(reader);
}
} catch (IOException ioE) {
throw new IOException("Failed to read DDL file", ioE);
}
// Ensure the Database now has the proper Tables and Proto Bundle.
String databasePath =
String.format(
"projects/%s/instances/%s/databases/%s",
gcpProjectId, spannerInstanceId, spannerDatabaseId);

ByteString protoDescriptors;
try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("descriptor.proto.bin")) {
if (inputStream == null) {
throw new java.io.FileNotFoundException(
"Could not find proto descriptor file (descriptor.proto.bin) in resources.");
}
protoDescriptors = ByteString.copyFrom(inputStream.readAllBytes());
} catch (IOException ioE) {
throw new IOException("Failed to read proto descriptor file", ioE);
}
try {
dbAdminClient.getDatabase(databasePath);
LOGGER.info("Spanner database {} already exists.", spannerDatabaseId);
} catch (com.google.api.gax.rpc.NotFoundException notFoundE) {
initializeRemoteDatabase(dbAdminClient);
}

CreateDatabaseRequest request =
CreateDatabaseRequest.newBuilder()
.setParent(
String.format("projects/%s/instances/%s", gcpProjectId, spannerInstanceId))
.setCreateStatement(String.format("CREATE DATABASE `%s`", spannerDatabaseId))
.addAllExtraStatements(ddlStatements)
.setProtoDescriptors(protoDescriptors)
.build();

try {
dbAdminClient.createDatabaseAsync(request).get();
} catch (java.util.concurrent.ExecutionException executionE) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNKNOWN, "An error occurred during database creation.", executionE);
} catch (InterruptedException interruptedE) {
LOGGER.error(
"Operation was interrupted while waiting for database creation.", interruptedE);
throw SpannerExceptionFactory.propagateInterrupt(interruptedE);
}
LOGGER.info("Successfully created Spanner database {}.", spannerDatabaseId);
}
// Check if tables exist by fetching current DDL from Spanner once.
List<String> currentDdlStatements =
dbAdminClient.getDatabaseDdl(databasePath).getStatementsList();

boolean nodeExists = checkTableExists(currentDdlStatements, "Node");
boolean edgeExists = checkTableExists(currentDdlStatements, "Edge");
boolean observationExists = checkTableExists(currentDdlStatements, "Observation");
boolean protoBundleExists =
currentDdlStatements.stream()
.anyMatch(s -> s.trim().toUpperCase().startsWith("CREATE PROTO BUNDLE"));

if (nodeExists && edgeExists && observationExists && protoBundleExists) {
LOGGER.info("Database is fully initialized.");
return;
}

if (nodeExists || edgeExists || observationExists) {
throw new IllegalStateException(
String.format(
"Database is in an inconsistent state. Node: %b, Edge: %b, Observation: %b. Please clean up the database manually.",
nodeExists, edgeExists, observationExists));
}

LOGGER.info("Database is empty. Applying DDL statements to existing database.");
List<String> statementsToApply = readDdlStatements();
if (protoBundleExists) {
LOGGER.info("Proto bundle already exists in database. Skipping CREATE PROTO BUNDLE.");
statementsToApply.removeIf(s -> s.trim().toUpperCase().startsWith("CREATE PROTO BUNDLE"));
}

ByteString protoDescriptors = loadProtoDescriptors();
try {
// Update the Schema to include the new tables and proto bundle.
dbAdminClient
.updateDatabaseDdlAsync(
com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest.newBuilder()
.setDatabase(databasePath)
.addAllStatements(statementsToApply)
.setProtoDescriptors(protoDescriptors)
.build())
.get();
LOGGER.info("Successfully applied DDL statements to existing database.");
} catch (java.util.concurrent.ExecutionException executionE) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNKNOWN, "An error occurred during DDL update.", executionE);
} catch (InterruptedException interruptedE) {
LOGGER.error("Operation was interrupted while waiting for DDL update.", interruptedE);
throw SpannerExceptionFactory.propagateInterrupt(interruptedE);
}
}

/**
 * Reads the DDL statements from the {@code spanner_schema.sql} classpath resource.
 *
 * <p>The resource is decoded as UTF-8 and split into statements by {@code parseDdlStatements}.
 *
 * @return the statements parsed from the schema file
 * @throws IllegalStateException if the resource is missing or cannot be read
 */
List<String> readDdlStatements() {
  InputStream inputStream = getClass().getClassLoader().getResourceAsStream("spanner_schema.sql");
  if (inputStream == null) {
    throw new IllegalStateException("Could not find spanner_schema.sql in resources.");
  }
  // Closing the reader also closes the wrapped resource stream.
  try (BufferedReader reader =
      new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
    return parseDdlStatements(reader);
  } catch (IOException | java.io.UncheckedIOException e) {
    // BufferedReader.lines() reports read errors as UncheckedIOException, so both forms are
    // wrapped to give callers a single failure type with the cause preserved.
    throw new IllegalStateException("Failed to read spanner_schema.sql", e);
  }
}

/**
 * Checks whether a {@code CREATE TABLE} statement for the given table appears in a list of DDL
 * statements.
 *
 * <p>The match is case-insensitive, accepts the table name with or without surrounding
 * backticks, and requires a word boundary after the name so that a longer identifier such as
 * {@code Node_Old} does not count as {@code Node}. The table name is matched literally: regex
 * metacharacters in it (for example the dot in a qualified name) carry no special meaning.
 *
 * @param statements The list of DDL statements retrieved from Spanner.
 * @param tableName The name of the table to check.
 * @return true if the table exists, false otherwise.
 */
boolean checkTableExists(List<String> statements, String tableName) {
  // Pattern.quote keeps the caller-supplied name from being interpreted as regex syntax.
  String regex = "\\bCREATE\\s+TABLE\\s+`?" + Pattern.quote(tableName) + "`?\\b";
  Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
  return statements.stream().anyMatch(s -> pattern.matcher(s).find());
}

public PCollection<Void> deleteDataForImport(
Pipeline pipeline, String importName, String tableName, String columnName) {
String stageName = tableName + "-" + importName.replaceFirst("^dc/base/", "");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.datacommons.ingestion.spanner;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.List;
import org.junit.Before;
import org.junit.Test;

public class SpannerClientTest {

private SpannerClient spannerClient;

@Before
public void setUp() {
spannerClient =
SpannerClient.builder()
.gcpProjectId("test-project")
.spannerInstanceId("test-instance")
.spannerDatabaseId("test-db")
.build();
}

@Test
public void testReadDdlStatements() {
List<String> ddlStatements = spannerClient.readDdlStatements();
assertNotNull(ddlStatements);
assertFalse(ddlStatements.isEmpty());

// Verify that we have at least table creation statements
boolean hasNodeTable = false;
for (String ddl : ddlStatements) {
if (ddl.contains("CREATE TABLE Node")) {
hasNodeTable = true;
break;
}
}
assertTrue("Should contain Node table DDL", hasNodeTable);
}

@Test
public void testCheckTableExists_True() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Node");

boolean exists = spannerClient.checkTableExists(statements, "Node");
assertTrue(exists);
}

@Test
public void testCheckTableExists_False() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Edge");

boolean exists = spannerClient.checkTableExists(statements, "Node");
assertFalse(exists);
}
Comment thread
gmechali marked this conversation as resolved.

@Test
public void testCheckTableExists_FalsePositive() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Node_Old");

boolean exists = spannerClient.checkTableExists(statements, "Node");
assertFalse(exists);
}
}