1 | 作者: 夜泊1990 |
第一章 ES简介
第1节 ES介绍
1 | 1、Elasticsearch是一个基于Lucene的搜索服务器 |
第2节 ES版本
- 版本历史
1
ES : 1.x ---> 2.x ---> 5.x ---> 6.x --->7.x(目前)
- 版本选择
1
在版本选择一般选择5.x版本以上,我们本课程的学习使用6.x版本,低版本会随着官网的不断推动,在未来可能就不维护了,所以在选择的时候要尽量选择目前来说一个长期的稳定版本
第二章 安装(v6.3.2)
第1节 window下单点安装ES
- 下载地址
1
2
3
4
5
6官网地址:
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.zip
华为镜像站:
https://mirrors.huaweicloud.com/elasticsearch/6.3.2/elasticsearch-6.3.2.zip - 安装步骤
1
2
31、将下载的安装包解压到指定目录下(不能有中文)
2、找到目录下的bin目录(比如: D:\soft\elasticsearch-6.3.2\bin)
3、bin目录下有一个[elasticsearch.bat]脚本命令,直接鼠标双击即可,启动ES服务 - 访问
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19ES服务的默认端口号为 9200,在浏览器访问 http://localhost:9200/ 浏览器就会输出下面信息,安装成功
{
"name" : "G7lh1eQ",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "4T38-ecWQ5O8OpIM3RibAw",
"version" : {
"number" : "6.3.2",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "053779d",
"build_date" : "2018-07-20T05:20:23.451332Z",
"build_snapshot" : false,
"lucene_version" : "7.3.1",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}
第2节 window下安装Head插件
2.1 Head插件介绍
1 | 是一款能连接ES,并提供对ES进行操作的可视化界面 |
2.2 Head插件下载地址
1 | https://github.com/mobz/elasticsearch-head/archive/master.zip |
2.3 安装步骤
1 | 1. 电脑首先要安装node.js,因为安装Head插件需要使用npm命令 |
2.4 Head插件和ES关联配置
1 | 因为两个服务之间可能ip地址和端口号不同造成跨域的问题,所以需要配置可以跨域 |
- 修改ES的配置文件
1
2
3
4
5
6
7
81、因为ES和Head插件之间是两个不同的服务,所以需要进行配置
- 1.1 修改ES的配置,找到我们的ES配置文件 elasticsearch-6.3.2\config\elasticsearch.yml
- 1.2 在文件的最后一行设置一个可以进行跨域访问的属性
http:
cors:
enabled: true
allow-origin: "*"
- 1.3 重启ES服务,重启Head服务
第3节 window ES 集群安装
3.1 集群配置步骤
1 | 1. 创建一个elasticsearch-cluster文件夹 |
3.2 配置文件编写
- elasticsearch-node01节点配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 集群名称,保证唯一
cluster.name: elasticsearch
# 节点名称,每个节点不能相同
node.name: node-01
# 本机的ip地址
network.host: 127.0.0.1
# 服务端口号,在同一机器下端口号不能相同
http.port: 9200
# 集群间通信端口号,在同一机器下端口号不能相同
transport.tcp.port: 9300
# 设置集群自动发现机器ip集合
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300","127.0.0.1:9301","127.0.0.1:9302"]
# 跨域调用
http:
cors:
enabled: true
allow-origin: "*" - elasticsearch-node02节点配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 集群名称,保证唯一
cluster.name: elasticsearch
# 节点名称,每个节点不能相同
node.name: node-02
# 本机的ip地址
network.host: 127.0.0.1
# 服务端口号,在同一机器下端口号不能相同
http.port: 9201
# 集群间通信端口号,在同一机器下端口号不能相同
transport.tcp.port: 9301
# 设置集群自动发现机器ip集合
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300","127.0.0.1:9301","127.0.0.1:9302"]
# 跨域调用
http:
cors:
enabled: true
allow-origin: "*" - elasticsearch-node03节点配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17# 集群名称,保证唯一
cluster.name: elasticsearch
# 节点名称,每个节点不能相同
node.name: node-03
# 本机的ip地址
network.host: 127.0.0.1
# 服务端口号,在同一机器下端口号不能相同
http.port: 9202
# 集群间通信端口号,在同一机器下端口号不能相同
transport.tcp.port: 9302
# 设置集群自动发现机器ip集合
discovery.zen.ping.unicast.hosts: ["127.0.0.1:9300","127.0.0.1:9301","127.0.0.1:9302"]
# 跨域调用
http:
cors:
enabled: true
allow-origin: "*"1
2
3
4
5
6重启各个节点
注意: 如果在重启各个节点时出现点击启动脚本闪退的问题,可能造成的原因是配置文件需要使用UTF-8编码的格式编写
解决方式,使用notepad++修改当前配置文件的编码格式为utf-8
重启Head服务
1 | 1. 图中黑色的粗框代表这分片 |
第三章 基础概念
第1节 索引
1 | 含有相同属性的文档集合,相当于我们MYSQL数据库中的数据库(DATABASE) |
第2节 类型
1 | 索引可以定义一个或多个类型(ES6中已经逐步弃用,在更高的7版本中已经删除),文档必须属于一个类型,相当于我们MYSQL中的一个表(TABLE) |
第3节 文档
1 | 文档是可以被索引的基本数据单位,相当于MYSQL数据库表中的一行记录 |
第4节 分片/副本(备份)
- 分片(shard)
1
2
3
4
51. 每个索引有一个或多个分片
2. 索引的数据被分配到各个分片上,相当于一桶水使用多个杯子去装,分片有助于横向扩展
3. 如果将大量的数据保存到一个分片里面,那么分片越来越大数据越来越多,查找性能越来越差
4. 默认情况下一个索引创建5个分片
5. 分片会默认的分配到es集群的各个节点上,集群自动完成 - 副本(replica)
1
2
31. 副本,可以理解为分片的备份
2. 主分片和副本(备分片)不会出现在同一个节点上(防止单点故障)默认情况下一个索引创建5个分片一个备份(5个主分片+5个副本分片=10个分片)
3. 如果只有一个es节点,那么replica就无法分配(因为主/副不能同在一个节点上),此时cluster status会变成Yellow
第四章 基本用法
第1节 ES 的 RESTFul API
- API的基本格式
1
http://<ip>:<port>/<索引>/<类型>/<文档id>
- 常用的HTTP动作
1
GET/POST/PUT/DELETE
第2节 创建索引
2.1 使用Head插件创建索引
- 创建索引的步骤
1
2
3
4
51. 登陆Head页面
2. 找到索引标签
3. 新建索引
4. 添加索引名称(英文,全部都是小写字母,不能用下划线开头)
5. 索引创建成功之后回到概览页面就会在上面发现我们新建的索引 - Head页面展示
2.2 结构化索引/非结构化索引
- 结构化和非结构化介绍
1 | 进入我们的Head界面,进入概览,点击索引名称下面的信息,点击索引信息,查看数据里面有一个mappings属性,内容为空,说明我们此索引为非结构化索引 |
- 结构化和非结构化转化
1
21、在当前的head页面中点击【复合查询】
2、调用ES的API构建一个请求url1
我这里给book索引添加一个author类型,使用_mappings关键字(ES给索引、类型、文档等起名字时不能以下划线开头,因为ES中很多关键字都是以下划线开头的)
1 | 3、给上面的非结构化索引添加一些结构化的数据,让他变成结构化索引 |
2.3 使用postman创建索引
1 | Head在进行结构化数据创建的时候,验证json数据格式比较麻烦,我们可以使用postman进行结构化数据创建 |
- 使用postman创建索引
- postman创建索引的请求数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32{
"settings":{
"number_of_shards":5,
"number_of_replicas":1
},
"mappings":{
"man":{
"properties":{
"name":{
"type":"text"
},
"country":{
"type":"keyword"
},
"age":{
"type":"integer"
},
"date":{
"type":"date",
"format":"yyyy-MM-dd HH:mm:ss || yyyy-MM-dd || epoch_millis"
}
}
}
}
}
-----------------------------参数变量的简单介绍----------------------------------
settings : 设置当前索引的基本信息
mappings : 结构化索引配置
format : 时间格式
epoch_millis : 时间戳(毫秒数) - head页面查看创建结果
第3节 数据插入(postman)
3.1 指定id插入
- 请求地址
1
http://localhost:9200/people/man/1
- 请求图例
- 添加数据
1
2
3
4
5
6{
"name":"枫桥夜泊1990",
"country":"中国",
"age":30,
"date":"1990-05-28"
} - 返回结果数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14{
"_index": "people",
"_type": "man",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1
} - head查看
3.2 自动生成id插入
- 请求地址
1
http://localhost:9200/people/man/
- 请求图例
- head查看
第4节 数据修改(postman)
4.1 直接修改文档
- 请求地址
1
http://localhost:9200/people/man/1/_update
- 请求参数
1
2
3
4
5{
"doc":{
"name":"我是枫桥夜泊1990"
}
} - 请求图例
- 响应数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14{
"_index": "people",
"_type": "man",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 2,
"_primary_term": 1
} - head查看
4.2 使用脚本修改文档(了解)
4.2.1 Painless简介
1 | 1. Painless 是默认情况下 Elasticsearch 中提供的一种简单,安全的脚本语言 |
4.2.2 Painless使用
- 请求地址
1
http://localhost:9200/people/man/1/_update
- 请求参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20方式一:
{
"script":{
"lang":"painless",
"inline":"ctx._source.age -=10"
}
}
方式二:
{
"script":{
"lang":"painless",
"inline":"ctx._source.age = params.age",
"params":{
"age":21
}
}
} - 请求图例
第5节 数据删除(postman)
5.1 删除文档(postman)
- 请求地址
1
http://localhost:9200/people/man/1
- 请求图例
5.2 删除索引(postman)
- 请求地址
1
http://localhost:9200/people
- 请求图例
5.3 使用head删除索引
- 图例演示
第五章 高级查询(Kibana)
第1节 Kibana安装
1.1 Kibana简介
1 | Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索引中的数据.使用Kibana,可以通过各种图表进行高级数据分析及展示 |
1.2 Kibana下载
- 官网地址
1
https://www.elastic.co/cn/kibana
- 华为云地址
1
https://mirrors.huaweicloud.com/kibana/6.3.2/kibana-6.3.2-windows-x86_64.zip
1.3 Kibana安装
1 | 1、将下载的kibana-6.3.2-windows-x86_64.zip压缩包解压到指定文件夹下 |
第2节 Kibana使用
2.1 基本操作
2.1.1 查看集群健康状态
1 | GET /_cat/health?v |
2.1.2 查看节点状态
1 | GET /_cat/nodes?v |
2.1.3 查看索引信息
1 | GET /_cat/indices?v |
2.1.4 创建索引
1 | 1. 命令 PUT /book |
2.1.5 删除索引
1 | 1. 命令 DELETE /book |
2.1.6 向文档中添加数据
1 | PUT /book/doc/1 |
2.1.7 查看文档类型
1 | GET /book/doc/_mapping |
2.1.8 查看索引中的文档(根据ID查询)
1 | GET /book/doc/1 |
2.1.9 更新索引中的文档
1 | POST /book/doc/1/_update |
2.1.10 删除索引中的文档
1 | DELETE /book/doc/2 |
2.1.11 对索引中的文档进行批量操作
1 | POST /book/doc/_bulk |
2.2 高级查询
2.2.1 数据导入
- 数据地址
1
https://github.com/macrozheng/mall-learning/blob/master/document/json/accounts.json
- 导入命令(批量)
1
2POST /bank/account/_bulk
//这里放上面的数据地址中的数据
2.2.2 高级查询
2.2.2.1 简单搜索
- 搜索全部(match_all)
1
2
3
4GET /bank/_search
{
"query": { "match_all": {} }
} - 分页搜索(from表示偏移量,从0开始,size表示每页显示的数量)
1
2
3
4
5
6GET /bank/_search
{
"query": { "match_all": {} },
"from": 0,
"size": 10
} - 搜索排序(sort)
1
2
3
4
5
6
7GET /bank/_search
{
"query": { "match_all": {} },
"sort": { "balance": { "order": "desc" } }
}
使用sort关键字 以balance字段排序,设置order属性的值为desc倒叙输出 - 搜索并返回指定字段内容(_source)
1
2
3
4
5
6
7GET /bank/_search
{
"query": { "match_all": {} },
"_source": ["account_number", "balance"]
}
_source: 将要查询出来的属性设置到_source数组中
2.2.2.2 条件搜索(match)
- 搜索出account_number为20的文档
1
2
3
4
5
6
7
8GET /bank/_search
{
"query": {
"match": {
"account_number": 20
}
}
} - 搜索address字段中包含mill的文档
1
2
3
4
5
6
7
8
9
10
11
12GET /bank/_search
{
"query": {
"match": {
"address": "mill"
}
},
"_source": [
"address",
"account_number"
]
}注意: 对于数值类型match操作使用的是精确匹配,对于文本类型使用的是模糊匹配
2.2.2.3 短语匹配搜索(match_phrase)
- 搜索address字段中同时包含mill和lane的
1
2
3
4
5
6
7
8GET /bank/_search
{
"query": {
"match_phrase": {
"address": "mill lane"
}
}
}
2.2.2.4 组合搜索(bool)
- 同时满足(must)
- 搜索address字段中同时包含mill和lane的文档
1
2
3
4
5
6
7
8
9
10
11GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}
- 搜索address字段中同时包含mill和lane的文档
- 满足其中任意一个(should)
- 搜索address字段中包含mill或者lane的文档
1
2
3
4
5
6
7
8
9
10
11GET /bank/_search
{
"query": {
"bool": {
"should": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}
- 搜索address字段中包含mill或者lane的文档
- 同时不满足(must_not)
- 搜索address字段中不包含mill且不包含lane的文档
1
2
3
4
5
6
7
8
9
10
11GET /bank/_search
{
"query": {
"bool": {
"must_not": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}
- 搜索address字段中不包含mill且不包含lane的文档
- 满足其中一部分,并且不包含另一部分(组合must和must_not)
- 搜索age字段等于40且state字段不包含ID的文档
1
2
3
4
5
6
7
8
9
10
11
12
13GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "age": "40" } }
],
"must_not": [
{ "match": { "state": "ID" } }
]
}
}
}
- 搜索age字段等于40且state字段不包含ID的文档
2.2.2.5 过滤搜索(filter)
- 过滤出balance字段在20000~30000的文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16GET /bank/_search
{
"query": {
"bool": {
"must": { "match_all": {} },
"filter": {
"range": {
"balance": {
"gte": 20000,
"lte": 30000
}
}
}
}
}
}
2.2.2.6 搜索聚合(aggs)
- 对state字段进行聚合,统计出相同state的文档数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16GET /bank/_search
{
"aggs":{
"group_by_state":{
"terms":{
"field":"state.keyword"
}
}
}
}
aggs: 分组聚合关键字
group_by_state: 聚合条件名字,自定义
terms : 关键字,查询某个字段里含有多个关键词的文档
keyword: state是一个关键字类型 - 对balance取平均值
1
2
3
4
5
6
7
8
9
10
11GET /bank/_search
{
"size": 0,
"aggs":{
"avg_balance":{
"avg":{
"field": "balance"
}
}
}
} - 对state字段进行聚合,统计出相同state的文档数量和balance平均值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16GET /bank/_search
{
"size": 0,
"aggs":{
"group_by_state":{
"terms":{
"field":"state.keyword"
}
},
"avg_balance":{
"avg":{
"field": "balance"
}
}
}
}
具体ES的聚合函数请参考官方文档这里不再讲解
第六章 ES和SpringBoot整合(Spring-data版本使用)
第1节 分词器介绍
1.1 分词器的作用
1 | 将原始内容进行拆分,将一段话拆分成单词或者一个一个的字,或者语义单元 |
1.2 常见分词器
- standars
1
ES默认分词器,将词汇单元转成小写,取出一些停用词和标点符号,支持中文,将中文拆分成单个的字
- IK分词器
1
一个可以很好的支持中文,并且可以自定义的开源分词器
第2节 standars分词器演示(kibana工具)
2.1 演示英文
- 命令
1
2
3
4
5POST _analyze
{
"analyzer": "standard",
"text":"Hello Java"
} - 结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20将Hello Java分成了两个词分别是hello和java,首字母都变成了小写
{
"tokens": [
{
"token": "hello",
"start_offset": 0,
"end_offset": 5,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "java",
"start_offset": 6,
"end_offset": 10,
"type": "<ALPHANUM>",
"position": 1
}
]
}
2.2 演示中文
- 命令
1
2
3
4
5POST _analyze
{
"analyzer": "standard",
"text":"我是中国人"
} - 结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39{
"tokens": [
{
"token": "我",
"start_offset": 0,
"end_offset": 1,
"type": "<IDEOGRAPHIC>",
"position": 0
},
{
"token": "是",
"start_offset": 1,
"end_offset": 2,
"type": "<IDEOGRAPHIC>",
"position": 1
},
{
"token": "中",
"start_offset": 2,
"end_offset": 3,
"type": "<IDEOGRAPHIC>",
"position": 2
},
{
"token": "国",
"start_offset": 3,
"end_offset": 4,
"type": "<IDEOGRAPHIC>",
"position": 3
},
{
"token": "人",
"start_offset": 4,
"end_offset": 5,
"type": "<IDEOGRAPHIC>",
"position": 4
}
]
}
第3节 ik分词器安装和使用
1 | medcl/elasticsearch-analysis-ik分词器是当前对中文支持最好的分词器 |
- IK分词器地址
1
2
3https://github.com/medcl/elasticsearch-analysis-ik/archive/v6.3.2.zip
下载的分词器一定要和当前安装的ES版本相同 - 安装步骤
1
2
31. 将下载的分词器压缩包复制到我们的es的安装目录D:\soft\elasticsearch-6.3.2\plugins插件目录下
3. 将elasticsearch-analysis-ik解压
4. 重启ES服务
第4节 ik分词器演示(kibana工具)
- 命令
1
2
3
4
5
6
7
8
9
10
11
12
13POST _analyze
{
"analyzer": "ik_smart",
"text":"我是中国人"
}
-----------------
POST _analyze
{
"analyzer": "ik_max_word",
"text":"我是中国人"
} - 返回效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71ik_smart分词器效果
{
"tokens": [
{
"token": "我",
"start_offset": 0,
"end_offset": 1,
"type": "CN_CHAR",
"position": 0
},
{
"token": "是",
"start_offset": 1,
"end_offset": 2,
"type": "CN_CHAR",
"position": 1
},
{
"token": "中国人",
"start_offset": 2,
"end_offset": 5,
"type": "CN_WORD",
"position": 2
}
]
}
---------------------
ik_max_word分词器效果
{
"tokens": [
{
"token": "我",
"start_offset": 0,
"end_offset": 1,
"type": "CN_CHAR",
"position": 0
},
{
"token": "是",
"start_offset": 1,
"end_offset": 2,
"type": "CN_CHAR",
"position": 1
},
{
"token": "中国人",
"start_offset": 2,
"end_offset": 5,
"type": "CN_WORD",
"position": 2
},
{
"token": "中国",
"start_offset": 2,
"end_offset": 4,
"type": "CN_WORD",
"position": 3
},
{
"token": "国人",
"start_offset": 3,
"end_offset": 5,
"type": "CN_WORD",
"position": 4
}
]
}
第5节 ES和SpringBoot整合
5.1 创建SpringBoot项目
1 | 添加依赖,依赖在springboot-data中 |
5.2 配置
1 | # 设置ES的节点地址 |
5.3 常见的操作
5.3.1 映射ES文档的实体类
1 | /** |
5.3.2 操作ES的接口
1 | public interface EsAccountRepository extends ElasticsearchRepository<EsAccount,Long> { |
5.3.3 单元测试操作
- 常见操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237/**
* @Author 枫桥夜泊1990
* @BLOG https://hd1611756908.github.io/
* @BSITE https://space.bilibili.com/514155929/
* @DATE 2020/8/17
*/
public class EsAccountRepositoryTest extends SpringbootEs01ApplicationTests {
@Autowired
private EsAccountRepository esAccountRepository;
/**
* 测试findAll
*/
@Test
public void findAll1(){
Iterable<EsAccount> esAccounts = esAccountRepository.findAll();
//esAccounts.forEach(System.out::println);
//esAccounts.forEach((x)->{System.out.println(x);});
Iterator<EsAccount> iterator = esAccounts.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next());
}
}
/**
* 测试添加一条数据
* 更新数据,更新和添加主要判断是否有id
*/
@Test
public void saveAccount(){
EsAccount account = new EsAccount();
account.setAccount_number(4570L);
account.setAddress("北京");
account.setAge(18L);
account.setBalance(1000000L);
account.setCity("中国");
account.setEmail("123@.com");
account.setEmployer("李雷");
account.setFirstname("李");
account.setGender("男");
account.setId(4570L);
account.setLastname("李雷");
account.setState("NL");
esAccountRepository.save(account);
}
/**
* 根据ID查询account
*/
@Test
public void findAccountById(){
Optional<EsAccount> esAccount = esAccountRepository.findById(4570L);
System.out.println(esAccount.get());
}
/**
* 根据ID删除
*/
@Test
public void deleteAccount(){
esAccountRepository.deleteById(4570L);
}
/**
* 更新
*/
@Test
public void updateAccount(){
Optional<EsAccount> esAccount = esAccountRepository.findById(456L);
EsAccount account = esAccount.get();
account.setLastname("李雷super");
esAccountRepository.save(account);
}
/**
* 根据Lastname查询(模糊查询)
*/
@Test
public void findByLastname(){
List<EsAccount> accountList = esAccountRepository.findByLastname("李雷");
System.out.println(accountList);
}
/**
* 根据地址查询(模糊查询)
*/
@Test
public void findByAddress(){
List<EsAccount> accounts = esAccountRepository.findByAddress("京");
System.out.println(accounts);
}
/**
* 分页查询
*/
@Test
public void findPage(){
Pageable p = PageRequest.of(0, 5);
Page<EsAccount> accounts = esAccountRepository.findAll(p);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 排序分页查询
*/
@Test
public void findPageMult(){
Sort sort = Sort.by(Sort.Direction.DESC, "account_number");
Pageable p = PageRequest.of(0, 5, sort);
Page<EsAccount> accounts = esAccountRepository.findByAddress("Place", p);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 复杂查询,采用search方法
* 构建条件查询 使用match语法
* 过滤查询
*/
@Test
public void testMatch01(){
//单个条件
// QueryBuilder query = QueryBuilders.matchQuery("account_number", 20);
//多个条件
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
NativeSearchQueryBuilder queryBuilder = builder
.withQuery(QueryBuilders.matchQuery("account_number", 20))
.withQuery(QueryBuilders.matchQuery("firstname","Elinor"));
NativeSearchQuery build = queryBuilder.build();
Iterable<EsAccount> accounts = esAccountRepository.search(build);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 复杂查询,采用search方法
* 构建条件查询 使用match语法
* 短语匹配搜索
*/
@Test
public void testMatchPhrase(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
NativeSearchQueryBuilder queryBuilder = builder.withQuery(QueryBuilders.matchPhraseQuery("address", "mill lane"));
NativeSearchQuery searchQuery = queryBuilder.build();
Page<EsAccount> accountPage = esAccountRepository.search(searchQuery);
//accountPage.forEach(System.out::println);
//accountPage.forEach((x)->{System.out.println(x);});
//获取迭代器
Iterator<EsAccount> iterator = accountPage.iterator();
while(iterator.hasNext()){
System.out.println(iterator.next());
}
}
/**
* 组合搜索(bool)
* 同时满足(must)
*/
@Test
public void testMust(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("address","mill")).must(QueryBuilders.matchQuery("address","lane")));
NativeSearchQuery query = builder.build();
Page<EsAccount> accounts = esAccountRepository.search(query);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 满足其中任意一个(should)
*/
@Test
public void testShould(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withQuery(QueryBuilders.boolQuery().should(QueryBuilders.matchQuery("address","lane")).should(QueryBuilders.matchQuery("address","mill")));
NativeSearchQuery searchQuery = builder.build();
Page<EsAccount> accounts = esAccountRepository.search(searchQuery);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 同时不满足 must_not
*/
@Test
public void testMustNot(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withQuery(QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("address","mill")).mustNot(QueryBuilders.matchQuery("address","lane")));
NativeSearchQuery query = builder.build();
Page<EsAccount> search = esAccountRepository.search(query);
for (EsAccount esAccount : search) {
System.out.println(esAccount);
}
}
/**
* 满足其中一部分,并且不包含另一部分(组合must和must_not)
*/
@Test
public void testMustAndNotMust(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withQuery(QueryBuilders.boolQuery().mustNot(QueryBuilders.matchQuery("state","ID")).must(QueryBuilders.matchQuery("age","40")));
NativeSearchQuery query = builder.build();
Page<EsAccount> accounts = esAccountRepository.search(query);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
/**
* 过滤搜索 filter
* 过滤出balance字段在20000~30000的文档
*/
@Test
public void testFilter(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
builder.withFilter(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery("balance").gte(20000).lte(30000)));
NativeSearchQuery query = builder.build();
Page<EsAccount> accounts = esAccountRepository.search(query);
for (EsAccount account : accounts) {
System.out.println(account);
}
}
} - 分组聚合
1
2
3
4
5
6
7
8
9Elasticsearch有一个功能叫做聚合(aggregations),它允许你在数据上生成复杂的分析统计。它很像SQL中的 GROUP BY 但是功能更强大.
Elasticsearch的聚合中声明了两个概念如下:
-- Buckets(桶):满足某个条件的文档集合。
-- Metrics(指标):为某个桶中的文档计算得到的统计信息
我们可以把类似于数据库中的COUNT(*) 看成一个指标
将 GROUP BY 看成一个桶1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
/**
* 分组聚合 ElasticsearchTemplate
* 单个分组聚合
* 对state字段进行聚合,统计出相同state的文档数量
*/
@Test
public void group01(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
//统计state字段相同数据出现的次数
TermsAggregationBuilder field = AggregationBuilders.terms("group_by_state").field("state.keyword");
//查询全部
builder.withQuery(QueryBuilders.matchAllQuery());
//添加聚合条件
builder.addAggregation(field);
//构建
NativeSearchQuery query = builder.build();
//查询,采用ElasticsearchTemplate
Aggregations aggregations = elasticsearchTemplate.query(query, new ResultsExtractor<Aggregations>() {
@Override
public Aggregations extract(SearchResponse response) {
return response.getAggregations();
}
});
//转换成map集合
Map<String, Aggregation> aggregationMap = aggregations.asMap();
//获取响应的聚合子类group_by_state
StringTerms groupByState = (StringTerms) aggregationMap.get("group_by_state");
//获取所有的桶
List<StringTerms.Bucket> buckets = groupByState.getBuckets();
Iterator<StringTerms.Bucket> iterator = buckets.iterator();
while (iterator.hasNext()){
StringTerms.Bucket bucket = iterator.next();
System.out.println(bucket.getKeyAsString());
System.out.println(bucket.getDocCount());
}
}
/**
* 分组聚合 ElasticsearchTemplate
* 多个分组聚合
* 统计出相同state的文档数量,再统计出balance的平均值
*/
@Test
public void group02(){
NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder();
//统计state字段相同数据出现的次数
TermsAggregationBuilder field = AggregationBuilders.terms("group_by_state").field("state.keyword");
//统计出balance的平均值
AvgAggregationBuilder field2 = AggregationBuilders.avg("avg_balance").field("balance");
//查询全部
builder.withQuery(QueryBuilders.matchAllQuery());
//添加聚合条件
builder.addAggregation(field);
builder.addAggregation(field2);
//构建
NativeSearchQuery query = builder.build();
//查询,采用ElasticsearchTemplate
Aggregations aggregations = elasticsearchTemplate.query(query, new ResultsExtractor<Aggregations>() {
@Override
public Aggregations extract(SearchResponse response) {
return response.getAggregations();
}
});
//转换成map集合
Map<String, Aggregation> aggregationMap = aggregations.asMap();
//获取响应的聚合子类group_by_state
StringTerms groupByState = (StringTerms) aggregationMap.get("group_by_state");
//获取响应的聚合子类avg_balance
InternalAvg avgBalance = (InternalAvg) aggregationMap.get("avg_balance");
//获取所有的桶
List<StringTerms.Bucket> buckets = groupByState.getBuckets();
Iterator<StringTerms.Bucket> iterator = buckets.iterator();
while (iterator.hasNext()){
StringTerms.Bucket bucket = iterator.next();
System.out.println(bucket.getKeyAsString());
System.out.println(bucket.getDocCount());
}
//获取指标
double value = avgBalance.getValue();
System.out.println("==="+value);
}
第七章 ES和MYSQL数据同步
第1节 logstash简介
Logstash是一款开源的数据收集引擎,具备实时管道处理能力。简单来说,logstash作为数据源与数据存储分析工具之间的桥梁,结合ElasticSearch以及Kibana,能够极大方便数据的处理与分析。通过200多个插件,logstash可以接受几乎各种各样的数据。包括日志、网络请求、关系型数据库、传感器或物联网等等
第2节 logstash下载(6.3.2)
- 官网地址
1
https://www.elastic.co/cn/downloads/logstash
- 华为镜像站
1
https://mirrors.huaweicloud.com/logstash/
第3节 logstash配置(简单配置-单表/多表)
1 | logstash工作时,主要设置3个部分的工作属性。 |
安装/配置步骤(使用单个表生成索引)
1
2
31. 将下载的logstash-6.3.2.zip文件解压到指定位置
2. 创建一个配置文件名字自定义,我命名为 mysql.conf,将自定义配置文件放到指定位置(我放到了D:\soft\logstash-6.3.2\config文件夹下)
3. 在mysql.conf中添加配置信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34input{
jdbc{
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM department"
}
}
output{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
4. 启动logstash服务,使用我们自定义的配置启动
-- 使用cmd命令行定位到当前logstash目录的bin目录下使用命令为: logstash -f ../config/mysql.conf 启动服务
5. 等待索引创建完成(这种方式为全量方式)安装/配置步骤(使用多张表生成索引)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71input{
# 第1张表
jdbc{
# 设置当前jdbc的type
type => "department"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM department"
}
# 第2张表
jdbc{
# 设置当前jdbc的type
type => "student"
# 数据库驱动地址
jdbc_driver_library => "D:\\soft\\logstash-6.3.2\\lib\\mysql-connector-java-5.1.47.jar"
# 数据库驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接地址
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/gn_oa"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "root"
# 定时任务
schedule => "* * * * *"
# 生成索引的数据来源
statement => "SELECT * FROM student"
}
}
output{
# 根据jdbc中的type值指定输入的jdbc对应的输出为哪一个elasticsearch
if[type]=="department"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "department"
# 生成根据dept_id,生成ES的 _id
document_id => "%{dept_id}"
# 类型
document_type => "dept_doc"
}
}
if[type]=="student"{
elasticsearch{
# 集群地址
hosts => ["127.0.0.1:9200","127.0.0.1:9201","127.0.0.1:9202"]
# 索引名称
index => "student"
# 生成根据student_id,生成ES的 _id
document_id => "%{student_id}"
# 类型
document_type => "student_doc"
}
}
}
第4节 logstash配置(其他配置)
- 官网配置地址
1
https://www.elastic.co/guide/en/logstash/6.3/plugins-inputs-jdbc.html
4.1 使用时间字段进行追踪(一般使用更新时间字段)
1 | 在阿里巴巴嵩山版手册中强调,任何数据库表都必须有创建时间和更新时间字段,都是datetime类型,也就是保存时间为年月日时分秒 |
1 | input{ |
4.2 使用其他字段进行追踪(设置使用主键进行追踪)
1 | 在修改其他字段进行追踪时,一定不要忘记修改tracking_column_type类型,详情修改请参官网 |
1 | input{ |
注意: 日常bug记录与采坑指南,如果出现下面的BUG并且在进行配置查找的时候,无论怎么校验都不能从自己的配置中找到错误,但是服务器还启动不起来,那么可以尝试在错误信息中查找关键信息,经过反复校验,如果设置了last_run_metadata_path服务器可以正常启动,如果配置中少了last_run_metadata_path 属性服务器启动失败,就会报如下错误,并且在下面的错误中,有相关信息,我的报错信息如下,关键信息last_run_metadata_path=>"C:\\Users\\lenovo/.logstash_jdbc_last_run",说明在上次发生错的时候logstash对我上次的错误信息进行的保存,造成服务器每次启动读取错误信息,造成的服务器启动失败,去指定位置删除即可.
1 | [2020-08-20T02:27:40,897][ERROR][logstash.pipeline ] Error registering plugin {:pipeline_id=>"main", :plugin=>"<LogStash::Inputs::Jdbc jdbc_driver_library=>\"D:\\\\\\\\soft\\\\\\\\logstash-6.3.2\\\\\\\\lib\\\\\\\\mysql-connector-java-5.1.47.jar\", jdbc_driver_class=>\"com.mysql.jdbc.Driver\", jdbc_connection_string=>\"jdbc:mysql://127.0.0.1:3306/gn_oa\", jdbc_user=>\"root\", jdbc_password=><password>, schedule=>\"* * * * *\", statement=>\"SELECT * FROM department\", id=>\"02efb29127a061192768f52b1a09e6638672c521b53edda48b19f6b9880e90d6\", enable_metric=>true, codec=><LogStash::Codecs::Plain id=>\"plain_e50f7aa8-3c57-4f33-9000-86265d442ec7\", enable_metric=>true, charset=>\"UTF-8\">, jdbc_paging_enabled=>false, jdbc_page_size=>100000, jdbc_validate_connection=>false, jdbc_validation_timeout=>3600, jdbc_pool_timeout=>5, sql_log_level=>\"info\", connection_retry_attempts=>1, connection_retry_attempts_wait_time=>0.5, last_run_metadata_path=>\"C:\\\\Users\\\\lenovo/.logstash_jdbc_last_run\", use_column_value=>false, tracking_column_type=>\"numeric\", clean_run=>false, record_last_run=>true, lowercase_column_names=>true>", :error=>"can't dup Fixnum", :thread=>"#<Thread:0x703b95e6 run>"} |