Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void beforeClass() throws Exception {
hConf.setBoolean("fs.hdfs.impl.disable.cache", true);

zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
zkServer.startAsync().awaitRunning();

CConfiguration cConf = CConfiguration.create();
// tests should use the current user for HDFS
Expand All @@ -121,7 +121,7 @@ public static void beforeClass() throws Exception {
server = TransactionServiceTest
.createTxService(zkServer.getConnectionStr(), Networks.getRandomPort(),
hConf, tmpFolder.newFolder(), cConf);
server.startAndWait();
server.startAsync().awaitRunning();

injector = Guice.createInjector(
new ConfigModule(cConf, hConf),
Expand Down Expand Up @@ -150,25 +150,25 @@ protected void configure() {
new AuthenticationContextModules().getNoOpModule());

zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
zkClient.startAsync().awaitRunning();

txStateStorage = injector.getInstance(TransactionStateStorage.class);
txStateStorage.startAndWait();
txStateStorage.startAsync().awaitRunning();
}

@AfterClass
public static void afterClass() {
try {
try {
server.stopAndWait();
server.stopAsync().awaitTerminated();
miniDfsCluster.shutdown();
} finally {
zkClient.stopAndWait();
txStateStorage.stopAndWait();
zkClient.stopAsync().awaitTerminated();
txStateStorage.stopAsync().awaitTerminated();
}
} finally {
zkServer.stopAndWait();
txStateStorage.stopAndWait();
zkServer.stopAsync().awaitTerminated();
txStateStorage.stopAsync().awaitTerminated();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void before() throws Exception {
hConf.setBoolean("fs.hdfs.impl.disable.cache", true);

zkServer = InMemoryZKServer.builder().build();
zkServer.startAndWait();
zkServer.startAsync().awaitRunning();
}

@After
Expand All @@ -100,7 +100,7 @@ public void after() throws Exception {
miniDfsCluster.shutdown();
} finally {
if (zkServer != null) {
zkServer.stopAndWait();
zkServer.stopAsync().awaitTerminated();
}
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ protected void configure() {
);

ZKClientService zkClient = injector.getInstance(ZKClientService.class);
zkClient.startAndWait();
zkClient.startAsync().awaitRunning();

try {
final Table table = createTable("myTable");
Expand All @@ -161,7 +161,7 @@ protected void configure() {
TransactionService first = createTxService(zkServer.getConnectionStr(),
Networks.getRandomPort(),
hConf, tmpFolder.newFolder());
first.startAndWait();
first.startAsync().awaitRunning();
Assert.assertNotNull(txClient.startShort());
verifyGetAndPut(table, txExecutor, null, "val1");

Expand All @@ -171,15 +171,15 @@ protected void configure() {
hConf, tmpFolder.newFolder());
// NOTE: we don't have to wait for start as client should pick it up anyways, but we do wait to ensure
// the case with two active is handled well
second.startAndWait();
second.startAsync().awaitRunning();
// wait for affect a bit
TimeUnit.SECONDS.sleep(1);

Assert.assertNotNull(txClient.startShort());
verifyGetAndPut(table, txExecutor, "val1", "val2");

// shutting down the first one is fine: we have another one to pick up the leader role
first.stopAndWait();
first.stopAsync().awaitTerminated();

Assert.assertNotNull(txClient.startShort());
verifyGetAndPut(table, txExecutor, "val2", "val3");
Expand All @@ -189,21 +189,21 @@ protected void configure() {
Networks.getRandomPort(),
hConf, tmpFolder.newFolder());
// NOTE: we don't have to wait for start as client should pick it up anyways
third.start();
third.startAsync();
// stopping second one
second.stopAndWait();
second.stopAsync().awaitTerminated();

Assert.assertNotNull(txClient.startShort());
verifyGetAndPut(table, txExecutor, "val3", "val4");

// releasing resources
third.stop();
third.stopAsync().awaitTerminated();

} finally {
try {
dropTable("myTable");
} finally {
zkClient.stopAndWait();
zkClient.stopAsync().awaitTerminated();
}
}
}
Expand Down Expand Up @@ -268,7 +268,7 @@ protected void configure() {
new AuthorizationTestModule(),
new AuthorizationEnforcementModule().getInMemoryModules(),
new AuthenticationContextModules().getNoOpModule());
injector.getInstance(ZKClientService.class).startAndWait();
injector.getInstance(ZKClientService.class).startAsync().awaitRunning();

return injector.getInstance(TransactionService.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package io.cdap.cdap.data.dataset;

import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.base.MoreObjects;
import io.cdap.cdap.api.data.DatasetInstantiationException;
import io.cdap.cdap.api.dataset.Dataset;
import io.cdap.cdap.api.dataset.DatasetAdmin;
Expand All @@ -26,7 +25,6 @@
import io.cdap.cdap.api.service.ServiceUnavailableException;
import io.cdap.cdap.data2.datafabric.dataset.type.ConstantClassLoaderProvider;
import io.cdap.cdap.data2.datafabric.dataset.type.DatasetClassLoaderProvider;
import io.cdap.cdap.data2.datafabric.dataset.type.DirectoryClassLoaderProvider;
import io.cdap.cdap.data2.dataset2.DatasetFramework;
import io.cdap.cdap.data2.metadata.lineage.AccessType;
import io.cdap.cdap.proto.id.DatasetId;
Expand Down Expand Up @@ -74,7 +72,7 @@ public SystemDatasetInstantiator(DatasetFramework datasetFramework,
this.classLoaderProvider = classLoaderProvider;
this.datasetFramework = datasetFramework;
this.parentClassLoader = parentClassLoader == null
? Objects.firstNonNull(Thread.currentThread().getContextClassLoader(),
? MoreObjects.firstNonNull(Thread.currentThread().getContextClassLoader(),
getClass().getClassLoader()) :
parentClassLoader;
}
Expand Down Expand Up @@ -110,7 +108,11 @@ public <T extends Dataset> T getDataset(DatasetId datasetId, Map<String, String>
}
return dataset;
} catch (Exception e) {
Throwables.propagateIfInstanceOf(e, ServiceUnavailableException.class);
if (e instanceof ServiceUnavailableException) {

throw (ServiceUnavailableException) e;

}
throw new DatasetInstantiationException("Failed to access dataset: " + datasetId, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.cdap.cdap.data2.audit;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.api.messaging.TopicNotFoundException;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void publish(EntityId entityId, AuditType auditType, AuditPayload auditPa
@Override
public void publish(MetadataEntity metadataEntity, AuditType auditType,
AuditPayload auditPayload) {
String userId = Objects.firstNonNull(SecurityRequestContext.getUserId(), "");
String userId = MoreObjects.firstNonNull(SecurityRequestContext.getUserId(), "");
AuditMessage auditMessage = new AuditMessage(System.currentTimeMillis(), metadataEntity, userId,
auditType, auditPayload);
LOG.trace("Publishing audit message {}", auditMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
package io.cdap.cdap.data2.datafabric.dataset;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import io.cdap.cdap.api.data.DatasetContext;
import io.cdap.cdap.api.data.DatasetInstantiationException;
import io.cdap.cdap.api.dataset.Dataset;
import io.cdap.cdap.api.dataset.DatasetDefinition;
import io.cdap.cdap.api.dataset.DatasetManagementException;
import io.cdap.cdap.api.dataset.DatasetProperties;
import io.cdap.cdap.api.dataset.DatasetSpecification;
Expand Down Expand Up @@ -128,7 +126,7 @@ public static void createIfNotExists(DatasetFramework datasetFramework,
} catch (DatasetManagementException e) {
LOG.error("Could NOT add dataset instance {} of type {} with props {}",
datasetInstanceId, typeName, props, e);
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package io.cdap.cdap.data2.datafabric.dataset;

import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.base.MoreObjects;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -380,7 +379,7 @@ private <T extends DatasetType> T getType(DatasetTypeMeta datasetTypeMeta,
DatasetClassLoaderProvider classLoaderProvider) {

if (classLoader == null) {
classLoader = Objects.firstNonNull(Thread.currentThread().getContextClassLoader(),
classLoader = MoreObjects.firstNonNull(Thread.currentThread().getContextClassLoader(),
getClass().getClassLoader());
}

Expand All @@ -392,15 +391,15 @@ private <T extends DatasetType> T getType(DatasetTypeMeta datasetTypeMeta,
} catch (IOException e) {
LOG.error("Was not able to init classloader for module {} while trying to load type {}",
moduleMeta, datasetTypeMeta, e);
throw Throwables.propagate(e);
throw new RuntimeException(e);
}

try {
DatasetDefinitionRegistries.register(moduleMeta.getClassName(), classLoader, registry);
} catch (Exception e) {
LOG.error("Was not able to load dataset module class {} while trying to load type {}",
moduleMeta.getClassName(), datasetTypeMeta, e);
throw Throwables.propagate(e);
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public AuthorizationDatasetTypeService(

@Override
protected void startUp() throws Exception {
delegate.startAndWait();
delegate.startAsync().awaitRunning();
}

@Override
protected void shutDown() throws Exception {
delegate.stopAndWait();
delegate.stopAsync().awaitTerminated();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.cdap.cdap.data2.datafabric.dataset.service;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -142,7 +142,7 @@ protected void doStop() {
private void startUp() {
try {
LOG.info("Starting DatasetService...");
typeService.startAndWait();
typeService.startAsync().awaitRunning();
httpService.start();

// setting watch for ops executor service that we need to be running to operate correctly
Expand All @@ -155,10 +155,10 @@ private void startUp() {
LOG.info("Discovered {} service", Constants.Service.DATASET_EXECUTOR);
opExecutorDiscovered.set(serviceDiscovered);
}
}, MoreExecutors.sameThreadExecutor());
}, MoreExecutors.directExecutor());

for (DatasetMetricsReporter metricsReporter : metricReporters) {
metricsReporter.start();
metricsReporter.startAsync();
}
} catch (Throwable t) {
notifyFailed(t);
Expand Down Expand Up @@ -234,14 +234,14 @@ private void doShutdown() throws Exception {
}

for (DatasetMetricsReporter metricsReporter : metricReporters) {
metricsReporter.stop();
metricsReporter.stopAsync();
}

if (opExecutorServiceWatch != null) {
opExecutorServiceWatch.cancel();
}

typeService.stopAndWait();
typeService.stopAsync().awaitTerminated();

// Wait for a few seconds for requests to stop
httpService.stop();
Expand All @@ -250,7 +250,7 @@ private void doShutdown() throws Exception {

@Override
public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("bindAddress", httpService.getBindAddress())
.toString();
}
Expand Down
Loading