This repository was archived by the owner on Jun 28, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathClusterClientService.scala
More file actions
58 lines (45 loc) · 2.05 KB
/
ClusterClientService.scala
File metadata and controls
58 lines (45 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package services
import javax.inject.{Inject, Singleton}
import actors.ConnectionStore
import akka.actor.ActorSystem
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import akka.pattern.ask
import akka.util.Timeout
import tmt.common.Messages
import tmt.library.Role
import tmt.shared.Topics
import tmt.shared.models.ConnectionSet
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.{Failure, Success, Try}
/**
 * Injectable singleton that publishes control messages to the cluster through the
 * distributed pub-sub mediator and queries a [[actors.ConnectionStore]] actor that it
 * creates when the service is constructed.
 *
 * @param system              actor system used to obtain the mediator and to spawn the connection store
 * @param roleMappingsService source of the role mappings used to validate subscriptions
 */
@Singleton
class ClusterClientService @Inject()(system: ActorSystem, roleMappingsService: RoleMappingsService) {

  /** Timeout applied to `ask` requests issued by this service (see `allConnections`). */
  // Explicitly typed: implicit definitions without a declared type are fragile in Scala 2
  // and rejected by Scala 3.
  implicit val timeout: Timeout = Timeout(2.seconds)

  private val mediator        = DistributedPubSub(system).mediator
  private val connectionStore = system.actorOf(ConnectionStore.props())

  /**
   * Publishes a `Messages.UpdateDelay(serverName, delay)` on the `Topics.Throttle` topic.
   * The message is sent fire-and-forget; no acknowledgement is awaited.
   *
   * @param serverName name of the server the delay applies to
   * @param delay      new delay carried by the message
   */
  def throttle(serverName: String, delay: FiniteDuration): Unit =
    mediator ! Publish(Topics.Throttle, Messages.UpdateDelay(serverName, delay))

  /**
   * Publishes a `Messages.Subscribe(serverName, topic)` on the `Topics.Subscription` topic,
   * but only if the pair passes role validation.
   *
   * Note that `topic` is validated as the name of the destination server: its role is
   * looked up in the online role mappings just like `serverName`.
   *
   * @param serverName name of the subscribing (source) server
   * @param topic      topic to subscribe to; also treated as the destination server name
   * @return `Success(())` once the message has been handed to the mediator, or a `Failure`
   *         wrapping a [[ValidationFailedException]] when validation does not yield `Some(true)`
   */
  def subscribe(serverName: String, topic: String): Try[Unit] =
    validate(serverName, topic) match {
      case Some(true) =>
        // Keep the side effect as its own statement rather than burying it in Success(...).
        mediator ! Publish(Topics.Subscription, Messages.Subscribe(serverName, topic))
        Success(())
      case _ =>
        Failure(new ValidationFailedException("Bad request"))
    }

  /**
   * Publishes a `Messages.Unsubscribe(serverName, topic)` on the `Topics.Subscription` topic.
   * Unlike `subscribe`, no role validation is performed.
   *
   * @param serverName name of the unsubscribing server
   * @param topic      topic to unsubscribe from
   */
  def unsubscribe(serverName: String, topic: String): Unit =
    mediator ! Publish(Topics.Subscription, Messages.Unsubscribe(serverName, topic))

  /**
   * Asks the connection store actor for its current connections.
   *
   * @return a future of the reply cast to `ConnectionSet`; the ask uses the implicit
   *         2 second `timeout` declared on this class
   */
  def allConnections: scala.concurrent.Future[ConnectionSet] =
    (connectionStore ? ConnectionStore.GetConnections).mapTo[ConnectionSet]

  /**
   * Checks whether the source server's role consumes exactly what the destination
   * server's role produces.
   *
   * The result is `true` only when the source role's `maybeConsumes` is defined and equal
   * to the destination role's `maybeProduces`.
   * NOTE(review): presumably `roleOf` returns an `Option`, so the result is empty when
   * either server has no role mapping; `Role.withName` may throw for an unknown role
   * name - confirm against `RoleMappings` and `Role`.
   */
  private def validate(sourceServerName: String, destinationServerName: String) = {
    val roleMappings = roleMappingsService.onlineRoleMappings
    for {
      sourceRoleName      <- roleMappings.roleOf(sourceServerName)
      destinationRoleName <- roleMappings.roleOf(destinationServerName)
      sourceRole      = Role.withName(sourceRoleName)
      destinationRole = Role.withName(destinationRoleName)
    } yield sourceRole.maybeConsumes.isDefined && sourceRole.maybeConsumes == destinationRole.maybeProduces
  }
}
/**
 * Unchecked exception signalling that a request failed validation.
 *
 * @param msg description of the failure; also passed to `RuntimeException` as its message
 */
case class ValidationFailedException(msg: String)
    extends RuntimeException(msg)