Logstash迁移ES集群

红米1年前技术文章1162

一、背景介绍

   logstash 支持从一个 ES 集群中读取数据然后写入到另一个 ES 集群,因此可以使用 logstash 进行数据迁移,使用 logstash 进行迁移前,需要注意以下几点:

  • 需要在和云上的 ES 集群相同的 VPC 下创建 服务器,部署 logstash,同时保证该 服务器能够访问到源 ES 集群。

  • 用于部署 logstash 的 服务器最好选择比较高的配置

  • logstash 应该和目标 ES 集群的主版本号相同,例如目标 ES 集群为6.8.2版本,则 logstash 也需要使用6.8版本。

  • 需要特别注意索引 type 的问题,因为 ES 的不同版本对索引 type 的约束不同,跨大版本迁移 ES 集群时可能出现因为索引的 type 而导致写入目标集群失败等的问题。

二、全量同步数据

一个常用的使用 logstash 进行跨集群数据迁移的配置文件如下:

input {
  elasticsearch {
    hosts => "1.1.1.1:9200"
    index => "*"
    docinfo => true
    size => 5000
    scroll => "5m"
    }
}

output {
  elasticsearch {
    hosts => ["[http://2.2.2.2:9200]"]
    user => "elastic"
    password => "your_password"
    index => "%{[@metadata][_index]}"
    document_type => "%{[@metadata][_type]}"
    document_id => "%{[@metadata][_id]}"
  }
}

上述配置文件将源 ES 集群的所有索引同步到目标集群中,同时也可以设置只同步指定的索引,利用 logstash 进行迁移的更多功能可查阅 logstash-input-elasticsearch 和 logstash-output-elasticsearch

三、 增量同步数据

input {
    elasticsearch {
        hosts => "1.1.1.1:9200"
        index => "es-runlog-2019.11.20"
        #查询这个索引前5分钟的5000条数据
        query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
        size => 5000
        scroll => "5m"
        docinfo => true
        schedule => "* * * * *" #定时任务,每分钟执行一次
      }
}
filter {
     mutate {
   remove_field => ["source", "@version"]
 }
}
output {
    elasticsearch {
        hosts => ["http://2.2.2.2:9200"]
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        pipeline => "timezone-pipeline"
    }
}

每分钟执行一次,从源集群中拉取5分钟前到当前分钟的所有数据,同步到新的集群中;因为查询的粒度为分钟,所以每次执行定时任务查询时会有一部分重叠的数据,所以需要在output中配置document_id参数避免重复写入到新集群中。

实施过程中遇到的问题有:

a.用于运行logstash的机器的规格要比较大,因为logstash比较消耗内存和cpu,机器性能不够,很可能出现数据同步延迟增大

b.可以通过比较新旧集群当天的索引每分钟doc数据量,判断同步的延迟情况,如果延迟较大,可以通过调整logstash配置或者使用更大的机器运行logstash确保同步过程顺利进行


相关文章

Trino开启ldap认证

Trino开启ldap认证

1、背景由于对于安全监管的要求,需要对trino服务开启安全认证体系。2、基于私有证书的httpskeytool -genkeypair -validity 36500 -ext SAN=IP:172...

Flink window详解

Flink window详解

一、窗口(window)一般真实的流都是无界的,如果是无界怎样处理无界的数据可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也 就是得到有界流 窗口(window)就是将无限流切割为有限流...

借助arthas工具打火焰图

借助arthas工具打火焰图

1、下载arthas在命令行下面执行(使用和目标进程一致的用户启动,否则可能 attach 失败):curl -O https://arthas.aliyun.com/arthas-boot.jar...

开源大数据集群部署(十三)Ranger 集成Trino

开源大数据集群部署(十三)Ranger 集成Trino

1、安装ranger trino插件在trino的coordinator节点部署Ø 解压ranger-2.3.0-trino-plugin.tar.gz[root@hd2.dtstack.com ]#...

Python functools 模块

1、reduce 方法reduce 方法,顾名思义就是减少,map reduce 应用:大数据语法: reduce(function, sequence[, initial]) -> value...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。