您当前的位置: 首页 >  scala

176 Scala 项目案例(项目实现)

杨林伟 发布时间:2019-08-14 11:25:59 ,浏览量:2

架构图

在这里插入图片描述

ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以在实际应用中,ActorSystem通常是一个单例对象,我们可以使用这个ActorSystem创建很多Actor。

Actor

在Akka中,Actor负责通信,在Actor中有一些重要的生命周期方法。

  1. preStart()方法:该方法在Actor对象构造方法执行后执行,整个Actor生命周期中仅执行一次。

  2. receive()方法:该方法在Actor的preStart方法执行完成后执行,用于接收消息,会被反复执行。

Master类
package cn.itcast.spark
import scala.concurrent.duration._
import akka.actor.{Props, ActorSystem, Actor}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

/**
  * Master为整个集群中的主节点
  * Master继承了Actor
  */
class Master extends Actor{

  //保存WorkerID和Work信息的map
  val idToWorker = new mutable.HashMap[String, WorkerInfo]
  //保存所有Worker信息的Set
  val workers = new mutable.HashSet[WorkerInfo]
  //Worker超时时间
  val WORKER_TIMEOUT = 10 * 1000
  //重新receive方法

  //导入隐式转换,用于启动定时器
  import context.dispatcher

  //构造方法执行完执行一次
  override def preStart(): Unit = {
    //启动定时器,定时执行
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker)
  }

  //该方法会被反复执行,用于接收消息,通过case class模式匹配接收消息
  override def receive: Receive = {
    //Worker向Master发送的注册消息
    case RegisterWorker(id, workerHost, memory, cores) => {
      if(!idToWorker.contains(id)) {
        val worker = new WorkerInfo(id, workerHost, memory, cores)
        workers.add(worker)
        idToWorker(id) = worker
        sender ! RegisteredWorker("192.168.10.1")
      }
    }

    //Worker向Master发送的心跳消息
    case HeartBeat(workerId) => {
      val workerInfo = idToWorker(workerId)
      workerInfo.lastHeartbeat = System.currentTimeMillis()
    }

    //Master自己向自己发送的定期检查超时Worker的消息
    case CheckOfTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray
      for(worker  {
      import context.dispatcher
      //启动定时任务,向Master发送心跳
      context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      println("worker send heartbeat")
      master ! HeartBeat(id)
    }
  }
}

object Worker {
  def main(args: Array[String]) {
    val clientPort = 2552
    //创建WorkerActorSystem的必要参数
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.port = $clientPort
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("WorkerActorSystem", config)
    //启动Actor,Master会被实例化,生命周期方法会被调用
    actorSystem.actorOf(Props[Worker], "Worker")
  }
}

关注
打赏
1688896170
查看更多评论

杨林伟

暂无认证

  • 2浏览

    0关注

    3183博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0501s