Elasticsearch集群搭建学习

RestClient

查询所有

void testMatchAll() throws IOException {
        //1.准备Request
        SearchRequest request = new SearchRequest("hotel");
        //2.准备DSL
        request.source().query(QueryBuilders.matchAllQuery());
        //3.发送请求
        SearchResponse response = restClient.search(requ est , RequestOptions.DEFAULT);

        System.out.println(response);
    }

高亮

高亮API包括请求DSL构建和结果解析两部分。

构建:

request.source().highlighter(new HghlightBuilder().field("name")
                             //是否需要与字段匹配
                             .requireFieldMatch(false))
@Test
    public void testHighLight() throws IOException {
        //1 准备Request
        SearchRequest request = new SearchRequest("hotel");
        //2 准备DSL
        request.source().query(QueryBuilders.matchQuery("name" , "如家"));
        //2.2 高亮
        request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));
        //3 发送请求
        SearchResponse response = restClient.search(request , RequestOptions.DEFAULT);
        //4 解析响应
        SearchHits searchHits = response.getHits();
        //5 获取总条数
        long total = searchHits.getTotalHits().value;
        //4.2 文档数组
        SearchHit[] hits = searchHits.getHits();
        //4.3 遍历
        for(SearchHit hit : hits){
            //获取文档source
            String json = hit.getSourceAsString();
            //反序列化
            HotelDoc hotelDoc = JSON.parseObject(json , HotelDoc.class);
            //获取高亮结果
            Map<String , HighlightField> highlightFields = hit.getHighlightFields();
            //根据字段名获取高亮结果
            HighlightField highlightField = hightlightFields.get("name");
            //获取高亮值
            String name = highlightField.getFragments()[0].string();
            //覆盖非高亮结果
            hotelDoc.setName(name);
        }
    }
  • 所有搜索DSL的构建,记住一个API:

    • SearchRequest的source()方法。
  • 高亮结果解析是参考JSON结果,逐层解析

算分控制

FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
	//原始查询,相关性算分的查询
    boolQuery ,
    new FunctionSocreQueryBuilder.FilterFunctionBuilder[]{
        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
        	//过滤条件
            QueryBuilders.termQuery("isAd" , true),
            //算分函数
            ScoreFuntionBuilders.weightFactorFuntion(10)
            
        )
    }
);

数据聚合

聚合的分类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其他聚合的结果为基础做聚合

什么是聚合?

  • 聚合是对文档数据的统计、分析、计算

聚合的常见种类有哪些?

  • Bucket:对文档数据分组,并统计每组数量
  • Metric:对文档数据做计算,例如avg
  • Pipeline:基于其他聚合结果再做聚合

参与聚合的字段类型必须是:

  • keyword
  • 数值
  • 日期
  • 布尔

DSL实现Bucket聚合

GET /hotel/_search
{
    "size":0, // 设置size为0,结果中不包含文档,只包含聚合结果
    "aggs":{ // 定义聚合
        "brandAgg":{ // 给聚合起个名字
            "terms":{ // 聚合的类型,按照品牌值聚合,所以选择term
                "field":"brand", // 参与聚合的字段
                "size":20 // 希望获取的聚合结果数量
            }
        }
    }
}

aggs代表聚合,与query同级,此时query的作用是?

  • 限定聚合的文档范围

聚合必须的三要素:

  • 聚合名称
  • 聚合类型
  • 聚合手段

聚合可配置属性有:

  • size:指定聚合结果数量
  • order:指定聚合结果排序方式
  • field:指定聚合字段

DSL实现Metrics聚合

例如,我们要求获取每个品牌的用户评分的min、max、avg等值

GET /hotel/_search
{
 	"size":0,
    "aggs":{
    	"brandAgg":{
    		"terms":{
    			"field":"brand",
    			"size":20
    		},
    		"aggs":{// 是brands聚合的子聚合,也就是分组后对每组分别计算
    			"score_status":{ // 聚合名称
    				"stats":{ // 聚合类型,这里stats可以计算min、max、avg等
    					"field":"score"// 聚合字段,这里可以是score
    				}
    			}
    		}
    	}
    }
}

RestAPI实现聚合

@Test
    public void testAggregation() throws IOException {
        // 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 准备DSL
        // 设置size
        request.source().size(0);
        // 聚合
        request.source().aggregation(AggregationBuilders.terms("brandAgg").size(20).field("brand"));
        //发出请求
        SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
        //解析聚合结果
        Aggregations aggregations = response.getAggregations();
        //根据名称获取聚合结果
        Terms brandTerms = aggregations.get("brand_agg");
        //获取桶
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
            //获取key,也就是品牌信息
            String brandName = bucket.getKeyAsString();
            System.out.println(brandName);
        }
    }

实现对品牌、城市、星级的聚合

@Test
    public Map<String , List<String>> testMap() throws IOException {
        // 准备Request
        SearchRequest request = new SearchRequest("hotel");
        // 准备DSL
        // 设置size
        request.source().size(0);
        // 聚合
        request.source().aggregation(AggregationBuilders.terms("brandAgg").size(20).field("brand"));
        request.source().aggregation(AggregationBuilders.terms("cityAgg").size(20).field("city"));
        request.source().aggregation(AggregationBuilders.terms("starAgg").size(20).field("starName"));
        //发出请求
        SearchResponse response = restClient.search(request, RequestOptions.DEFAULT);
        //解析聚合结果
        Aggregations aggregations = response.getAggregations();
        Map<String , List<String>> result = new HashMap<>();
        result.put("brandAgg" , getAggByName("brandAgg" , aggregations));
        result.put("cityAgg" , getAggByName("cityAgg" , aggregations));
        result.put("starAgg" , getAggByName("starAgg" , aggregations));
        return result;
    }

    private List<String> getAggByName(String name , Aggregations aggregations){
        List<String> result = new ArrayList<>();
        Terms terms = aggregations.get(name);
        for (Terms.Bucket bucket : terms.getBuckets()) {
            String keyAsString = bucket.getKeyAsString();
            result.add(keyAsString);
        }
        return result;
    }

拼音分词器

如何使用拼音分词器?

下载pinyin分词器

解压并放到elasticsearch的plugin目录

重启

如何自定义分词器?

创建索引库时,在settings中配置,可以包含三部分

character filter

tokenizer

filter

拼音分词器注意事项?

为了避免搜索到同音字,搜索时不要使用拼音分词器

自动补全

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型
  • 字段的内容一般是用来补全的多个词条形成的数组
// 自动补全查询
GET /test2/_search
{
  "suggest":{
    "titleSuggest":{
      "text":"s",
      "completion":{
        "field":"title",
        "skip_duplicates":true,
        "size":10
      }
    }
  }
}

自动补全对字段的要求:

  • 类型是completion类型
  • 字段值是多词条的数组
@Test
public void testSuggest() throws IOException {
    // 准备Request
    SearchRequest request = new SearchRequest("hotel");
    // 准备DSL
    request.source().suggest(new SuggestBuilder().addSuggestion(
        "suggestions" , SuggestBuilders.completionSuggestion("suggestion")
    	.prefix("h")
    	.skipDuplicates(true)
    	.size(10)));
    // 发起请求
    SearchResponse response = client.search(request,RequestOptions.DEFAULT);
    // 解析结果
    Suggest suggest = response.getSuggest();
    // 根据名称获取补全结果
    CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggestion");
    //获取options并遍历
    for(CompletionSuggestion.Entry.Option option : suggestion.getOptions()){
        String text = option.getText().string();
        System.out.println(text);
    }
}

数据同步

  • 同步调用
    • 优点:实现简单,粗暴
    • 缺点:业务耦合度高
  • 异步通知
    • 优点:低耦合,实现难度一般
    • 缺点:依赖mq的可靠性
  • 监听binlog
    • 优点:完全解除服务间耦合
    • 缺点:开启binlog增加数据库负担、实现复杂度高

异步通知数据同步步骤:

  1. 定义config文件,声明队列和交换机bean,并绑定队列与交换机
  2. 在增加删除修改接口中发送mq消息到指定的增删改队列
  3. 定义监听器,监听mq消息并修改es文档

集群搭建

ES集群结构

单机的es做数据存储,必然面临两个问题:海量数据存储问题,单点故障问题

  • 海量数据存储问题:将索引库从逻辑上拆分N个分片(shard),存储到多个节点
  • 单点故障问题:将分片数据在不同节点备份(replica)

创建es集群

首先编写一个docker-compose.yml文件,内容如下:

version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    networks:
      - elastic
    ports:
      - 9202:9200
volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d

集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

双击的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

在这里插入图片描述

绿色的条,代表集群处于绿色(健康状态)。

创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{
  "settings": {
    "number_of_shards": 3, // 分片数量
    "number_of_replicas": 1 // 副本数量
  },
  "mappings": {
    "properties": {
      // mapping映射定义 ...
    }
  }
}
2)利用cerebro创建索引库

利用cerebro还可以创建索引库:

在这里插入图片描述

填写索引库信息:

在这里插入图片描述

点击右下角的create按钮:

在这里插入图片描述

查看分片效果

回到首页,即可查看索引库分片效果:

在这里插入图片描述

ES集群中的节点角色

elasticsearch中集群节点有不同的职责划分:

在这里插入图片描述

ES脑裂

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其他候选节点会选举一个称为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimux_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。

  • master eligible结点的作用?
    • 参与集群选主
    • 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
  • data结点的作用?
    • 数据的CRUD
  • coordinator结点的作用?
    • 路由请求到其他节点
    • 合并查询到的结果,返回给用户

ES集群的分布式存储

当新增文档时,应该保存到不同的分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

shard = hash(_routing) % number_of_shards

说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

ES集群的分布式查询

elasticsearch的查询分为两个阶段:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

ES集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这个叫做故障转移

相关推荐

  1. ElasticSearch高可用

    2024-05-25 23:48:49       35 阅读
  2. ElasticsearchElasticsearch详细手册

    2024-05-25 23:48:49       39 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-25 23:48:49       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-25 23:48:49       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-25 23:48:49       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-25 23:48:49       20 阅读

热门阅读

  1. 怎样理解 Vue 的单项数据流

    2024-05-25 23:48:49       9 阅读
  2. CS144 Lab Checkpoint 5: down the stack (the network interface)

    2024-05-25 23:48:49       11 阅读
  3. vue富文本层级高

    2024-05-25 23:48:49       9 阅读
  4. 信息系统管理工程师问答题

    2024-05-25 23:48:49       9 阅读
  5. 量子计算在科技浪潮中的引领作用

    2024-05-25 23:48:49       9 阅读
  6. LeetCode399触发求值

    2024-05-25 23:48:49       10 阅读
  7. MySQL和MongoDB数据库的区别

    2024-05-25 23:48:49       9 阅读
  8. Python——字典数据存入excel

    2024-05-25 23:48:49       10 阅读