Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flink

import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{
   ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests


object EsSinkTest {
   
  def main(args: Array[String]): Unit = {
   
    // 创建一个流处理执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //从文件中读取数据并转换为 类
    val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
    //转换
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map( data => {
   
        var dataArray = data.split(",")
        SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)
      })

    //定义一个 HttpHosts
    val httpHost = new util.ArrayList[HttpHost]()
    //默认 9200 我的修改为了 9201
    httpHost.add(new HttpHost("192.168.1.12",9200,"http"))
    httpHost.add(new HttpHost("127.0.0.1",9200,"http"))
    //定义一个 ElasticSearchFuntion 操作 es的function
    val esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {
   
      //element 每一条数据 通过 index 发送
      override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {
   
        //包装写入 es 的数据
        val dataSource = new util.HashMap[String,String]()
        dataSource.put("sensor_id",element.id)
        dataSource.put("temp",element.temperature.toString)
        dataSource.put("ts",element.timestamp.toString)

        //index
        val indexRequest = Requests.indexRequest()
            .index("sensor_temp")
            .`type`("readingdata")
            .source(dataSource)
        index.add(indexRequest)
        println("saved successfully " + element.toString)
      }
    }
    //输出值 es
    dataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())
    env.execute("es")
  }
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

相关推荐

  1. Flink旁路输出OutputTag

    2023-12-30 01:38:02       26 阅读
  2. Flink输出流(SideOutput)

    2023-12-30 01:38:02       25 阅读
  3. Flink系列之:Elasticsearch SQL 连接器

    2023-12-30 01:38:02       38 阅读
  4. 【QT】如何将printf打印输出窗体

    2023-12-30 01:38:02       11 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-30 01:38:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-30 01:38:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-30 01:38:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-30 01:38:02       20 阅读

热门阅读

  1. Nginx和OpenResty面试题及简单示例

    2023-12-30 01:38:02       28 阅读
  2. c语言结构体内存对齐

    2023-12-30 01:38:02       35 阅读
  3. 互联网摸鱼日报(2023-12-26)

    2023-12-30 01:38:02       39 阅读
  4. @staticmethod函数装饰器

    2023-12-30 01:38:02       37 阅读
  5. 有序数组的平方

    2023-12-30 01:38:02       38 阅读
  6. 深入探讨在SpringBoot中分布式锁的实现与应用

    2023-12-30 01:38:02       32 阅读
  7. RecyclerView刷新显示的问题

    2023-12-30 01:38:02       40 阅读
  8. 接收多个参数的函数——python

    2023-12-30 01:38:02       33 阅读
  9. Vue3自定义指令与Composition API实现动态权限控制

    2023-12-30 01:38:02       40 阅读
  10. Linux sed 命令

    2023-12-30 01:38:02       42 阅读
  11. 第八章 创建Callout Library - ZFentry 链接选项

    2023-12-30 01:38:02       38 阅读
  12. 【CISSP学习笔记】7. 安全评估与测试

    2023-12-30 01:38:02       35 阅读