架构图
在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。
Actor在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。
-
preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。
-
receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。
package cn.itcast.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
/**
* Master为整个集群中的主节点
* Master继承了Actor
*/
class Master extends Actor{
//保存WorkerID和Work信息的map
val idToWorker = new mutable.HashMap[String, WorkerInfo]
//保存所有Worker信息的Set
val workers = new mutable.HashSet[WorkerInfo]
//Worker超时时间
val WORKER_TIMEOUT = 10 * 1000
//重新receive方法
//导入隐式转换,用于启动定时器
import context.dispatcher
//构造方法执行完执行一次
override def preStart(): Unit = {
//启动定时器,定时执行
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
}
//该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
override def receive: Receive = {
//Worker向Master发送的注册消息
case RegisterWorker(id, workerHost, memory, cores) => {
if(!idToWorker.contains(id)) {
val worker = new WorkerInfo(id, workerHost, memory, cores)
workers.add(worker)
idToWorker(id) = worker
sender ! RegisteredWorker("192.168.10.1")
}
}
//Worker向Master发送的心跳消息
case HeartBeat(workerId) => {
val workerInfo = idToWorker(workerId)
workerInfo.lastHeartbeat = System.currentTimeMillis()
}
//Master自己向自己发送的定期检查超时Worker的消息
case CheckOfTimeOutWorker => {
val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
for(worker {
import context.dispatcher
//启动定时任务,向Master发送心跳
context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
}
case SendHeartBeat => {
println("worker send heartbeat")
master ! HeartBeat(id)
}
}
}
object Worker {
def main(args: Array[String]) {
val clientPort = 2552
//创建WorkerActorSystem的必要参数
val configStr =
s"""
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.port = $clientPort
""".stripMargin
val config = ConfigFactory.parseString(configStr)
val actorSystem = ActorSystem("WorkerActorSystem", config)
//启动Actor,Master会被实例化,生命周期方法会被调用
actorSystem.actorOf(Props[Worker], "Worker")
}
}