Logstash迁移ES数据
# 迁移数据 Logstash 配置文件详解
这是一个简单的从一个es迁移到另一个es的logstash配置文件。
input {
elasticsearch {
hosts => ["10.10.10.81:9200"] # 源 Elasticsearch 地址
index => "my_test_*" # 匹配所有以 my_test_ 开头的索引
user => "elastic" # 源 Elasticsearch 用户名(如果有认证)
password => "xxxxx" # 源 Elasticsearch 密码(如果有认证)
scroll => "10m" # 滚动查询的时间间隔
size => 2000 # 每批获取的数据量
docinfo => true
docinfo_target => "[@metadata][doc]"
}
}
output {
elasticsearch {
hosts => ["10.10.10.61:9200"] # 目标 Elasticsearch 地址
user => "elastic" # 目标 Elasticsearch 用户名(如果有认证)
password => "xxxxx" # 目标 Elasticsearch 密码(如果有认证)
index => "%{[@metadata][doc][_index]}"
document_id => "%{[@metadata][doc][_id]}"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
这是一个简单的从一个es迁移到kafka的logstash配置文件。
input {
elasticsearch {
hosts => ["http://10.10.10.81:19200"]
index => "user_login_log_2024_*"
user => "${ES_USER}"
password => "${ES_PASSWORD}"
query => '{
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "2024-01-01T00:00:00.000Z"
}
}
}
]
}
}
}'
size => 1000 # 每次查询返回的文档数量
scroll => "5m" # 保持查询上下文的持续时间
}
}
filter {
mutate {
remove_field => [ "@version", "@timestamp" ]
}
}
output {
kafka {
codec => json
bootstrap_servers => "10.10.10.126:9092,10.10.10.127:9092,10.10.10.128:9092,10.10.10.129:9092,10.10.10.130:9092"
topic_id => "user_login_log_topic"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
下面是一个完整的 Logstash 配置文件,通过动态生成索引名,将符合条件的数据从索引 mytestindex_2024_11
迁移到相应的 mytestindex_<tag_id>_2024_11
索引中。
# 配置文件内容
input {
elasticsearch {
user => "elastic"
password => "xxxx"
hosts => ["10.10.10.10:9200"]
index => "mytestindex_2024_11"
query => '{
"query": {
"bool": {
"must": [
{ "range": {
"create_time": {
"gte": "2024-11-27T00:00:00+08:00",
"lte": "2024-11-28T00:00:00+08:00"
}
}}
]
}
}
}'
docinfo => true
docinfo_target => "[@metadata][doc]"
}
}
filter {
ruby {
code => "
tag_id = event.get('tag_id')
if tag_id
event.set('dynamic_index', 'mytestindex_' + tag_id.to_s + '_2024_11')
else
event.tag('missing_tag_id')
end
"
}
}
output {
elasticsearch {
hosts => ["http://10.10.10.10:9200"]
user => "elastic"
password => "xxxx"
index => "%{dynamic_index}" # 动态生成索引名
document_id => "%{[@metadata][doc][_id]}"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 配置解析
# 1. Input 部分
- 输入源:从 Elasticsearch 索引
mytestindex_2024_11
中读取数据。 - 查询条件:
- 使用
query
参数定义条件,筛选出create_time
在2024-11-27T00:00:00+08:00
和2024-11-28T00:00:00+08:00
之间的数据。
- 使用
- 身份认证:通过
user
和password
提供认证信息。 docinfo
设置:开启docinfo
,将读取的文档元数据存储在[@metadata][doc]
中,用于后续的document_id
动态设置。
# 2. Filter 部分
- 使用 Ruby 脚本动态生成目标索引名:
- 根据
tag_id
动态生成索引名mytestindex_<tag_id>_2024_11
。 - 如果
tag_id
缺失,添加missing_tag_id
标签,便于后续排查。
- 根据
# 3. Output 部分
- 输出目标:将数据写入新的 Elasticsearch 集群(
10.10.10.10:9200
)。 - 动态索引名:根据
dynamic_index
动态生成的索引名,将数据写入对应索引,例如mytestindex_5282_2024_11
。 - 文档 ID:使用
[@metadata][doc][_id]
保留原文档 ID,防止重复插入。
# 配置优势
- 动态索引生成:
- 根据
tag_id
自动生成目标索引,无需手动指定。 - 支持不同
tag_id
数据迁移到对应索引。
- 根据
- 保持文档一致性:
- 使用原文档
_id
避免重复插入或数据覆盖。
- 使用原文档
- 可追踪性:
- 如果某些记录缺失
tag_id
,会自动标记为missing_tag_id
,方便排查。
- 如果某些记录缺失
# 运行步骤
- 将上述配置文件保存为
migrate_mytestindex.conf
。 - 使用 Logstash 执行迁移:
bin/logstash -f migrate_mytestindex.conf
1 - 验证迁移结果:
- 登录目标 Elasticsearch 节点,检查是否生成了
mytestindex_<tag_id>_2024_11
索引。 - 查询数据是否正确迁移。
- 登录目标 Elasticsearch 节点,检查是否生成了
# 注意事项
- 认证信息:
- 输入和输出部分的 Elasticsearch 地址和认证信息需要替换为实际值。
- 性能优化:
- 根据数据量调整
size
和scroll
参数,提高处理效率。
- 根据数据量调整
- 目标索引映射:
- 确保目标索引的映射(Mapping)与原始索引一致,避免写入失败。
- 调试输出:
- 可以添加
stdout
插件,实时查看迁移的文档内容:stdout { codec => json_lines }
1
2
3
- 可以添加
上次更新: 12/20/2024
- 01
- Elastichsearch使用wildcard字段模糊匹配12-07
- 03
- Flink 集群部署指南 原创09-20