From 277fde2fb489b6ad8f0ce18efb3d0005222f66ed Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Wed, 17 Nov 2010 10:40:27 -0800 Subject: [PATCH 1/7] Using a scheduled thread pool executor in norbert to remove old requests. Fixes a memory leak for stale requests and when the request has no callback. --- .../network/netty/ClientChannelHandler.scala | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala index 27e628e0..85e64eaf 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala @@ -20,34 +20,32 @@ package netty import java.util.UUID import org.jboss.netty.channel._ import com.google.protobuf.InvalidProtocolBufferException -import java.util.concurrent.{TimeUnit, ConcurrentHashMap} import common.MessageRegistry import protos.NorbertProtos import logging.Logging import jmx.JMX.MBean import jmx.JMX +import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit, ConcurrentHashMap} @ChannelPipelineCoverage("all") class ClientChannelHandler(serviceName: String, messageRegistry: MessageRegistry, staleRequestTimeoutMins: Int, staleRequestCleanupFrequencyMins: Int) extends SimpleChannelHandler with Logging { private val requestMap = new ConcurrentHashMap[UUID, Request] - private val cleanupThread = new Thread("stale-request-cleanup-thread") { + val cleanupTask = new Runnable() { val staleRequestTimeoutMillis = TimeUnit.MILLISECONDS.convert(staleRequestTimeoutMins, TimeUnit.MINUTES) override def run = { - while (true) { - TimeUnit.MINUTES.sleep(staleRequestCleanupFrequencyMins) - - import collection.JavaConversions._ - requestMap.keySet.foreach { uuid => - val request = requestMap.get(uuid) - if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) requestMap.remove(uuid) - } + import collection.JavaConversions._ + requestMap.keySet.foreach { uuid => + val request = requestMap.get(uuid) + if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) requestMap.remove(uuid) } } } - cleanupThread.setDaemon(true) + + val cleanupExecutor = new ScheduledThreadPoolExecutor(1) + cleanupExecutor.scheduleAtFixedRate(cleanupTask, staleRequestCleanupFrequencyMins, staleRequestCleanupFrequencyMins, TimeUnit.MINUTES) private val statsActor = new NetworkStatisticsActor(100) statsActor.start From f9fa3268ca670be71d00959591c2839413c2d94c Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Wed, 17 Nov 2010 11:16:38 -0800 Subject: [PATCH 2/7] Logging the expired entries and wrapping the scheduled thread in a try-catch --- .../network/netty/ClientChannelHandler.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala index 85e64eaf..1957be3d 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/ClientChannelHandler.scala @@ -36,10 +36,21 @@ class ClientChannelHandler(serviceName: String, messageRegistry: MessageRegistry val staleRequestTimeoutMillis = TimeUnit.MILLISECONDS.convert(staleRequestTimeoutMins, TimeUnit.MINUTES) override def run = { - import collection.JavaConversions._ - requestMap.keySet.foreach { uuid => - val request = requestMap.get(uuid) - if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) requestMap.remove(uuid) + try { + import collection.JavaConversions._ + var expiredEntryCount = 0 + + requestMap.keySet.foreach { uuid => + val request = requestMap.get(uuid) + if ((System.currentTimeMillis - request.timestamp) > staleRequestTimeoutMillis) { + requestMap.remove(uuid) + expiredEntryCount += 1 + } + } + + log.info("Expired %d stale entries from the request map".format(expiredEntryCount)) + } catch { + case e: Exception => log.error("Exception caught in cleanup task, ignoring " + e) } } } From fc61468283037b325b7a2c52e9c496afe78b996d Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Mon, 6 Dec 2010 12:51:58 -0800 Subject: [PATCH 3/7] Using the latest version of specs jar --- project/build/NorbertProject.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build/NorbertProject.scala b/project/build/NorbertProject.scala index 341f1064..5cf0e2cd 100644 --- a/project/build/NorbertProject.scala +++ b/project/build/NorbertProject.scala @@ -14,7 +14,7 @@ class NorbertProject(info: ProjectInfo) extends ParentProject(info) with IdeaPro val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" val log4j = "log4j" % "log4j" % "1.2.14" - val specs = "org.scala-tools.testing" %% "specs" % "1.6.5-SNAPSHOT" % "test" + val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test" val mockito = "org.mockito" % "mockito-all" % "1.8.4" % "test" val cglib = "cglib" % "cglib" % "2.1_3" % "test" val objenesis = "org.objenesis" % "objenesis" % "1.0" % "test" From 8bcacfb638dfb1c67f39e94174e77d55d6e704e9 Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Mon, 6 Dec 2010 12:52:28 -0800 Subject: [PATCH 4/7] Fixing memory leak: We should shutdown the message executor and reclaim the threadpool and jmx objects when the network server shuts down --- .../com/linkedin/norbert/network/netty/NettyNetworkServer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkServer.scala b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkServer.scala index c336d2de..d50c9875 100644 --- a/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkServer.scala +++ b/network/src/main/scala/com/linkedin/norbert/network/netty/NettyNetworkServer.scala @@ -90,6 +90,7 @@ class NettyNetworkServer(serverConfig: NetworkServerConfig) extends NetworkServe override def shutdown = { if (serverConfig.clusterClient == null) clusterClient.shutdown else super.shutdown + messageExecutor.shutdown requestContextEncoder.shutdown } } From 28568c3e4935c797f1210789a3225da3a44172d1 Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Mon, 6 Dec 2010 16:09:56 -0800 Subject: [PATCH 5/7] Bumping norbert version to 0.5.1 --- project/build.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index 37bb6376..c5e7b8d8 100644 --- a/project/build.properties +++ b/project/build.properties @@ -3,6 +3,6 @@ project.organization=com.linkedin.norbert project.name=norbert sbt.version=0.7.3 -project.version=0.5-SNAPSHOT +project.version=0.5.1 build.scala.versions=2.8.0 project.initialize=false From 03a22577ffbadbefda2c0d66f10f1f8bcd43373b Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Sun, 12 Dec 2010 09:15:30 -0800 Subject: [PATCH 6/7] Using the latest version of netty --- project/build/NorbertProject.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/build/NorbertProject.scala b/project/build/NorbertProject.scala index 5cf0e2cd..f5fb4005 100644 --- a/project/build/NorbertProject.scala +++ b/project/build/NorbertProject.scala @@ -1,7 +1,7 @@ import sbt._ class NorbertProject(info: ProjectInfo) extends ParentProject(info) with IdeaProject { - override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.com/maven2") + override def repositories = Set(ScalaToolsSnapshots, "JBoss Maven 2 Repository" at "http://repository.jboss.org/nexus/content/groups/public/") lazy val cluster = project("cluster", "Norbert Cluster", new ClusterProject(_)) lazy val network = project("network", "Norbert Network", new NetworkProject(_), cluster) @@ -21,7 +21,7 @@ class NorbertProject(info: ProjectInfo) extends ParentProject(info) with IdeaPro } class NetworkProject(info: ProjectInfo) extends DefaultProject(info) with IdeaProject { - val netty = "org.jboss.netty" % "netty" % "3.1.5.GA" + val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" val slf4j = "org.slf4j" % "slf4j-api" % "1.5.6" val slf4jLog4j = "org.slf4j" % "slf4j-log4j12" % "1.5.6" } From 7a1686a90521a0f7b31bf7b5b524bc7eb1326659 Mon Sep 17 00:00:00 2001 From: Joshua Hartman Date: Sun, 12 Dec 2010 09:19:39 -0800 Subject: [PATCH 7/7] The Javacompat NettyNetworkServer can now take a cluster client (matches the behavior for the scala api). --- .../norbert/javacompat/network/NettyNetworkServer.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/NettyNetworkServer.scala b/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/NettyNetworkServer.scala index 55655214..a83f8974 100644 --- a/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/NettyNetworkServer.scala +++ b/java-network/src/main/scala/com/linkedin/norbert/javacompat/network/NettyNetworkServer.scala @@ -18,12 +18,15 @@ package network import com.google.protobuf.Message import cluster.BaseClusterClient +import com.linkedin.norbert.cluster.ClusterClient class NettyNetworkServer(config: NetworkServerConfig) extends NetworkServer { val c = new com.linkedin.norbert.network.netty.NetworkServerConfig - if (config.getClusterClient != null) c.clusterClient = config.getClusterClient.asInstanceOf[BaseClusterClient].underlying - c.serviceName = config.getServiceName - c.zooKeeperConnectString = config.getZooKeeperConnectString + + c.clusterClient = if (config.clusterClient != null) + config.getClusterClient.asInstanceOf[BaseClusterClient].underlying + else new ClusterClient(config.serviceName, config.zooKeeperConnectString, config.zooKeeperSessionTimeoutMillis) + c.zooKeeperSessionTimeoutMillis = config.getZooKeeperSessionTimeoutMillis c.requestThreadCorePoolSize = config.getRequestThreadCorePoolSize c.requestThreadMaxPoolSize = config.getRequestThreadMaxPoolSize