Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pipeline/spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,29 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>grpc-google-cloud-spanner-admin-database-v1</artifactId>
</dependency>
<!-- Test-only dependencies: each entry below is declared with <scope>test</scope>. -->
<!-- JUnit; version comes from the junit.version property (presumably defined in a parent POM - confirm). -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- Hamcrest library; the version is pinned literally here rather than via a property. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<!-- Apache Beam direct runner; version follows the beam.version property. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<!-- Mockito core; version follows the mockito.version property. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -125,41 +126,81 @@ public void createDatabase() {
dbAdminClient = DatabaseAdminClient.create();
}

List<String> ddlStatements = readDdlStatements();

ByteString protoDescriptors;
try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("descriptor.proto.bin")) {
if (inputStream == null) {
throw new java.io.FileNotFoundException(
"Could not find proto descriptor file (descriptor.proto.bin) in resources.");
}
protoDescriptors = ByteString.copyFrom(inputStream.readAllBytes());
}
Comment thread
gmechali marked this conversation as resolved.
Outdated

try {
dbAdminClient.getDatabase(
String.format(
"projects/%s/instances/%s/databases/%s",
gcpProjectId, spannerInstanceId, spannerDatabaseId));
LOGGER.info("Spanner database {} already exists.", spannerDatabaseId);
} catch (com.google.api.gax.rpc.NotFoundException notFoundE) {
LOGGER.info("Spanner database {} not found. Creating it now.", spannerDatabaseId);

List<String> ddlStatements;
try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("spanner_schema.sql")) {
if (inputStream == null) {
throw new java.io.FileNotFoundException(
"Could not find spanner_schema.sql in resources.");
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
ddlStatements = parseDdlStatements(reader);
// Check if tables exist by fetching current DDL from Spanner once.
String databasePath =
String.format(
"projects/%s/instances/%s/databases/%s",
gcpProjectId, spannerInstanceId, spannerDatabaseId);
List<String> currentDdlStatements =
dbAdminClient.getDatabaseDdl(databasePath).getStatementsList();

boolean nodeExists = checkTableExists(currentDdlStatements, "Node");
boolean edgeExists = checkTableExists(currentDdlStatements, "Edge");
boolean observationExists = checkTableExists(currentDdlStatements, "Observation");
Comment thread
gmechali marked this conversation as resolved.
Outdated

if (!nodeExists && !edgeExists && !observationExists) {
LOGGER.info("Database is empty. Applying DDL statements to existing database.");

List<String> statementsToApply = new java.util.ArrayList<>(ddlStatements);
Comment thread
gmechali marked this conversation as resolved.
Outdated

boolean protoBundleExistsInSpanner =
currentDdlStatements.stream()
.anyMatch(s -> s.toUpperCase().contains("CREATE PROTO BUNDLE"));

if (protoBundleExistsInSpanner) {
LOGGER.info("Proto bundle already exists in database. Skipping CREATE PROTO BUNDLE.");
statementsToApply.removeIf(s -> s.toUpperCase().contains("CREATE PROTO BUNDLE"));
}
} catch (IOException ioE) {
throw new IOException("Failed to read DDL file", ioE);
}

ByteString protoDescriptors;
try (InputStream inputStream =
getClass().getClassLoader().getResourceAsStream("descriptor.proto.bin")) {
if (inputStream == null) {
throw new java.io.FileNotFoundException(
"Could not find proto descriptor file (descriptor.proto.bin) in resources.");
try {
dbAdminClient
.updateDatabaseDdlAsync(
com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest.newBuilder()
.setDatabase(
String.format(
"projects/%s/instances/%s/databases/%s",
gcpProjectId, spannerInstanceId, spannerDatabaseId))
.addAllStatements(statementsToApply)
.setProtoDescriptors(protoDescriptors)
.build())
.get();
LOGGER.info("Successfully applied DDL statements to existing database.");
} catch (java.util.concurrent.ExecutionException executionE) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.UNKNOWN, "An error occurred during DDL update.", executionE);
} catch (InterruptedException interruptedE) {
LOGGER.error("Operation was interrupted while waiting for DDL update.", interruptedE);
throw SpannerExceptionFactory.propagateInterrupt(interruptedE);
}
protoDescriptors = ByteString.copyFrom(inputStream.readAllBytes());
} catch (IOException ioE) {
throw new IOException("Failed to read proto descriptor file", ioE);
} else if (nodeExists && edgeExists && observationExists) {
LOGGER.info("Database is fully initialized.");
} else {
throw new IllegalStateException(
String.format(
"Database is in an inconsistent state. Node: %b, Edge: %b, Observation: %b. Please clean up the database manually.",
nodeExists, edgeExists, observationExists));
}
Comment thread
gmechali marked this conversation as resolved.
Outdated
} catch (com.google.api.gax.rpc.NotFoundException notFoundE) {
LOGGER.info("Spanner database {} not found. Creating it now.", spannerDatabaseId);

CreateDatabaseRequest request =
CreateDatabaseRequest.newBuilder()
Expand Down Expand Up @@ -187,6 +228,32 @@ public void createDatabase() {
}
}

/**
 * Loads the {@code spanner_schema.sql} classpath resource and turns it into DDL statements.
 *
 * <p>The resource is decoded as UTF-8 and handed to {@code parseDdlStatements}; both the raw
 * stream and the reader wrapped around it are closed before this method returns.
 *
 * @return the statements produced by {@code parseDdlStatements} for the schema resource
 * @throws IOException if the resource is not on the classpath (reported as a
 *     {@link java.io.FileNotFoundException}) or an I/O error is raised while it is processed
 */
List<String> readDdlStatements() throws IOException {
  ClassLoader resourceLoader = getClass().getClassLoader();
  try (InputStream schemaStream = resourceLoader.getResourceAsStream("spanner_schema.sql")) {
    // getResourceAsStream signals a missing resource with null rather than an exception.
    if (schemaStream == null) {
      throw new java.io.FileNotFoundException("Could not find spanner_schema.sql in resources.");
    }
    InputStreamReader utf8Decoder = new InputStreamReader(schemaStream, StandardCharsets.UTF_8);
    try (BufferedReader schemaReader = new BufferedReader(utf8Decoder)) {
      return parseDdlStatements(schemaReader);
    }
  }
}

/**
 * Checks if a table exists in the list of DDL statements.
 *
 * <p>A statement counts as a match when it begins (after optional leading whitespace) with
 * {@code CREATE TABLE} followed by the table name, optionally wrapped in backticks. Matching is
 * case-insensitive, and a word boundary is required after the name so that a table such as
 * {@code Node_Old} is not mistaken for {@code Node}.
 *
 * <p>The table name is matched literally: it is quoted before being embedded in the pattern, so
 * characters that are regex metacharacters (for example {@code .} or {@code (}) neither widen
 * the match nor cause a {@link java.util.regex.PatternSyntaxException}.
 *
 * @param statements The list of DDL statements retrieved from Spanner.
 * @param tableName The name of the table to check.
 * @return true if the table exists, false otherwise.
 */
boolean checkTableExists(List<String> statements, String tableName) {
  // Pattern.quote keeps the caller-supplied name from being interpreted as regex syntax.
  String regex = "^\\s*CREATE\\s+TABLE\\s+[`]?" + Pattern.quote(tableName) + "[`]?\\b";
  Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
  return statements.stream().anyMatch(s -> pattern.matcher(s).find());
}

public PCollection<Void> deleteDataForImport(
Pipeline pipeline, String importName, String tableName, String columnName) {
String stageName = tableName + "-" + importName.replaceFirst("^dc/base/", "");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.datacommons.ingestion.spanner;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

public class SpannerClientTest {

private SpannerClient spannerClient;

@Before
public void setUp() {
spannerClient =
SpannerClient.builder()
.gcpProjectId("test-project")
.spannerInstanceId("test-instance")
.spannerDatabaseId("test-db")
.build();
}

@Test
public void testReadDdlStatements() throws IOException {
List<String> ddlStatements = spannerClient.readDdlStatements();
assertNotNull(ddlStatements);
assertFalse(ddlStatements.isEmpty());

// Verify that we have at least table creation statements
boolean hasNodeTable = false;
for (String ddl : ddlStatements) {
if (ddl.contains("CREATE TABLE Node")) {
hasNodeTable = true;
break;
}
}
assertTrue("Should contain Node table DDL", hasNodeTable);
}

@Test
public void testCheckTableExists_True() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Node");

boolean exists = spannerClient.checkTableExists(statements, "Node");
assertTrue(exists);
}

@Test
public void testCheckTableExists_False() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Edge");

boolean exists = spannerClient.checkTableExists(statements, "Node");
}
Comment thread
gmechali marked this conversation as resolved.

@Test
public void testCheckTableExists_FalsePositive() {
List<String> statements = new java.util.ArrayList<>();
statements.add("CREATE TABLE Node_Old");

boolean exists = spannerClient.checkTableExists(statements, "Node");
assertFalse(exists);
}
}