Skip to content

Commit 3b71cfe

Browse files
authored
Make activation response truncation length configurable (#4909)
* Make activation payload truncation length configurable * Update tests * Change configuration to use HOCON
1 parent bfa7702 commit 3b71cfe

8 files changed

Lines changed: 59 additions & 34 deletions

File tree

common/scala/src/main/resources/application.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,8 @@ whisk {
351351

352352
activation {
353353
payload {
354-
max = 1048576
354+
max = 1 m
355+
truncation = 1 m
355356
}
356357
# Action responses sent through Kafka can contain up to 3018 bytes of metadata
357358
# CompletionMessage

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/AkkaContainerClient.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ protected class AkkaContainerClient(
7070
port: Int,
7171
timeout: FiniteDuration,
7272
maxResponse: ByteSize,
73+
truncation: ByteSize,
7374
queueSize: Int,
7475
retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem)
7576
extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout))
@@ -174,12 +175,12 @@ protected class AkkaContainerClient(
174175
tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String)
175176
case (Seq(prefix), tail) =>
176177
val truncatedResponse = previouslyCaptured ++ prefix
177-
if (truncatedResponse.size < maxResponse.toBytes) {
178+
if (truncatedResponse.size < truncation.toBytes) {
178179
truncated(tail, truncatedResponse)
179180
} else {
180181
//ignore the tail (MUST CONSUME ENTIRE ENTITY!)
181-
//captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length
182-
tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String)
182+
//captured string MAY be larger than the truncation size, so take only truncation bytes to get the exact length
183+
tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(truncation.toBytes.toInt).utf8String)
183184
}
184185
}
185186
}
@@ -193,7 +194,7 @@ object AkkaContainerClient {
193194
as: ActorSystem,
194195
ec: ExecutionContext,
195196
tid: TransactionId): (Int, Option[JsObject]) = {
196-
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
197+
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
197198
val response = executeRequest(connection, endPoint, content)
198199
val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures
199200
connection.close()
@@ -206,7 +207,7 @@ object AkkaContainerClient {
206207
tid: TransactionId,
207208
as: ActorSystem,
208209
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
209-
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1)
210+
val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1.MB, 1)
210211
val futureResults = contents.map { executeRequest(connection, endPoint, _) }
211212
val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures
212213
connection.close()

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/ApacheBlockingContainerClient.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ protected[containerpool] case class RetryableConnectionError(t: Throwable) exten
6767
protected class ApacheBlockingContainerClient(hostname: String,
6868
timeout: FiniteDuration,
6969
maxResponse: ByteSize,
70+
truncation: ByteSize,
7071
maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext)
7172
extends ContainerClient {
7273

@@ -129,7 +130,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
129130
Right(ContainerResponse(statusCode, str, None))
130131
} else {
131132
// only consume a bounded number of bytes according to the system limits
132-
val str = new String(IOUtils.toByteArray(entity.getContent, maxResponseBytes), StandardCharsets.UTF_8)
133+
val str = new String(IOUtils.toByteArray(entity.getContent, truncationBytes), StandardCharsets.UTF_8)
133134
EntityUtils.consumeQuietly(entity) // consume the rest of the stream to free the connection
134135
Right(ContainerResponse(statusCode, str, Some(contentLength.B, maxResponse)))
135136
}
@@ -180,6 +181,7 @@ protected class ApacheBlockingContainerClient(hostname: String,
180181
}
181182

182183
private val maxResponseBytes = maxResponse.toBytes
184+
private val truncationBytes = truncation.toBytes
183185

184186
private val baseUri = new URIBuilder()
185187
.setScheme("http")
@@ -227,7 +229,7 @@ object ApacheBlockingContainerClient {
227229
tid: TransactionId,
228230
ec: ExecutionContext): (Int, Option[JsObject]) = {
229231
val timeout = 90.seconds
230-
val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB)
232+
val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB, 1.MB)
231233
val response = executeRequest(connection, endPoint, content)
232234
val result = Await.result(response, timeout)
233235
connection.close()
@@ -239,7 +241,7 @@ object ApacheBlockingContainerClient {
239241
implicit logging: Logging,
240242
tid: TransactionId,
241243
ec: ExecutionContext): Seq[(Int, Option[JsObject])] = {
242-
val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, contents.size)
244+
val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, 1.MB, contents.size)
243245
val futureResults = contents.map { content =>
244246
executeRequest(connection, endPoint, content)
245247
}

common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,19 @@ trait Container {
220220
}
221221
private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
222222
if (Container.config.akkaClient) {
223-
new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
223+
new AkkaContainerClient(
224+
addr.host,
225+
addr.port,
226+
timeout,
227+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
228+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
229+
1024)
224230
} else {
225231
new ApacheBlockingContainerClient(
226232
s"${addr.host}:${addr.port}",
227233
timeout,
228234
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
235+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
229236
maxConcurrent)
230237
}
231238
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ActivationEntityLimit.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ package org.apache.openwhisk.core.entity
2020
import pureconfig._
2121
import pureconfig.generic.auto._
2222
import org.apache.openwhisk.core.ConfigKeys
23-
import org.apache.openwhisk.core.entity.size.SizeLong
23+
import org.apache.openwhisk.core.entity.size._
2424

25-
case class ActivationEntityPayload(max: ByteSize)
25+
case class ActivationEntityPayload(max: ByteSize, truncation: ByteSize)
2626
case class ActivationEntityLimitConf(serdesOverhead: ByteSize, payload: ActivationEntityPayload)
2727

2828
/**
@@ -31,9 +31,9 @@ case class ActivationEntityLimitConf(serdesOverhead: ByteSize, payload: Activati
3131
* parameters for triggers.
3232
*/
3333
protected[core] object ActivationEntityLimit {
34-
private implicit val pureconfigLongReader: ConfigReader[ByteSize] = ConfigReader[Long].map(_.bytes)
3534
private val config = loadConfigOrThrow[ActivationEntityLimitConf](ConfigKeys.activation)
3635

3736
protected[core] val MAX_ACTIVATION_ENTITY_LIMIT: ByteSize = config.payload.max
37+
protected[core] val MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT: ByteSize = config.payload.truncation
3838
protected[core] val MAX_ACTIVATION_LIMIT: ByteSize = config.payload.max + config.serdesOverhead
3939
}

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,19 @@ class DockerContainer(protected val id: ContainerId,
220220
val started = Instant.now()
221221
val http = httpConnection.getOrElse {
222222
val conn = if (Container.config.akkaClient) {
223-
new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
223+
new AkkaContainerClient(
224+
addr.host,
225+
addr.port,
226+
timeout,
227+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
228+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
229+
1024)
224230
} else {
225231
new ApacheBlockingContainerClient(
226232
s"${addr.host}:${addr.port}",
227233
timeout,
228234
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
235+
ActivationEntityLimit.MAX_ACTIVATION_ENTITY_TRUNCATION_LIMIT,
229236
maxConcurrent)
230237
}
231238
httpConnection = Some(conn)

tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/AkkaContainerClientTests.scala

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class AkkaContainerClientTests
109109

110110
it should "not wait longer than set timeout" in {
111111
val timeout = 5.seconds
112-
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
112+
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100)
113113
testHang = timeout * 2
114114
val start = Instant.now()
115115
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
@@ -123,15 +123,15 @@ class AkkaContainerClientTests
123123

124124
it should "handle empty entity response" in {
125125
val timeout = 5.seconds
126-
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100)
126+
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100)
127127
testStatusCode = 204
128128
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
129129
result shouldBe Left(NoResponseReceived())
130130
}
131131

132132
it should "retry till timeout on StreamTcpException" in {
133133
val timeout = 5.seconds
134-
val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
134+
val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 1.B, 100)
135135
val start = Instant.now()
136136
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
137137
val end = Instant.now()
@@ -146,7 +146,7 @@ class AkkaContainerClientTests
146146

147147
it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
148148
val timeout = 5.seconds
149-
val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100)
149+
val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 1.B, 100)
150150
assertThrows[ContainerHealthError] {
151151
Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
152152
}
@@ -156,7 +156,7 @@ class AkkaContainerClientTests
156156
val timeout = 5.seconds
157157
val retryInterval = 500.milliseconds
158158
val connection =
159-
new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100, retryInterval)
159+
new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 1.B, 100, retryInterval)
160160
val start = Instant.now()
161161
testConnectionFailCount = 5
162162
testResponse = ""
@@ -173,7 +173,7 @@ class AkkaContainerClientTests
173173

174174
it should "not truncate responses within limit" in {
175175
val timeout = 1.minute.toMillis
176-
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 50.B, 100)
176+
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 50.B, 50.B, 100)
177177
Seq(true, false).foreach { success =>
178178
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
179179
testStatusCode = if (success) 200 else 500
@@ -188,15 +188,17 @@ class AkkaContainerClientTests
188188

189189
it should "truncate responses that exceed limit" in {
190190
val timeout = 1.minute.toMillis
191-
val limit = 1.B
192-
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
191+
val limit = 2.B
192+
val truncationLimit = 1.B
193+
val connection =
194+
new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, truncationLimit, 100)
193195
Seq(true, false).foreach { success =>
194196
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
195197
testStatusCode = if (success) 200 else 500
196198
testResponse = r
197199
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
198200
result shouldBe Right {
199-
ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
201+
ContainerResponse(okStatus = success, r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
200202
}
201203
}
202204
}
@@ -207,15 +209,20 @@ class AkkaContainerClientTests
207209
//use a limit large enough to not fit into a single ByteString as response entity is parsed into multiple ByteStrings
208210
//seems like this varies, but often is ~64k or ~128k
209211
val limit = 300.KB
210-
val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100)
212+
val truncationLimit = 299.B
213+
val connection =
214+
new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, truncationLimit, 100)
211215
Seq(true, false).foreach { success =>
212216
// Generate a response that's 1MB
213217
val response = "0" * 1024 * 1024
214218
testStatusCode = if (success) 200 else 500
215219
testResponse = response
216220
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
217221
result shouldBe Right {
218-
ContainerResponse(okStatus = success, response.take(limit.toBytes.toInt), Some((response.length.B, limit)))
222+
ContainerResponse(
223+
okStatus = success,
224+
response.take(truncationLimit.toBytes.toInt),
225+
Some((response.length.B, limit)))
219226
}
220227

221228
}

tests/src/test/scala/org/apache/openwhisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class ApacheBlockingContainerClientTests
100100

101101
it should "not wait longer than set timeout" in {
102102
val timeout = 5.seconds
103-
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
103+
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B, 1.B)
104104
testHang = timeout * 2
105105
val start = Instant.now()
106106
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
@@ -114,7 +114,7 @@ class ApacheBlockingContainerClientTests
114114

115115
it should "handle empty entity response" in {
116116
val timeout = 5.seconds
117-
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B)
117+
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B, 1.B)
118118
testStatusCode = 204
119119
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
120120
result shouldBe Left(NoResponseReceived())
@@ -123,7 +123,7 @@ class ApacheBlockingContainerClientTests
123123
it should "retry till timeout on HttpHostConnectException" in {
124124
val timeout = 5.seconds
125125
val badHostAndPort = "0.0.0.0:12345"
126-
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
126+
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B, 1.B)
127127
testStatusCode = 204
128128
val start = Instant.now()
129129
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
@@ -142,15 +142,15 @@ class ApacheBlockingContainerClientTests
142142
it should "throw ContainerHealthError on HttpHostConnectException if reschedule==true" in {
143143
val timeout = 5.seconds
144144
val badHostAndPort = "0.0.0.0:12345"
145-
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B)
145+
val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B, 1.B)
146146
assertThrows[ContainerHealthError] {
147147
Await.result(connection.post("/run", JsObject.empty, retry = false, reschedule = true), 10.seconds)
148148
}
149149
}
150150

151151
it should "not truncate responses within limit" in {
152152
val timeout = 1.minute.toMillis
153-
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B)
153+
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B, 50.B)
154154
Seq(true, false).foreach { success =>
155155
Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
156156
testStatusCode = if (success) 200 else 500
@@ -165,16 +165,16 @@ class ApacheBlockingContainerClientTests
165165

166166
it should "truncate responses that exceed limit" in {
167167
val timeout = 1.minute.toMillis
168-
val limit = 1.B
169-
val excess = limit + 1.B
170-
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit)
168+
val limit = 2.B
169+
val truncationLimit = 1.B
170+
val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit, truncationLimit)
171171
Seq(true, false).foreach { success =>
172172
Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r =>
173173
testStatusCode = if (success) 200 else 500
174174
testResponse = r
175175
val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds)
176176
result shouldBe Right {
177-
ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit)))
177+
ContainerResponse(okStatus = success, r.take(truncationLimit.toBytes.toInt), Some((r.length.B, limit)))
178178
}
179179
}
180180
}

0 commit comments

Comments
 (0)