实验前后效果对比:

之前:执行13个节点,耗时16分钟

之后:同样13个节点,耗时3分钟

具体逻辑请参照代码及注释。

 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

 import akka.actor.{ActorSystem, Props}
import com.alibaba.fastjson.JSONObject
import xxx.listener.AddJobToQueueActor
import xxx.listener.bean.{AppStatusMessage, SparkContextStatusMessage}
import xxx.listener.utils.JSONUtil
import xxx.listener.utils.JmsUtils._
import xxx.main.SparkJob
import xxx.main.utils.JsonUtils
import com.typesafe.config.ConfigFactory
import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext} import scala.collection.mutable.Queue /**
* Created by zpc on 2016/9/1.
* JobServer实现模板。
* 修正前:各个任务节点独立提交到spark平台,其中启动sparkContext和hiveContext会占用大量时间,大约40多秒。
* 修改后:将统一用户,占用资源相同的节点利用JMS发送消息提交到同一个SparkContext上,默认设置为3个节点任务并行。
* 实现:1.提交到queue中的msg为任务包含任务中型的子类及参数信息,接收到的任务不存在依赖关系,依赖的处理在消息发送端控制。
* 前置任务执行结束,再发送下一节点任务。
* 2.第一次提交时,任务的参数在args中获取。启动之后,启动jms的lister监听,通过actor将接收到的任务信息加入队列。
* 3.通过反射调用SparkJob的各个子类(真正执行节点逻辑的类),SparkContext的默认timeout时间为30mins。
* 4.节点执行结束,发送节点成功消息到web端,节点失败,发送错误日志及错误消息。
* 程序退出,通过shutdownhook,发送sc关闭消息到web端。
* 程序被关闭,如kill时,将等待队列及正在执行集合中的任务,发送失败消息到web端。
*
*
*/
object ExecuteJobServer extends Logging { //等待执行的job所在的queue
val jobWaitingQueue = new Queue[String]
//当前正在执行的任务的集合
val jobRunningSet = new scala.collection.mutable.HashSet[JSONObject]
val timeout_mins = 30
//最后运行任务时间
var lastRunTime = System.currentTimeMillis() //spark context 对应的 applicationId, user, expId, resource
var appId : String = ""
var user: String = ""
var expId: Long = 0
var resource: String = ""
//正在执行的job JSON
var jobJson : JSONObject = null def main(args: Array[String]): Unit = { //进程杀死时,将正在执行或未执行的任务,发送失败消息到web端。
Runtime.getRuntime().addShutdownHook(new HookMessage())
//接收到的任务,可以同时提交时,线程数可以多设置,暂定为3
val threadPool: ExecutorService = Executors.newFixedThreadPool(3)
val sc = initSparkContext()
val hiveContext = new HiveContext(sc) val list = JsonUtils.parseArray(args(0))
val it = list.iterator
while (it.hasNext) {
val jobStr = it.next().toString
if(expId == 0){
val json = JSONUtil.toJSONString(jobStr)
val param = json.getJSONObject("params")
appId = sc.applicationId
user = param.getString("user")
expId = param.getLongValue("expId")
var driver_memory = ""
var num_executors = "spark.executor.instances"
var executor_memory = ""
sc.getConf.getAll.map( x => {
if(x._1 != null && "spark.executor.instances".equals(x._1)) {
num_executors = x._2
}
else if(x._1 != null && "spark.executor.memory".equals(x._1)){
executor_memory = x._2.substring(0, x._2.length - 1)
}else if(x._1 != null && "spark.driver.memory".equals(x._1)){
driver_memory = x._2.substring(0, x._2.length - 1)
}
}) resource = driver_memory + num_executors + executor_memory;
logInfo("resource is : " +resource)
// resource = param.getString("driver-memory") + param.getString("num-executors") + param.getString("executor-memory")
}
jobWaitingQueue.enqueue(jobStr)
} /** 1.启动listener监听appId,接收queue中发送过来的JobMsg消息2.通过Queue发送消息通知web端,sc启动 **/
val system = ActorSystem("mlp", ConfigFactory.load())
val actor = system.actorOf(Props(new AddJobToQueueActor(appId, jobWaitingQueue)))
createTopicListenerOfContextJobMsg("contextJobMsgListener", actor)
informSparkContextStatus(true) while (jobWaitingQueue.size > 0 || !checkTimeOut) {
while (jobWaitingQueue.size > 0) {
lastRunTime = System.currentTimeMillis()
val jobStr = jobWaitingQueue.dequeue()//.replace("\\", "")
logInfo("***** ExecuteJobServer jobStr ***** jobStr: " + jobStr)
val json = JSONUtil.toJSONString(jobStr)
jobRunningSet.add(json)
threadPool.execute(new ThreadSparkJob(json, hiveContext, sc))
jobJson = json
}
Thread.sleep(1000)
} /**
* jobWaittingQueue队列不再接收消息
*
*/
threadPool.shutdown()
var loop = true
do {
//等待所有任务完成
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS); //阻塞,直到线程池里所有任务结束
} while (loop);
} def checkTimeOut(): Boolean = {
val nowTime = System.currentTimeMillis()
if (jobRunningSet.isEmpty && (nowTime - lastRunTime) / (1000 * 60) > timeout_mins) {
return true
} else {
return false
}
} class ThreadSparkJob(json: JSONObject, hiveContext: HiveContext, ctx: SparkContext) extends Runnable {
override def run() { try{
val classStr = json.get("class").toString
val argsStr = json.get("params").toString
val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
// val obj: SparkJob = Class.forName(classStr).newInstance().asInstanceOf[SparkJob]
obj.jobServer = true
obj.failed = false
obj.setContext(ctx)
obj.setHiveContext(hiveContext)
obj.main(Array(argsStr))
// InformJobSuccess(json)
logInfo("***** jobRunningSet remove job json***** json: " + json.toJSONString )
jobRunningSet.remove(json)
lastRunTime = System.currentTimeMillis()
}catch {
case oom: OutOfMemoryError => {
informJobFailure(oom.toString, json)
jobRunningSet.remove(json)
logInfo("***** SparkContext go to stop, reaseon: " + oom.getMessage )
hiveContext.sparkContext.stop()
//异常时,sc停止,driver程序停止
System.exit(1)
}
case ex: Exception => {
informJobFailure(ex.toString, json)
jobRunningSet.remove(json)
if(ex.toString.contains("stopped SparkContext")){
logInfo("***** SparkContext go to stop, reaseon: " + ex.getMessage )
hiveContext.sparkContext.stop()
//异常时,sc停止,driver程序停止
System.exit(1)
}
}
}
}
def informJobFailure(errMsg: String, json: JSONObject) = {
if(json != null) {
val params = json.getJSONObject("params")
val user = StringUtils.trimToEmpty(params.getString("user"))
val expId = params.getLongValue("expId")
val nodeId = params.getLongValue("nodeId")
val message = new AppStatusMessage(user, expId, nodeId, "FAILURE", errMsg)
logInfo("***** send informJobFailure message*****: expId: " + expId + "nodeId: " + nodeId)
jobStatusTemplate send message
}
}
} def initSparkContext(): SparkContext = {
val conf = new SparkConf().setAppName("cbt-mlaas")
new SparkContext(conf)
} class HookMessage extends Thread {
override def run() {
var shouldInformStop = false
informSparkContextStatus(false)
while (jobWaitingQueue.size > 0) {
val jobStr = jobWaitingQueue.dequeue()//.replace("\\", "")
val json = JSONUtil.toJSONString(jobStr)
informJobFailureInHook("SparkContext stopped, inform waiting job failed!", json)
shouldInformStop = true
}
jobRunningSet.map(json => {
informJobFailureInHook("SparkContext stopped, inform running job failed!", json);
shouldInformStop = true
})
if (shouldInformStop) {
informExpStop("SparkContext stopped, inform exp failed!", jobJson)
}
}
def informJobFailureInHook(errMsg: String, json: JSONObject) = {
if(json != null) {
val params = json.getJSONObject("params")
val user = StringUtils.trimToEmpty(params.getString("user"))
val expId = params.getLongValue("expId")
val nodeId = params.getLongValue("nodeId")
val message = new AppStatusMessage(user, expId, nodeId, "FAILURE", errMsg)
logInfo("***** send informJobFailure message*****: expId: " + expId + "nodeId: " + nodeId)
jobStatusTemplate send message
}
}
def informExpStop(errMsg: String, json: JSONObject) = {
if(json != null) {
val params = json.getJSONObject("params")
val user = StringUtils.trimToEmpty(params.getString("user"))
val expId = params.getLongValue("expId")
val nodeId = params.getLongValue("nodeId")
val message = new AppStatusMessage(user, expId, nodeId, "STOP", errMsg)
logInfo("***** send informExpStop message*****: expId: " + expId + "nodeId: " + nodeId)
jobStatusTemplate send message
}
}
} def informSparkContextStatus(start : Boolean) = {
val msg = new SparkContextStatusMessage(appId, start, user, expId, resource)
logInfo("***** send sparkContext start message*****: appId: " + appId + "start: " + start)
sparkContextStatusTemplate send msg
} }

最新文章

  1. 我的iOS开发系列博文
  2. Java中hashCode的作用
  3. 7-RandomAccessFile 随机流
  4. w3school-CSS
  5. 转:CentOS 7 安装Nginx
  6. 嵌入式开发笔记 - U-Boot相关
  7. 【疯狂Java讲义学习笔记】【流程控制与数组】
  8. Unity 编辑器扩展自定义窗体
  9. pb_ds(平板电视)整理
  10. 转发:Ubuntu软件卸载安装的命令
  11. 第二个MapReduce
  12. Linux终端连接Linux服务器
  13. scrapy + selenium 的动态爬虫
  14. [Swift]LeetCode241. 为运算表达式设计优先级 | Different Ways to Add Parentheses
  15. python之继承与派生
  16. 【数据使用】3k水稻数据库现成SNP的使用
  17. IBM NOTES
  18. airtest 记录
  19. kangle请求控制添加的add_header怎么查看
  20. intellij 快捷键整理

热门文章

  1. .Net 中资源的使用方式
  2. DZ升级到X3.2后,UCenter用户管理中心进不了了
  3. mui 重写back 调用back方法,实现返回就即时刷新页面
  4. matlab2014在mac Yosemite下出现java空指针情况
  5. 三个PHP常用代码样例
  6. Live帐号登陆win8系统不用输密码的方法
  7. ECSHOP 订单状态 记录
  8. Chrome远程调试Android上Chrome的页面
  9. asp.net mvc 事务处理:Transactions
  10. 排队(BZOJ1731:[Usaco2005 dec]Layout 排队布局)