Carry の Blog Carry の Blog
首页
  • Nginx
  • Prometheus
  • Iptables
  • Systemd
  • Firewalld
  • Docker
  • Sshd
  • DBA工作笔记
  • MySQL
  • Redis
  • TiDB
  • Elasticsearch
  • Python
  • Shell
  • MySQL8-SOP手册
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Carry の Blog

好记性不如烂键盘
首页
  • Nginx
  • Prometheus
  • Iptables
  • Systemd
  • Firewalld
  • Docker
  • Sshd
  • DBA工作笔记
  • MySQL
  • Redis
  • TiDB
  • Elasticsearch
  • Python
  • Shell
  • MySQL8-SOP手册
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • MySQL

  • Redis

  • Keydb

  • TiDB

  • MongoDB

  • Elasticsearch

    • 安装配置
    • 给Elasticsearch集群添加用户密码
    • Elastichsearch的分片和副本
    • 单节点分片达到默认上限解决办法
    • Elasticsearch集群节点磁盘使用分配不均解决办法
    • Elastichsearch的模板template和映射mapping
    • Elastichsearch查询-分页查询
    • Elasticsearch字符串搜索方式
    • Elastichsearch使用wildcard字段模糊匹配
    • ES数据迁移工具esm
    • Nginx Mirror 模块实现三套ES写入网关
    • ES单机多节点集群docker-compose一键安装
    • tcpdump抓包Elasticsearch语句
    • ElasticSearch 动态模板 使用方法
    • ES打开slowlog记录慢语句
    • ES加速恢复
    • Elasticsearch 常用 DSL 语句
    • Logstash迁移ES数据
    • Kafka

    • victoriametrics

    • BigData

    • Sqlserver

    • 数据库
    • Elasticsearch
    Carry の Blog
    2024-12-11
    目录

    Logstash迁移ES数据

    # 迁移数据 Logstash 配置文件详解

    这是一个简单的从一个es迁移到另一个es的logstash配置文件。

    
    input {
      elasticsearch {
        hosts => ["10.10.10.81:9200"]      # 源 Elasticsearch 地址
        index => "my_test_*"      # 匹配所有以 my_test_ 开头的索引
        user => "elastic"    # 源 Elasticsearch 用户名(如果有认证)
        password => "xxxxx" # 源 Elasticsearch 密码(如果有认证)
        scroll => "10m"                # 滚动查询的时间间隔
        size => 2000                   # 每批获取的数据量
        docinfo => true
        docinfo_target => "[@metadata][doc]"
    
      }
    }
    
    output {
      elasticsearch {
        hosts => ["10.10.10.61:9200"]  # 目标 Elasticsearch 地址
        user => "elastic"    # 目标 Elasticsearch 用户名(如果有认证)
        password => "xxxxx" # 目标 Elasticsearch 密码(如果有认证)
        index => "%{[@metadata][doc][_index]}"
        document_id => "%{[@metadata][doc][_id]}"
      }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    这是一个简单的从一个es迁移到kafka的logstash配置文件。

    input {
        elasticsearch {
            hosts => ["http://10.10.10.81:19200"]
            index => "user_login_log_2024_*"
            user => "${ES_USER}"
            password => "${ES_PASSWORD}"
            query => '{
              "query": {
                "bool": {
                  "must": [
                    {
                      "range": {
                        "@timestamp": {
                          "gte": "2024-01-01T00:00:00.000Z"
                        }
                      }
                    }
                  ]
                }
              }
            }'
            size => 1000  # 每次查询返回的文档数量
            scroll => "5m" # 保持查询上下文的持续时间
        }
    }
    
    filter {
        mutate {
            remove_field => [ "@version", "@timestamp" ]
        }
    }
    
    output {
        kafka {
            codec => json
            bootstrap_servers => "10.10.10.126:9092,10.10.10.127:9092,10.10.10.128:9092,10.10.10.129:9092,10.10.10.130:9092"
            topic_id => "user_login_log_topic"
        }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40

    下面是一个完整的 Logstash 配置文件,通过动态生成索引名,将符合条件的数据从索引 mytestindex_2024_11 迁移到相应的 mytestindex_<tag_id>_2024_11 索引中。

    # 配置文件内容

    input {
        elasticsearch {
            user => "elastic"
            password => "xxxx"
            hosts  => ["10.10.10.10:9200"]
            index => "mytestindex_2024_11"
            query => '{
                "query": {
                    "bool": {
                        "must": [
                            { "range": {
                                "create_time": {
                                    "gte": "2024-11-27T00:00:00+08:00",
                                    "lte": "2024-11-28T00:00:00+08:00"
                                }
                            }}
                        ]
                    }
                }
            }'
            docinfo => true
            docinfo_target => "[@metadata][doc]"
        }
    }
    
    filter {
        ruby {
            code => "
                tag_id = event.get('tag_id')
                if tag_id
                    event.set('dynamic_index', 'mytestindex_' + tag_id.to_s + '_2024_11')
                else
                    event.tag('missing_tag_id')
                end
            "
        }
    }
    
    output {
        elasticsearch {
            hosts => ["http://10.10.10.10:9200"]
            user => "elastic"
            password => "xxxx"
            index => "%{dynamic_index}"  # 动态生成索引名
            document_id => "%{[@metadata][doc][_id]}"
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47

    # 配置解析

    # 1. Input 部分

    • 输入源:从 Elasticsearch 索引 mytestindex_2024_11 中读取数据。
    • 查询条件:
      • 使用 query 参数定义条件,筛选出 create_time 在 2024-11-27T00:00:00+08:00 和 2024-11-28T00:00:00+08:00 之间的数据。
    • 身份认证:通过 user 和 password 提供认证信息。
    • docinfo 设置:开启 docinfo,将读取的文档元数据存储在 [@metadata][doc] 中,用于后续的 document_id 动态设置。

    # 2. Filter 部分

    • 使用 Ruby 脚本动态生成目标索引名:
      • 根据 tag_id 动态生成索引名 mytestindex_<tag_id>_2024_11。
      • 如果 tag_id 缺失,添加 missing_tag_id 标签,便于后续排查。

    # 3. Output 部分

    • 输出目标:将数据写入新的 Elasticsearch 集群(10.10.10.10:9200)。
    • 动态索引名:根据 dynamic_index 动态生成的索引名,将数据写入对应索引,例如 mytestindex_5282_2024_11。
    • 文档 ID:使用 [@metadata][doc][_id] 保留原文档 ID,防止重复插入。

    # 配置优势

    1. 动态索引生成:
      • 根据 tag_id 自动生成目标索引,无需手动指定。
      • 支持不同 tag_id 数据迁移到对应索引。
    2. 保持文档一致性:
      • 使用原文档 _id 避免重复插入或数据覆盖。
    3. 可追踪性:
      • 如果某些记录缺失 tag_id,会自动标记为 missing_tag_id,方便排查。

    # 运行步骤

    1. 将上述配置文件保存为 migrate_mytestindex.conf。
    2. 使用 Logstash 执行迁移:
      bin/logstash -f migrate_mytestindex.conf
      
      1
    3. 验证迁移结果:
      • 登录目标 Elasticsearch 节点,检查是否生成了 mytestindex_<tag_id>_2024_11 索引。
      • 查询数据是否正确迁移。

    # 注意事项

    1. 认证信息:
      • 输入和输出部分的 Elasticsearch 地址和认证信息需要替换为实际值。
    2. 性能优化:
      • 根据数据量调整 size 和 scroll 参数,提高处理效率。
    3. 目标索引映射:
      • 确保目标索引的映射(Mapping)与原始索引一致,避免写入失败。
    4. 调试输出:
      • 可以添加 stdout 插件,实时查看迁移的文档内容:
        stdout {
            codec => json_lines
        }
        
        1
        2
        3
    上次更新: 4/24/2025

    ← Elasticsearch 常用 DSL 语句 Kafka 日常操作→

    最近更新
    01
    tidb fast ddl
    04-04
    02
    TiDB配置文件调优 原创
    04-03
    03
    如何移除TiDB中的表分区 原创
    04-03
    更多文章>
    Theme by Vdoing
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式