spark 事件总线listenerBus

事件总线基本流程

图片来源:https://blog.csdn.net/sinat_26781639/article/details/105012302

LiveListenerBus创建

在sparkContext初始化中创建LiveListenerBus对象。

主要变量有两个

  • queues:事件队列,里面存放四个队列,每个队列中有对应的listener注册
  • queuedEvents:发生的事件


注册listener

内部listener注册

举例DynamicAllocation,在SpackContext初始化的时候创建了ExecutorAllocationManager对象,再调用start方法。

在ExecutorAllocationManager的start方法中,调用listenerBus相关方法完成注册

LiveListenerBus中queues分成四个队列,分别对应不同的注册方法,最终都是调用addToQueue方法

addToQueue比较简单,就是从queues中获取,有AsyncEventQueue就加入listener,没有就创建再加入。加入是调用AsyncEventQueue的addListener方法。

AsyncEventQueue的addListener方法是父类ListenerBus的addListener方法。
将listener加入到listenersPlusTimers中


内部listener是放入了对应队列的listenersPlusTimers中。

外部listener注册

在内部listener注册完成后调用setupAndStartListenerBus注册外部listener

在setupAndStartListenerBus方法中,读取配置spark.extraListeners获取要注册的外部listener集合,使用反射创建listener对象,遍历调用addToSharedQueue加入share queue。最后调用listenerBus的start方法启动listenerBus。

事件总线启动

如上图,在listener全部完成注册后,调用listenerBus的start方法启动。
将started变量修改为true,标记listenerBus启动。遍历queues,启动有注册的队列AsyncEventQueue(总共四个队列,但不一定全部都启用)。遍历queuedEvents处理在listenerBus没有启动期间产生的event。最后不再需要缓存消息了,将queuedEvents置为空。

AsyncEventQueue启动
将started变量变成true,标记AsyncEventQueue启动。启动dispatchThread线程。

dispatchThread线程是调用dispatch方法。
在dispatch方法中,可以看到是循环读取eventQueue,从其中读取event,调用postToAll发送给全部的listener。
eventQueue是一个阻塞队列LinkedBlockingQueue


到此,listenerBus启动完成,其中的列队AsyncEventQueue也启动完成。AsyncEventQueue循环从eventQueue中获取event来处理(这里是阻塞的)

发送消息

发送消息的入口是调用LiveListenerBus的post方法。
如果还没有启动,就将消息先缓存到queueEvents中。
如果启动了,就调用postToQueues将消息发送给全部队列。
在postToQueues中是遍历queue,调用post方法。

AsyncEventQueue的post方法中,就是将消息放入eventQueue即可。
但是eventQueue是有容量大小的,超过的消息就会丢弃。

至此,发送消息完成,将消息放入到AsyncEventQueue的eventQueue中。

处理消息

在启动的时候,dispatch线程已经完成了启动,从eventQueue获取event来处理。
处理消息是调用父类postToAll方法

postToAll方法中是遍历该队列全部listener,调用doPostEvent方法。

doPostEvent对应是SparkListenerBus的doPostEvent方法,根据event的类型,调用listener的不同的方法。

listener是要实现的SparkListenerInterface的方法,可以看到方法很多。。。

相关推荐

  1. 模式实现vue事件总线

    2024-07-18 21:50:01       34 阅读
  2. spark 报错总结

    2024-07-18 21:50:01       42 阅读
  3. 本地存储、自定义事件、全局事件总线

    2024-07-18 21:50:01       31 阅读
  4. Spark安全日志分析与事件调查:实战指南

    2024-07-18 21:50:01       35 阅读
  5. vue3+TypeScript全局事件总线mitt

    2024-07-18 21:50:01       61 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-18 21:50:01       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-18 21:50:01       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-18 21:50:01       57 阅读
  4. Python语言-面向对象

    2024-07-18 21:50:01       68 阅读

热门阅读

  1. 设备树节点和struct device的关系及示例

    2024-07-18 21:50:01       18 阅读
  2. Html_Css问答集(8)

    2024-07-18 21:50:01       18 阅读
  3. APP开发者选择苹果企业签名的理由是什么?

    2024-07-18 21:50:01       21 阅读
  4. 负载均衡轮询逻辑

    2024-07-18 21:50:01       19 阅读
  5. swift小知识点(二)

    2024-07-18 21:50:01       18 阅读
  6. Redis常见阻塞原因

    2024-07-18 21:50:01       22 阅读
  7. Pandas库学习之DataFrame.replace()函数

    2024-07-18 21:50:01       21 阅读
  8. ros2--插件

    2024-07-18 21:50:01       27 阅读
  9. 探索 Flask:从入门到精通的完整学习指南

    2024-07-18 21:50:01       22 阅读
  10. antd使用踩坑记录

    2024-07-18 21:50:01       19 阅读