Skip to content

Commit a44db4b

Browse files
authored
Save waitTime value for sequence and conductor action (allow parent/child transaction ids) (#4819)
* Allow parent/child transaction ids * Create child transaction for sequence action * Fix test case * Fix test case of SchedulerTests * Refactor transactionId * Check waitTime is defined in test case * Update annotations.md * Use StringBuilder to generate tid * Remove unsed import * Avoid `.get` and use pattern matching
1 parent 3b71cfe commit a44db4b

11 files changed

Lines changed: 242 additions & 62 deletions

File tree

common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ class AkkaLogging(loggingAdapter: LoggingAdapter) extends Logging {
100100
}
101101
}
102102

103-
protected def format(id: TransactionId, name: String, logmsg: String) = s"[$id] [$name] $logmsg"
103+
protected def format(id: TransactionId, name: String, logmsg: String) = {
104+
val currentId = if (id.hasParent) id else ""
105+
s"[${id.root}] [$currentId] [$name] $logmsg"
106+
}
104107
}
105108

106109
/**
@@ -124,8 +127,9 @@ class PrintStreamLogging(outputStream: PrintStream = Console.out) extends Loggin
124127
case msg if msg.nonEmpty =>
125128
msg.split('\n').map(_.trim).mkString(" ")
126129
}
130+
val currentId = if (id.hasParent) id else ""
127131

128-
val parts = Seq(s"[$time]", s"[$level]", s"[$id]") ++ Seq(s"[$name]") ++ logMessage
132+
val parts = Seq(s"[$time]", s"[$level]", s"[${id.root}]", s"[$currentId]") ++ Seq(s"[$name]") ++ logMessage
129133
outputStream.println(parts.mkString(" "))
130134
}
131135
}

common/scala/src/main/scala/org/apache/openwhisk/common/TransactionId.scala

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ import scala.util.Try
3636
* metadata is stored indirectly in the referenced meta object.
3737
*/
3838
case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
39+
def root = findRoot(meta)
3940
def id = meta.id
40-
override def toString = s"#tid_${meta.id}"
41+
override def toString = meta.toString
4142

4243
def toHeader = RawHeader(TransactionId.generatorConfig.header, meta.id)
4344

@@ -172,13 +173,24 @@ case class TransactionId private (meta: TransactionMetadata) extends AnyVal {
172173
def deltaToMarker(startMarker: StartMarker, endTime: Instant = Instant.now(Clock.systemUTC)) =
173174
Duration.between(startMarker.start, endTime).toMillis
174175

176+
def hasParent = meta.parent.isDefined
177+
175178
/**
176179
* Formats log message to include marker.
177180
*
178181
* @param message: The log message without the marker
179182
* @param marker: The marker to add to the message
180183
*/
181184
private def createMessageWithMarker(message: String, marker: LogMarker): String = s"$message $marker"
185+
186+
/**
187+
* Find root transaction metadata
188+
*/
189+
private def findRoot(meta: TransactionMetadata): TransactionMetadata =
190+
meta.parent match {
191+
case Some(parent) => findRoot(parent)
192+
case _ => meta
193+
}
182194
}
183195

184196
/**
@@ -197,7 +209,12 @@ case class StartMarker(start: Instant, startMarker: LogMarkerToken)
197209
* @param start the timestamp when the request processing commenced
198210
* @param extraLogging enables logging, if set to true
199211
*/
200-
protected case class TransactionMetadata(id: String, start: Instant, extraLogging: Boolean = false)
212+
protected case class TransactionMetadata(id: String,
213+
start: Instant,
214+
extraLogging: Boolean = false,
215+
parent: Option[TransactionMetadata] = None) {
216+
override def toString = s"#tid_$id"
217+
}
201218

202219
case class MetricConfig(prometheusEnabled: Boolean,
203220
kamonEnabled: Boolean,
@@ -227,28 +244,54 @@ object TransactionId {
227244
val dbBatcher = TransactionId(systemPrefix + "dbBatcher") // Database batcher
228245
val actionHealthPing = TransactionId(systemPrefix + "actionHealth")
229246

247+
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
248+
230249
def apply(tid: String, extraLogging: Boolean = false): TransactionId = {
231250
val now = Instant.now(Clock.systemUTC()).inMills
232251
TransactionId(TransactionMetadata(tid, now, extraLogging))
233252
}
234253

254+
def childOf(parentTid: TransactionId): TransactionId = {
255+
val now = Instant.now(Clock.systemUTC()).inMills
256+
val tid = generateTid()
257+
TransactionId(TransactionMetadata(tid, now, parentTid.meta.extraLogging, Some(parentTid.meta)))
258+
}
259+
260+
def generateTid(): String = {
261+
val sb = new StringBuilder
262+
for (_ <- 1 to 32) sb.append(dict(util.Random.nextInt(dict.length)))
263+
sb.toString
264+
}
265+
235266
implicit val serdes = new RootJsonFormat[TransactionId] {
236-
def write(t: TransactionId) = {
237-
if (t.meta.extraLogging)
238-
JsArray(JsString(t.meta.id), JsNumber(t.meta.start.toEpochMilli), JsBoolean(t.meta.extraLogging))
239-
else
240-
JsArray(JsString(t.meta.id), JsNumber(t.meta.start.toEpochMilli))
267+
268+
private def writeMetadata(meta: TransactionMetadata): JsArray = {
269+
val base = Vector(JsString(meta.id), JsNumber(meta.start.toEpochMilli))
270+
val extraLogging = if (meta.extraLogging) Vector(JsBoolean(meta.extraLogging)) else Vector.empty
271+
val parent = meta.parent match {
272+
case Some(p) => Vector(writeMetadata(p))
273+
case _ => Vector.empty
274+
}
275+
JsArray(base ++ extraLogging ++ parent)
241276
}
242277

243-
def read(value: JsValue) =
278+
private def readMetadata(value: JsValue): Option[TransactionMetadata] = {
244279
Try {
245280
value match {
246281
case JsArray(Vector(JsString(id), JsNumber(start))) =>
247-
TransactionId(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), false))
282+
Some(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), false))
248283
case JsArray(Vector(JsString(id), JsNumber(start), JsBoolean(extraLogging))) =>
249-
TransactionId(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), extraLogging))
284+
Some(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), extraLogging))
285+
case JsArray(Vector(JsString(id), JsNumber(start), JsBoolean(extraLogging), parent)) =>
286+
Some(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), extraLogging, readMetadata(parent)))
287+
case JsArray(Vector(JsString(id), JsNumber(start), parent)) =>
288+
Some(TransactionMetadata(id, Instant.ofEpochMilli(start.longValue), false, readMetadata(parent)))
250289
}
251-
} getOrElse unknown
290+
} getOrElse Option.empty
291+
}
292+
293+
def write(t: TransactionId): JsArray = writeMetadata(t.meta)
294+
def read(value: JsValue): TransactionId = readMetadata(value).map(meta => TransactionId(meta)).getOrElse(unknown)
252295
}
253296
}
254297

common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.openwhisk.http
1919

20-
import java.util.concurrent.ThreadLocalRandom
21-
2220
import akka.actor.ActorSystem
2321
import akka.event.Logging
2422
import akka.http.scaladsl.{Http, HttpConnectionContext}
@@ -94,10 +92,6 @@ trait BasicHttpService extends Directives {
9492
}
9593
}
9694

97-
// Scala random should be enough here, as the generation for the tid is only a fallback. In addition the tid only has
98-
// to be unique within a few minutes.
99-
private val dict = ('A' to 'Z') ++ ('a' to 'z') ++ ('0' to '9')
100-
10195
/** Assigns transaction id to every request. */
10296
protected def assignId = HeaderDirectives.optionalHeaderValueByName(OW_EXTRA_LOGGING_HEADER) flatMap { headerValue =>
10397
val extraLogging = headerValue match {
@@ -115,8 +109,7 @@ trait BasicHttpService extends Directives {
115109
.filterNot(_.startsWith(TransactionId.systemPrefix))
116110
.getOrElse {
117111
// As this is only a fallback, because the tid should be generated by nginx, this shouldn't be used.
118-
// Therefore we didn't take a deep look into performance here.
119-
(0 until 32).map(_ => dict(ThreadLocalRandom.current().nextInt(dict.size))).mkString("")
112+
TransactionId.generateTid()
120113
}
121114

122115
TransactionId(tid, extraLogging)

core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/PrimitiveActions.scala

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import java.time.{Clock, Instant}
2121

2222
import akka.actor.ActorSystem
2323
import akka.event.Logging.InfoLevel
24+
import spray.json.DefaultJsonProtocol._
2425
import spray.json._
25-
2626
import org.apache.openwhisk.common.tracing.WhiskTracerProvider
2727
import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId, UserEvents}
2828
import org.apache.openwhisk.core.connector.{ActivationMessage, EventMessage, MessagingProvider}
@@ -37,13 +37,13 @@ import org.apache.openwhisk.http.Messages._
3737
import org.apache.openwhisk.spi.SpiLoader
3838
import org.apache.openwhisk.utils.ExecutionContextFactory.FutureExtensions
3939
import org.apache.openwhisk.core.ConfigKeys
40+
import org.apache.openwhisk.core.containerpool.Interval
4041

4142
import scala.collection.mutable.Buffer
4243
import scala.concurrent.duration._
4344
import scala.concurrent.{ExecutionContext, Future, Promise}
4445
import scala.language.postfixOps
4546
import scala.util.{Failure, Success}
46-
4747
import pureconfig._
4848
import pureconfig.generic.auto._
4949

@@ -290,7 +290,7 @@ protected[actions] trait PrimitiveActions {
290290
logging.info(this, s"invoking composition $action topmost ${cause.isEmpty} activationid '${session.activationId}'")
291291

292292
val response: Future[Either[ActivationId, WhiskActivation]] =
293-
invokeConductor(user, payload, session).map(response =>
293+
invokeConductor(user, payload, session, transid).map(response =>
294294
Right(completeActivation(user, session, response, waitForResponse.isDefined)))
295295

296296
// is caller waiting for the result of the activation?
@@ -315,10 +315,14 @@ protected[actions] trait PrimitiveActions {
315315
* @param user the identity invoking the action
316316
* @param payload the dynamic arguments for the activation
317317
* @param session the session object for this composition
318-
* @param transid a transaction id for logging
318+
* @param parentTid a parent transaction id
319319
*/
320-
private def invokeConductor(user: Identity, payload: Option[JsObject], session: Session)(
321-
implicit transid: TransactionId): Future[ActivationResponse] = {
320+
private def invokeConductor(user: Identity,
321+
payload: Option[JsObject],
322+
session: Session,
323+
parentTid: TransactionId): Future[ActivationResponse] = {
324+
325+
implicit val transid: TransactionId = TransactionId.childOf(parentTid)
322326

323327
if (session.accounting.conductors > 2 * actionSequenceLimit) {
324328
// composition is too long
@@ -365,19 +369,21 @@ protected[actions] trait PrimitiveActions {
365369
case Some(next) =>
366370
FullyQualifiedEntityName.resolveName(next, user.namespace.name) match {
367371
case Some(fqn) if session.accounting.components < actionSequenceLimit =>
368-
tryInvokeNext(user, fqn, params, session)
372+
tryInvokeNext(user, fqn, params, session, transid)
369373

370374
case Some(_) => // composition is too long
371375
invokeConductor(
372376
user,
373377
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionIsTooLong))),
374-
session = session)
378+
session = session,
379+
transid)
375380

376381
case None => // parsing failure
377382
invokeConductor(
378383
user,
379384
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionComponentInvalid(next)))),
380-
session = session)
385+
session = session,
386+
transid)
381387

382388
}
383389
}
@@ -395,8 +401,14 @@ protected[actions] trait PrimitiveActions {
395401
* @param session the session for the current activation
396402
* @return promise for the eventual activation
397403
*/
398-
private def tryInvokeNext(user: Identity, fqn: FullyQualifiedEntityName, params: Option[JsObject], session: Session)(
399-
implicit transid: TransactionId): Future[ActivationResponse] = {
404+
private def tryInvokeNext(user: Identity,
405+
fqn: FullyQualifiedEntityName,
406+
params: Option[JsObject],
407+
session: Session,
408+
parentTid: TransactionId): Future[ActivationResponse] = {
409+
410+
implicit val transid: TransactionId = TransactionId.childOf(parentTid)
411+
400412
val resource = Resource(fqn.path, Collection(Collection.ACTIONS), Some(fqn.name.asString))
401413
entitlementProvider
402414
.check(user, Privilege.ACTIVATE, Set(resource), noThrottle = true)
@@ -415,7 +427,8 @@ protected[actions] trait PrimitiveActions {
415427
invokeConductor(
416428
user,
417429
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionComponentNotFound(fqn.asString)))),
418-
session = session)
430+
session = session,
431+
transid)
419432
}
420433
}
421434
.recoverWith {
@@ -424,7 +437,8 @@ protected[actions] trait PrimitiveActions {
424437
invokeConductor(
425438
user,
426439
payload = Some(JsObject(ERROR_FIELD -> JsString(compositionComponentNotAccessible(fqn.asString)))),
427-
session = session)
440+
session = session,
441+
transid)
428442
}
429443
}
430444

@@ -480,7 +494,7 @@ protected[actions] trait PrimitiveActions {
480494
case Left(response) => // unsuccessful invocation, return error response
481495
Future.successful(response)
482496
case Right(activation) => // reinvoke conductor on component result
483-
invokeConductor(user, payload = Some(activation.resultAsJson), session = session)
497+
invokeConductor(user, payload = Some(activation.resultAsJson), session = session, transid)
484498
}
485499
}
486500

@@ -544,6 +558,13 @@ protected[actions] trait PrimitiveActions {
544558
Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))
545559
}
546560

561+
// set waitTime for conductor action
562+
val waitTime = {
563+
Parameters(
564+
WhiskActivation.waitTimeAnnotation,
565+
Interval(transid.meta.start, session.start).duration.toMillis.toJson)
566+
}
567+
547568
// set binding if invoked action is in a package binding
548569
val binding =
549570
session.action.binding.map(f => Parameters(WhiskActivation.bindingAnnotation, JsString(f.asString)))
@@ -567,7 +588,7 @@ protected[actions] trait PrimitiveActions {
567588
Parameters(WhiskActivation.pathAnnotation, JsString(session.action.fullyQualifiedName(false).asString)) ++
568589
Parameters(WhiskActivation.kindAnnotation, JsString(Exec.SEQUENCE)) ++
569590
Parameters(WhiskActivation.conductorAnnotation, JsTrue) ++
570-
causedBy ++ binding ++
591+
causedBy ++ waitTime ++ binding ++
571592
sequenceLimits,
572593
duration = Some(session.duration))
573594

core/controller/src/main/scala/org/apache/openwhisk/core/controller/actions/SequenceActions.scala

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ import java.time.{Clock, Instant}
2121
import java.util.concurrent.atomic.AtomicReference
2222

2323
import akka.actor.ActorSystem
24+
import spray.json._
25+
import spray.json.DefaultJsonProtocol._
2426
import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
2527
import org.apache.openwhisk.core.connector.{EventMessage, MessagingProvider}
28+
import org.apache.openwhisk.core.containerpool.Interval
2629
import org.apache.openwhisk.core.controller.WhiskServices
2730
import org.apache.openwhisk.core.database.{ActivationStore, NoDocumentException, UserContext}
2831
import org.apache.openwhisk.core.entity._
@@ -199,7 +202,7 @@ protected[actions] trait SequenceActions {
199202
topmost: Boolean,
200203
cause: Option[ActivationId],
201204
start: Instant,
202-
end: Instant): WhiskActivation = {
205+
end: Instant)(implicit transid: TransactionId): WhiskActivation = {
203206

204207
// compute max memory
205208
val sequenceLimits = accounting.maxMemory map { maxMemoryAcrossActionsInSequence =>
@@ -213,6 +216,11 @@ protected[actions] trait SequenceActions {
213216
Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE)))
214217
} else None
215218

219+
// set waitTime for sequence action
220+
val waitTime = {
221+
Parameters(WhiskActivation.waitTimeAnnotation, Interval(transid.meta.start, start).duration.toMillis.toJson)
222+
}
223+
216224
// set binding if an invoked action is in a package binding
217225
val binding = action.binding map { path =>
218226
Parameters(WhiskActivation.bindingAnnotation, JsString(path.asString))
@@ -234,7 +242,7 @@ protected[actions] trait SequenceActions {
234242
annotations = Parameters(WhiskActivation.topmostAnnotation, JsBoolean(topmost)) ++
235243
Parameters(WhiskActivation.pathAnnotation, JsString(action.fullyQualifiedName(false).asString)) ++
236244
Parameters(WhiskActivation.kindAnnotation, JsString(Exec.SEQUENCE)) ++
237-
causedBy ++ binding ++
245+
causedBy ++ waitTime ++ binding ++
238246
sequenceLimits,
239247
duration = Some(accounting.duration))
240248
}
@@ -285,7 +293,7 @@ protected[actions] trait SequenceActions {
285293
.foldLeft(initialAccounting) { (accountingFuture, futureAction) =>
286294
accountingFuture.flatMap { accounting =>
287295
if (accounting.atomicActionCnt < actionSequenceLimit) {
288-
invokeNextAction(user, futureAction, accounting, cause)
296+
invokeNextAction(user, futureAction, accounting, cause, transid)
289297
.flatMap { accounting =>
290298
if (!accounting.shortcircuit) {
291299
Future.successful(accounting)
@@ -327,12 +335,14 @@ protected[actions] trait SequenceActions {
327335
* @param cause the activation id of the first sequence containing this activations
328336
* @return a future which resolves with updated accounting for a sequence, including the last result, duration, and activation ids
329337
*/
330-
private def invokeNextAction(
331-
user: Identity,
332-
futureAction: Future[WhiskActionMetaData],
333-
accounting: SequenceAccounting,
334-
cause: Option[ActivationId])(implicit transid: TransactionId): Future[SequenceAccounting] = {
338+
private def invokeNextAction(user: Identity,
339+
futureAction: Future[WhiskActionMetaData],
340+
accounting: SequenceAccounting,
341+
cause: Option[ActivationId],
342+
parentTid: TransactionId): Future[SequenceAccounting] = {
335343
futureAction.flatMap { action =>
344+
implicit val transid: TransactionId = TransactionId.childOf(parentTid)
345+
336346
// the previous response becomes input for the next action in the sequence;
337347
// the accounting no longer needs to hold a reference to it once the action is
338348
// invoked, so previousResponse.getAndSet(null) drops the reference at this point

0 commit comments

Comments
 (0)