listACL = fixupACL(path, request.authInfo, acl);
ChangeRecord parentRecord = getRecordForPath(parentPath);
- zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
+ zks.checkACL(request.cnxn, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo, path,
+ listACL);
int parentCVersion = parentRecord.stat.getCversion();
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
@@ -722,18 +724,25 @@ private void pRequest2TxnCreate(int type, Request request, Record record, boolea
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
parentRecord.stat.setPzxid(request.getHdr().getZxid());
- parentRecord.precalculatedDigest = precalculateDigest(
- DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
+ parentRecord.precalculatedDigest =
+ precalculateDigest(DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
- ChangeRecord nodeRecord = new ChangeRecord(
- request.getHdr().getZxid(), path, s, 0, listACL);
+ //updateSpiral(parentRecord);
+ ChangeRecord nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
- nodeRecord.precalculatedDigest = precalculateDigest(
- DigestOpCode.ADD, path, nodeRecord.data, s);
+ nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
+ SpiralNode spiral = new SpiralNode(data, 0L, s);
+ try {
+ zks.createSpiralRecord(path, spiral);
+ } catch (IOException e) {
+ // handle exception
+ }
}
+
+
private void validatePath(String path, long sessionId) throws BadArgumentsException {
try {
PathUtils.validatePath(path);
@@ -1191,4 +1200,26 @@ private void setTxnDigest(Request request, PrecalculatedDigest preCalculatedDige
}
request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest));
}
+
+ public SpiralNode convert2Spiral(Request request) throws IOException {
+ CreateRequest createRequest = new CreateRequest();
+ ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+
+ // TODO - convert lit of ACL to some long?
+ long acl = 0;
+ byte[] data = createRequest.getData();
+ StatPersisted stat = new StatPersisted();
+ stat.setCtime(request.getHdr().getTime());
+ stat.setMtime(request.getHdr().getTime());
+ stat.setCzxid(request.getHdr().getZxid());
+ stat.setMzxid(request.getHdr().getZxid());
+ stat.setPzxid(request.getHdr().getZxid());
+ stat.setVersion(0);
+ stat.setAversion(0);
+ stat.setEphemeralOwner(0);
+ return new SpiralNode(data, acl, stat);
+ }
+
+
}
+
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerConfig.java
index 9da0e53a4b6..60cac2165ce 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerConfig.java
@@ -56,6 +56,11 @@ public class ServerConfig {
protected int listenBacklog = -1;
protected String initialConfig;
+ // Spiral Specific - ZKBridge
+ protected String spiralServer = null;
+ protected long spiralServerPort = -1L;
+
+
/** JVM Pause Monitor feature switch */
protected boolean jvmPauseMonitorToRun = false;
/** JVM Pause Monitor warn threshold in ms */
@@ -121,6 +126,8 @@ public void readFrom(QuorumPeerConfig config) {
metricsProviderConfiguration = config.getMetricsProviderConfiguration();
listenBacklog = config.getClientPortListenBacklog();
initialConfig = config.getInitialConfig();
+ spiralServer = config.getSpiralServer();
+ spiralServerPort = config.getSpiralServerPort();
}
public InetSocketAddress getClientPortAddress() {
@@ -173,4 +180,8 @@ public int getClientPortListenBacklog() {
return listenBacklog;
}
+ public String getSpiralEndpoint() {
+ return spiralServer + ":" + spiralServerPort;
+ }
+
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SpiralNode.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SpiralNode.java
new file mode 100644
index 00000000000..27c853b828b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SpiralNode.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.data.StatPersisted;
+
+/**
+ * This class contains the data for a node in the data tree.
+ *
+ * A data node contains a reference to its parent, a byte array as its data, an
+ * array of ACLs, a stat object, and a set of its children's paths.
+ *
+ */
+@SuppressFBWarnings({"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
+public class SpiralNode implements Record {
+
+ /** the data for this datanode */
+ public byte[] data;
+
+ /**
+ * the acl map long for this datanode. the datatree has the map
+ */
+ public Long acl;
+
+ /**
+ * the stat for this node that is persisted to disk.
+ */
+ public StatPersisted stat;
+
+ int numberOfChildren = 0;
+
+ SpiralNode() {
+ // default constructor
+ }
+
+ /**
+ * create a Spiral with data, acls and stat
+ */
+ public SpiralNode(byte[] data, Long acl, StatPersisted stat) {
+ this.data = data;
+ this.acl = acl;
+ this.stat = stat;
+ }
+
+ public synchronized void deserialize(InputArchive archive, String tag) throws IOException {
+ archive.startRecord("node");
+ data = archive.readBuffer("data");
+ acl = archive.readLong("acl");
+ stat = new StatPersisted();
+ stat.deserialize(archive, "statpersisted");
+ archive.endRecord("node");
+ }
+
+ public synchronized void serialize(OutputArchive archive, String tag) throws IOException {
+ archive.startRecord(this, tag);
+ archive.writeBuffer(data, "data");
+ archive.writeLong(acl, "acl");
+ stat.serialize(archive, "statpersisted");
+ archive.endRecord(this, tag);
+ }
+
+ public byte[] toByteBuffer() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive archive = BinaryOutputArchive.getArchive(baos);
+ serialize(archive, "node");
+ return baos.toByteArray();
+ }
+
+ public static Stat convert2Stat(StatPersisted statPersisted) {
+ Stat stat = new Stat();
+ stat.setAversion(statPersisted.getAversion());
+ stat.setCtime(statPersisted.getCtime());
+ stat.setCversion(statPersisted.getCversion());
+ stat.setCzxid(statPersisted.getCzxid());
+ stat.setMtime(statPersisted.getMtime());
+ stat.setMzxid(statPersisted.getMzxid());
+ stat.setPzxid(statPersisted.getPzxid());
+ stat.setVersion(statPersisted.getVersion());
+ stat.setEphemeralOwner(statPersisted.getEphemeralOwner());
+ stat.setDataLength(0);
+ stat.setNumChildren(0);
+ return stat;
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 7adfb9e1297..d7704c6ee69 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -19,6 +19,8 @@
package org.apache.zookeeper.server;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
@@ -38,6 +40,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import javax.security.sasl.SaslException;
+
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
@@ -48,7 +51,6 @@
import org.apache.zookeeper.Version;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.ZookeeperBanner;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
@@ -71,6 +73,7 @@
import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.spiral.SpiralClient;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -91,6 +94,7 @@
* following chain of RequestProcessors to process requests:
* PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor
*/
+@SuppressWarnings("ALL")
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected static final Logger LOG;
@@ -113,16 +117,19 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public static final String ZOOKEEPER_DIGEST_ENABLED = "zookeeper.digest.enabled";
private static boolean digestEnabled;
+
// Add a enable/disable option for now, we should remove this one when
// this feature is confirmed to be stable
public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
private static boolean closeSessionTxnEnabled = true;
+ // Connection to spiralClient.
+ private static SpiralClient spiralClient;
+ private static String spiralEndpoint;
+
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
- ZookeeperBanner.printBanner(LOG);
-
Environment.logEnv("Server environment:", LOG);
enableEagerACLCheck = Boolean.getBoolean(ENABLE_EAGER_ACL_CHECK);
@@ -139,6 +146,23 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
closeSessionTxnEnabled = Boolean.parseBoolean(
System.getProperty(CLOSE_SESSION_TXN_ENABLED, "true"));
LOG.info("{} = {}", CLOSE_SESSION_TXN_ENABLED, closeSessionTxnEnabled);
+
+ }
+
+ public SpiralNode getSpiralRecord(String key) {
+ byte[] response = spiralClient.get(key);
+ SpiralNode node = new SpiralNode();
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteArrayInputStream(response));
+ try {
+ node.deserialize(bia, "node");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return node;
+ }
+
+ public void createSpiralRecord(String key, SpiralNode node) throws IOException {
+ spiralClient.put(key, node.toByteBuffer());
}
// @VisibleForTesting
@@ -390,6 +414,12 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, String initia
this(txnLogFactory, tickTime, -1, -1, -1, new ZKDatabase(txnLogFactory), initialConfig, QuorumPeerConfig.isReconfigEnabled());
}
+ public void setSpiralEndpoint(String spiralEndpoint) {
+ // TODO: add to property/env config file.
+ spiralClient = new SpiralClient(spiralEndpoint);
+ this.spiralEndpoint = spiralEndpoint;
+ }
+
public ServerStats serverStats() {
return serverStats;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerConf.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerConf.java
index 499dd568071..30896546aff 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerConf.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerConf.java
@@ -67,6 +67,10 @@ public class ZooKeeperServerConf {
*/
public static final String KEY_CLIENT_PORT_LISTEN_BACKLOG = "client_port_listen_backlog";
+ public static final String SPIRAL_SERVER = "spiral_server";
+
+ public static final String SPIRAL_PORT = "spiral_port";
+
private final int clientPort;
private final String dataDir;
private final String dataLogDir;
@@ -77,6 +81,9 @@ public class ZooKeeperServerConf {
private final long serverId;
private final int clientPortListenBacklog;
+ private String spiralServer;
+ private long spiralPort;
+
/**
* Creates a new configuration.
*
@@ -99,8 +106,17 @@ public class ZooKeeperServerConf {
this.maxSessionTimeout = maxSessionTimeout;
this.serverId = serverId;
this.clientPortListenBacklog = clientPortListenBacklog;
+ this.spiralServer = null;
+ this.spiralPort = -1L;
}
+ public void setSpiralServer(String spiralServer) {
+ this.spiralServer = spiralServer;
+ }
+
+ public void setSpiralPort(long spiralPort) {
+ this.spiralPort = spiralPort;
+ }
/**
* Gets the client port.
*
@@ -197,6 +213,8 @@ public Map toMap() {
conf.put(KEY_MAX_SESSION_TIMEOUT, maxSessionTimeout);
conf.put(KEY_SERVER_ID, serverId);
conf.put(KEY_CLIENT_PORT_LISTEN_BACKLOG, clientPortListenBacklog);
+ conf.put(SPIRAL_SERVER, spiralServer);
+ conf.put(SPIRAL_PORT, spiralPort);
return conf;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
index 411d5d1fecf..bfa328e11b1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java
@@ -62,6 +62,7 @@ public class ZooKeeperServerMain {
* @param args the configfile or the port datadir [ticktime]
*/
public static void main(String[] args) {
+ LOG.info("Starting Server");
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
@@ -108,7 +109,6 @@ protected void initializeAndRun(String[] args) throws ConfigException, IOExcepti
} else {
config.parse(args);
}
-
runFromConfig(config);
}
@@ -142,6 +142,9 @@ public void runFromConfig(ServerConfig config) throws IOException, AdminServerEx
final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, config.listenBacklog, null, config.initialConfig);
txnLog.setServerStats(zkServer.serverStats());
+ // Set Spiral Specific configuration.
+ zkServer.setSpiralEndpoint(config.getSpiralEndpoint());
+
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index 948aa8f208b..220e44f57d9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -117,6 +117,10 @@ public class QuorumPeerConfig {
protected LearnerType peerType = LearnerType.PARTICIPANT;
+ // Spiral Related
+ protected String spiralServer;
+ protected long spiralServerPort;
+
/**
* Configurations for the quorumpeer-to-quorumpeer sasl authentication
*/
@@ -474,6 +478,10 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(value);
} else if (key.equals("multiAddress.reachabilityCheckEnabled")) {
multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(value);
+ } else if (key.equals("spiral-server")) {
+ spiralServer = value;
+ } else if (key.equals("spiral-port")) {
+ spiralServerPort= Long.parseLong(value);
} else {
System.setProperty("zookeeper." + key, value);
}
@@ -1053,4 +1061,12 @@ public static void setReconfigEnabled(boolean enabled) {
public BackupConfig getBackupConfig() {
return backupConfig;
}
+
+ public String getSpiralServer() {
+ return spiralServer;
+ }
+
+ public long getSpiralServerPort() {
+ return spiralServerPort;
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/spiral/SpiralClient.java b/zookeeper-server/src/main/java/org/apache/zookeeper/spiral/SpiralClient.java
new file mode 100644
index 00000000000..0d0771d0c96
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/spiral/SpiralClient.java
@@ -0,0 +1,78 @@
+package org.apache.zookeeper.spiral;
+
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import proto.com.linkedin.spiral.GetRequestOrBuilder;
+import proto.com.linkedin.spiral.Put;
+import proto.com.linkedin.spiral.PutRequest;
+import proto.com.linkedin.spiral.PutResponse;
+import proto.com.linkedin.spiral.SpiralApiGrpc;
+import proto.com.linkedin.spiral.SpiralContext;
+import proto.com.linkedin.spiral.Key;
+import proto.com.linkedin.spiral.GetRequest;
+import proto.com.linkedin.spiral.GetResponse;
+import proto.com.linkedin.spiral.Value;
+
+public class SpiralClient {
+ private static final Logger logger = LoggerFactory.getLogger(SpiralClient.class);
+
+ private final SpiralApiGrpc.SpiralApiBlockingStub _blockingStub;
+
+
+ public SpiralClient(String spiralEndpoint) {
+ try {
+ ManagedChannel channel = ManagedChannelBuilder.forTarget(spiralEndpoint).usePlaintext().build();
+ _blockingStub = SpiralApiGrpc.newBlockingStub(channel);
+ logger.info("Connected to spiral-service : {}", spiralEndpoint);
+ } catch (Exception e) {
+ logger.error("Failed to connect to spiral service at endpoint : {} {}", spiralEndpoint, e.getMessage());
+ throw e;
+ }
+ }
+
+ public byte[] get(String key) {
+ SpiralContext cs = SpiralContext.newBuilder()
+ .setNamespace("test")
+ .setBucket("zk")
+ .build();
+
+ ByteString keyBytes = ByteString.copyFromUtf8(key);
+ Key apiKey = Key.newBuilder().setMessage(keyBytes).build();
+ GetRequest request = GetRequest.newBuilder().setSpiralContext(cs).setKey(apiKey).build();
+ try {
+ GetResponse response = _blockingStub.get(request);
+ return response.getValue().getMessage().toByteArray();
+ } catch (Exception e) {
+ logger.error("Get: RPC failed: {}", e.getMessage());
+ throw e;
+ }
+ }
+
+ public void put(String key, byte[] value) {
+ SpiralContext cs = SpiralContext.newBuilder()
+ .setNamespace("test")
+ .setBucket("zk")
+ .build();
+
+ ByteString keyBytes = ByteString.copyFromUtf8(key);
+ Key apiKey = Key.newBuilder().setMessage(keyBytes).build();
+ Value apiValue = Value.newBuilder().setMessage(ByteString.copyFrom(value)).build();
+ Put putValue = Put.newBuilder().setKey(apiKey).setValue(apiValue).build();
+ PutRequest request = PutRequest.newBuilder()
+ .setSpiralContext(cs)
+ .setPut(putValue)
+ .build();
+ try {
+ // TODO - convert response to ZK response.
+ PutResponse response = _blockingStub.put(request);
+ get(key);
+ } catch (Exception e) {
+ logger.error("put: RPC failed: {}, {}", e.getMessage(), e);
+ throw e;
+ }
+ }
+}