Learning how to use Netty from the xxl-job source code

1. Startup and Spring instantiation

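The entry point is the class com.xxl.job.core.executor.impl.XxlJobSpringExecutor.

It implements Spring's SmartInitializingSingleton interface (an interface, not a class to extend). In the afterSingletonsInstantiated callback, which Spring invokes once all non-lazy singleton beans have been created,

it calls initJobHandlerMethodRepository to collect every xxl-job task into a single registry: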

  private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // init job handler from method:
        // scan every bean, look for methods annotated with @XxlJob, and add each
        // one to a map through registJobHandler (see jobHandlerRepository)
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);

            Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }

            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                if (loadJobHandler(name) != null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // execute method
                /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }*/

                executeMethod.setAccessible(true);

                // init and destroy
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // register the job handler under its name
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
            }
        }

    }
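
The scan-and-register idea can be tried without Spring or xxl-job on the classpath. The following is only a minimal sketch using plain JDK reflection: the @Job annotation, MethodHandler, HandlerRegistry and DemoJobs are hypothetical stand-ins for @XxlJob, MethodJobHandler, the jobHandlerRepository map and a user bean, and init/destroy methods are left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @XxlJob (assumption, not the real annotation).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Job {
    String value();
}

// Pairs a bean with the annotated method that runs for a job name.
final class MethodHandler {
    private final Object target;
    private final Method method;

    MethodHandler(Object target, Method method) {
        this.target = target;
        this.method = method;
    }

    Object execute() {
        try {
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("job execution failed: " + method, e);
        }
    }
}

// Hypothetical registry playing the role of the name -> handler map.
final class HandlerRegistry {
    private final Map<String, MethodHandler> handlers = new ConcurrentHashMap<>();

    // Scans one bean and registers every method that carries @Job.
    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Job job = method.getAnnotation(Job.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("job name invalid for " + method);
            }
            method.setAccessible(true);
            // putIfAbsent returns the previous handler if the name is already taken.
            if (handlers.putIfAbsent(name, new MethodHandler(bean, method)) != null) {
                throw new IllegalStateException("job handler [" + name + "] naming conflicts");
            }
        }
    }

    MethodHandler load(String name) {
        return handlers.get(name);
    }
}

// A sample bean with one job method.
final class DemoJobs {
    @Job("demoJob")
    String demo() {
        return "done";
    }
}
```

With this in place, `registry.scan(new DemoJobs())` followed by `registry.load("demoJob").execute()` invokes the annotated method. Looking a handler up by name is all the executor needs later, when a trigger request arrives carrying only the handler name.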

2. Calling the parent class's start() method to start the Netty server

com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer

The core code is in com.xxl.job.core.server.EmbedServer#start:

    // start server
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            // idle detection: fires an IdleStateEvent after 90s with no read or write
                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                            // HTTP codec: decodes requests and encodes responses
                            .addLast(new HttpServerCodec())
                            // aggregates the parts of an HTTP message into one full message (5 MB limit)
                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                            // business logic: EmbedHttpServerHandler
                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);

    // bind
    ChannelFuture future = bootstrap.bind(port).sync();

    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

    // start registry
    startRegistry(appname, address);

    // wait until stop: blocks this thread until the server channel is closed
    future.channel().closeFuture().sync();
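
The last handler in the pipeline is constructed with executorBiz, accessToken and bizThreadPool, which suggests that it validates the token and hands the real work to a business thread pool so that Netty's I/O threads stay free. The sketch below illustrates that hand-off with plain JDK classes instead of Netty. RequestDispatcher, its string results and the routing rules are assumptions made for illustration, not the actual EmbedHttpServerHandler code; only the "/run" path is taken from the client call shown later in this post.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical dispatcher: validates a request, then runs it on a business pool.
final class RequestDispatcher {
    private final String accessToken;                  // null or blank disables the check
    private final ExecutorService bizThreadPool;       // pool for job-related work
    private final Function<String, String> runHandler; // stand-in for ExecutorBiz.run

    RequestDispatcher(String accessToken,
                      ExecutorService bizThreadPool,
                      Function<String, String> runHandler) {
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
        this.runHandler = runHandler;
    }

    // Returns at once; process(...) executes on bizThreadPool, not on the calling thread.
    CompletableFuture<String> dispatch(String httpMethod, String uri, String token, String body) {
        return CompletableFuture.supplyAsync(
                () -> process(httpMethod, uri, token, body), bizThreadPool);
    }

    private String process(String httpMethod, String uri, String token, String body) {
        if (!"POST".equals(httpMethod)) {
            return "FAIL: only POST is supported";
        }
        boolean tokenRequired = accessToken != null && !accessToken.trim().isEmpty();
        if (tokenRequired && !accessToken.equals(token)) {
            return "FAIL: access token is wrong";
        }
        if ("/run".equals(uri)) {
            return runHandler.apply(body);
        }
        return "FAIL: no mapping for " + uri;
    }
}
```

In a real Netty handler the returned future would typically be completed with a write back to the channel. The point here is only the split between accepting the request and executing it.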

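3. A scheduled job triggers an execution by calling the Netty server's endpoint

A manual trigger is used as the example here; scheduled triggering and the time wheel are left for the next post.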

com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger

@startuml tomcat
JobInfoController -> JobTriggerPoolHelper: trigger()
JobTriggerPoolHelper -> XxlJobTrigger: trigger()
XxlJobTrigger -> XxlJobTrigger: processTrigger()
XxlJobTrigger -> XxlJobTrigger: runExecutor()
XxlJobTrigger -> ExecutorBizClient: run()
@enduml

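The chain ends in the following run() method, which sends the trigger parameters to the executor's embedded server as an HTTP POST to the run path:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }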
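
To make the shape of that call concrete, here is a rough sketch of an equivalent request built with the JDK's java.net.http API (Java 11+) rather than xxl-job's own XxlJobRemotingUtil. The class TriggerRequests, the header name XXL-JOB-ACCESS-TOKEN, the content type and the JSON body are assumptions for illustration; check the xxl-job version you run for the real values.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper that builds the POST sent to the executor's embedded server.
final class TriggerRequests {
    // Assumed header name for the access token; verify it against your xxl-job version.
    static final String TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    static HttpRequest buildRun(String addressUrl, String accessToken,
                                int timeoutSeconds, String jsonBody) {
        // addressUrl + "run" only works if the base ends with a slash, so normalize it.
        String base = addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(base + "run"))
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        // Send the token only when one is configured.
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            builder.header(TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }
}
```

Sending it is then a matter of `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. On the executor side the request passes through the pipeline from section 2: HttpServerCodec decodes it, HttpObjectAggregator assembles the full message, and EmbedHttpServerHandler handles it.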

4. Further Netty details will be covered in the next post.
