博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JStorm之Topology调度
阅读量:4590 次
发布时间:2019-06-09

本文共 3654 字,大约阅读时间需要 12 分钟。

  topology在服务端提交过程中,会经过一系列的验证和初始化:TP结构校验、创建本地文件夹并拷贝序列化文件jar包、生成znode用于存放TP和task等信息,最后一步才进行任务分配。例如以下图:
提交主函数位于ServiceHandler.java中
private void makeAssignment(String topologyName, String topologyId, 		TopologyInitialStatus status) throws FailedAssignTopologyException {	//1、创建topology的分配事件	TopologyAssignEvent assignEvent = new TopologyAssignEvent();	assignEvent.setTopologyId(topologyId);	assignEvent.setScratch(false);	assignEvent.setTopologyName(topologyName);	assignEvent.setOldStatus(Thrift			.topologyInitialStatusToStormStatus(status));  //2、丢入事件处理队列	TopologyAssign.push(assignEvent);  //3、等待时间返回	boolean isSuccess = assignEvent.waitFinish();	if (isSuccess == true) {		LOG.info("Finish submit for " + topologyName);	} else {		throw new FailedAssignTopologyException(				assignEvent.getErrorMsg());	}}
这当中最基本的是事件丢入队列后兴许的处理过程。事件分配由TopologyAssign线程处理,这个线程的流程非常清晰,监听事件队列。一旦有事件进入,立即取出,进行doTopologyAssignment,例如以下:
public void run() {	LOG.info("TopologyAssign thread has been started");	runFlag = true;	while (runFlag) {		TopologyAssignEvent event;		try {			event = queue.take();		} catch (InterruptedException e1) {			continue;		}		if (event == null) {			continue;		}		boolean isSuccess = doTopologyAssignment(event);		..............}
任务分配的核心代码位于TopologyAssign.java中
public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {	String topologyId = event.getTopologyId();	LOG.info("Determining assignment for " + topologyId);	TopologyAssignContext context = prepareTopologyAssign(event);	Set
assignments = null; if (!StormConfig.local_mode(nimbusData.getConf())) { IToplogyScheduler scheduler = schedulers .get(DEFAULT_SCHEDULER_NAME); //開始进行作业的调度 assignments = scheduler.assignTasks(context); } else { assignments = mkLocalAssignment(context); } ............}
调用栈例如以下:
分配原理是首先获得全部可用的supervisor,推断supervisor可用的标准是是否有空暇的slot,也就是是否全部supervisor.slots.ports指定port都被占用,然后计算出须要分配几个woker。由于一个woker相应一个port,当然这些信息的採集都是来自Zookeeper,如今我们来分析分配的核心代码:
WorkerMaker.java
//注意參数,result是这个作业须要的槽位。传入前仅仅知道须要槽位的数量,详细分配到哪台supervisor上还没指定
//supervisors指当前集群中全部可用的supervisor。即有空暇port的
private void putWorkerToSupervisor(List
result, List
supervisors) { int key = 0; //按所需槽位遍历,每次分配一个 for (ResourceWorkerSlot worker : result) { //首先进行必要的推断和置位 if (supervisors.size() == 0) return; if (worker.getNodeId() != null) continue; if (key >= supervisors.size()) key = 0; //1、取出第一个supervisor SupervisorInfo supervisor = supervisors.get(key); worker.setHostname(supervisor.getHostName()); worker.setNodeId(supervisor.getSupervisorId()); worker.setPort(supervisor.getWorkerPorts().iterator().next()); //槽位用完则从集合中删除,不再參与分配 supervisor.getWorkerPorts().remove(worker.getPort()); if (supervisor.getWorkerPorts().size() == 0) supervisors.remove(supervisor); //当一个supervisor分配完后便不再使用。除非supervisor不够用 key++; }}
从上面的代码中我们能够看到,眼下槽位分配没考虑机器负载,槽位的分配并不一定平均,比方第一个supervisor有10个槽位,剩下的supervisor仅仅有两个,那么还是要每一个supervisor分配一个woker的。

注意一个问题,在上面代码中supervisors这个集合是经过排序的,排序规则例如以下:

private void putAllWorkerToSupervisor(List
result, List
supervisors) { ........... supervisors = this.getCanUseSupervisors(supervisors); Collections.sort(supervisors, new Comparator
() { @Override public int compare(SupervisorInfo o1, SupervisorInfo o2) { // TODO Auto-generated method stub return -NumberUtils.compare(o1.getWorkerPorts().size(), o2 .getWorkerPorts().size()); } }); this.putWorkerToSupervisor(result, supervisors); .............}
能够看到。当前排序规则是按slot多少的,我们兴许版本号中可能会考虑机器负载的一些因素吧。

转载于:https://www.cnblogs.com/llguanli/p/8438798.html

你可能感兴趣的文章
java中ArrayList、LinkedList、Vector的区别
查看>>
第五周学习总结
查看>>
获得世界各国的当地时间 C#
查看>>
UWP&WP8.1 基础控件——Border
查看>>
食物链(并查集)
查看>>
python实现快速排序算法
查看>>
ABAP - 日期格式转换 & ABAP经常使用日期处理函数
查看>>
mac下通过xcodebuild使用oclint
查看>>
phonegap开发app中踩过的那些坑
查看>>
Django----模板
查看>>
MATLAB新手教程
查看>>
Selenium Python FirefoxWebDriver处理打开保存对话框
查看>>
如何将当前时间与已设时间比较大小
查看>>
图片瀑布流
查看>>
ROS-launch文件标签解读
查看>>
引用了System.Configuration命名空间,却找不到ConfigurationManager类
查看>>
编译型语言、解释型语言、脚本语言的区别
查看>>
电子书下载:Silverlight 5 in Action
查看>>
一个完整的DLL远程注入函数
查看>>
Day30 python 锁、信号量、事件、queue队列、生产者与消费者、joinablequeue
查看>>