Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、弹性的(Resilient)、快速响应的(Responsive)应用程序的平台。本文基本上是基于Akka的官方文档(版本是2.3.12),通过自己的理解,来阐述Akka提供的一些组件或概念,另外总结了Akka的一些使用场景。
Actor
维基百科这样定义Actor模型:
Actor是Akka中最核心的概念,它是一个封装了状态和行为的对象,Actor之间可以通过交换消息的方式进行通信,每个Actor都有自己的收件箱(Mailbox)。通过Actor能够简化锁及线程管理,可以非常容易地开发出正确地并发程序和并行系统,Actor具有如下特性:
实现一个Actor,可以继承特质akka.actor.Actor,实现一个receive方法,应该在receive方法中定义一系列的case语句,基于标准Scala的模式匹配方法,来实现每一种消息的处理逻辑。我们先看一下Akka中特质Actor的定义:
trait Actor {
import Actor._
type Receive = Actor.Receive
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
implicit final val self = context.self //MUST BE A VAL, TRUST ME
final def sender(): ActorRef = context.sender()
def receive: Actor.Receive // 这个是在子类中一定要实现的抽象方法
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
protected[akka] def aroundPreStart(): Unit = preStart()
protected[akka] def aroundPostStop(): Unit = postStop()
protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)
protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def preStart(): Unit = () // 启动Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def postStop(): Unit = () // 终止Actor之前需要执行的操作,默认为空实现,可以重写该方法
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def preRestart(reason: Throwable, message: Option[Any]): Unit = { // 重启Actor之前需要执行的操作,默认终止该Actor所监督的所有子Actor,然后调用postStop()方法,可以重写该方法
context.children foreach { child ⇒
context.unwatch(child)
context.stop(child)
}
postStop()
}
@throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
def postRestart(reason: Throwable): Unit = { // 重启Actor之前需要执行的操作,默认执行preStart()的实现逻辑,可以重写该方法
preStart()
}
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
}
<< · Back Index ·>>