Flink Rest Basic Auth - 安全认证

背景

公司目前需要将Flink实时作业云化,构建多租户实时计算平台。目前考虑为了资源高效利用,并不打算为每个租户部署一套独立的Kubernetes集群。也就意味着多个租户的作业可能会运行在同一套kubernets集群中。此时实时作业的任务就变的很危险,因为网络可能是通的,就会存在危险的REST API暴露出去,被一些不坏好意的人利用,从而影响其他租户的作业。鉴于此考虑给Flink的作业添加一个认证方式,可以是Kerberos或者是Http 用户名密码Baisc认证。各种搜索和询问,最终发现了一些线索FLIP-181: Custom netty HTTP request inbound/outbound handlers 这里描述了为何flink官方否定这个诉求。当然不要着急,笔者在flink-basic-auth-handler上找到了方案,并且成功将方案迁移到了flink-1.17.2版本中。

改造步骤

Flink 的JobManager/SQLGateway是基于Netty实现的一套轻量级的web服务接口,这些接口都实现了RestServerEndpoint抽象类。因此我们可以看看这个类start方法中可以看到在启动的代码中可以看到InboundChannelHandlerFactory这个东西,通过改Factory创建一个Inbound的hander。

public final void start() throws Exception {
   
        synchronized (lock) {
   
            Preconditions.checkState(
                    state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

            log.info("Starting rest endpoint.");

            final Router router = new Router();
            final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

            handlers = initializeHandlers(restAddressFuture);

            /* sort the handlers such that they are ordered the following:
             * /jobs
             * /jobs/overview
             * /jobs/:jobid
             * /jobs/:jobid/config
             * /:*
             */
            Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);

            checkAllEndpointsAndHandlersAreUnique(handlers);
            handlers.forEach(handler -> registerHandler(router, handler, log));

            ChannelInitializer<SocketChannel> initializer =
                    new ChannelInitializer<SocketChannel>() {
   

                        @Override
                        protected void initChannel(SocketChannel ch) throws ConfigurationException {
   
                            RouterHandler handler = new RouterHandler(router, responseHeaders);

                            // SSL should be the first handler in the pipeline
                            if (isHttpsEnabled()) {
   
                                ch.pipeline()
                                        .addLast(
                                                "ssl",
                                                new RedirectingSslHandler(
                                                        restAddress,
                                                        restAddressFuture,
                                                        sslHandlerFactory));
                            }

                            ch.pipeline()
                                    .addLast(new HttpServerCodec())
                                    .addLast(new FileUploadHandler(uploadDir))
                                    .addLast(
                                            new FlinkHttpObjectAggregator(
                                                    maxContentLength, responseHeaders));

                            for (InboundChannelHandlerFactory factory :
                                    inboundChannelHandlerFactories) {
   
                                Optional<ChannelHandler> channelHandler =
                                        factory.createHandler(configuration, responseHeaders);
                                if (channelHandler.isPresent()) {
   
                                    ch.pipeline().addLast(channelHandler.get());
                                }
                            }

                            ch.pipeline()
                                    .addLast(new ChunkedWriteHandler())
                                    .addLast(handler.getName(), handler)
                                    .addLast(new PipelineErrorHandler(log, responseHeaders));
                        }
                    };

            NioEventLoopGroup bossGroup =
                    new NioEventLoopGroup(
                            1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup workerGroup =
                    new NioEventLoopGroup(
                            0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

            bootstrap = new ServerBootstrap();
            bootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(initializer);

            Iterator<Integer> portsIterator;
            try {
   
                portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
            } catch (IllegalConfigurationException e) {
   
                throw e;
            } catch (Exception e) {
   
                throw new IllegalArgumentException(
                        "Invalid port range definition: " + restBindPortRange);
            }

            int chosenPort = 0;
            while (portsIterator.hasNext()) {
   
                try {
   
                    chosenPort = portsIterator.next();
                    final ChannelFuture channel;
                    if (restBindAddress == null) {
   
                        channel = bootstrap.bind(chosenPort);
                    } else {
   
                        channel = bootstrap.bind(restBindAddress, chosenPort);
                    }
                    serverChannel = channel.syncUninterruptibly().channel();
                    break;
                } catch (final Exception e) {
   
                    // syncUninterruptibly() throws checked exceptions via Unsafe
                    // continue if the exception is due to the port being in use, fail early
                    // otherwise
                    if (!(e instanceof java.net.BindException)) {
   
                        throw e;
                    }
                }
            }

            if (serverChannel == null) {
   
                throw new BindException(
                        "Could not start rest endpoint on any port in port range "
                                + restBindPortRange);
            }

            log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

            final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
            final String advertisedAddress;
            if (bindAddress.getAddress().isAnyLocalAddress()) {
   
                advertisedAddress = this.restAddress;
            } else {
   
                advertisedAddress = bindAddress.getAddress().getHostAddress();
            }

            port = bindAddress.getPort();

            log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

            restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

            restAddressFuture.complete(restBaseUrl);

            state = State.RUNNING;

            startInternal();
        }
    }

然后在构造函数中可以发现inboundChannelHandlerFactories对象是通过SPI方案加载进来的。

 public RestServerEndpoint(Configuration configuration)
            throws IOException, ConfigurationException {
   
        Preconditions.checkNotNull(configuration);
        RestServerEndpointConfiguration restConfiguration =
                RestServerEndpointConfiguration.fromConfiguration(configuration);
        Preconditions.checkNotNull(restConfiguration);

        this.configuration = configuration;
        this.restAddress = restConfiguration.getRestAddress();
        this.restBindAddress = restConfiguration.getRestBindAddress();
        this.restBindPortRange = restConfiguration.getRestBindPortRange();
        this.sslHandlerFactory = restConfiguration.getSslHandlerFactory();

        this.uploadDir = restConfiguration.getUploadDir();
        

相关推荐

  1. 安全认证机制之JWT

    2024-06-08 03:28:02       55 阅读
  2. 保护通信的双重安全:消息认证与身份认证

    2024-06-08 03:28:02       32 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-08 03:28:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-08 03:28:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-08 03:28:02       82 阅读
  4. Python语言-面向对象

    2024-06-08 03:28:02       91 阅读

热门阅读

  1. 安卓手机APP开发___设备管理概述

    2024-06-08 03:28:02       27 阅读
  2. Gnu/Linux 系统编程 - 如何获取帮助及一个演示

    2024-06-08 03:28:02       34 阅读
  3. C#朗读语音

    2024-06-08 03:28:02       33 阅读
  4. 第3章 列表简介

    2024-06-08 03:28:02       31 阅读
  5. MySQL数据库(7)

    2024-06-08 03:28:02       25 阅读
  6. 快慢指针算法举例

    2024-06-08 03:28:02       33 阅读
  7. pytest +allure在测试中的应用

    2024-06-08 03:28:02       30 阅读
  8. Python笔记 - Lambda表达式

    2024-06-08 03:28:02       24 阅读
  9. Kotlin 注解

    2024-06-08 03:28:02       33 阅读
  10. Android14 WMS-Power键短按流程

    2024-06-08 03:28:02       26 阅读