
Online Elasticsearch Migration Solutions

Last updated: 2023-12-21 17:26:33

For customers whose clusters are already in production and serving traffic, keeping business performance unaffected during migration is critical. The data migration solutions provided by Kingsoft Cloud achieve zero downtime during an Elasticsearch (ES) migration, and even when something goes wrong they allow a fast rollback without losing any data, providing an extra layer of protection for customers' valuable data.

Solution Selection

We provide four flexible data migration solutions covering different scenarios, so that customers can choose the approach that best fits their needs and complete the migration efficiently, safely, and seamlessly.

ElasticSearch-Dump
Basic principle: logical backup; similar to mysqldump, documents are exported one by one and then imported.
Network requirement: direct cluster-to-cluster transfer requires network connectivity; exporting to a file first and then importing the file into the cluster does not.
Migration speed: moderate.
Suitable scenarios: small data volumes.
Configuration complexity: medium.

Reindex
Basic principle: reindex is an API provided by Elasticsearch that can migrate data from one cluster to another.
Network requirement: network connectivity required.
Suitable scenarios: large data volumes; online data migration.
Configuration complexity: simple.

Snapshot
Basic principle: create a data snapshot from the source cluster via the snapshot API, then restore it in the target cluster.
Network requirement: no network connectivity required.
Suitable scenarios: large data volumes where offline migration is acceptable; cannot restore to a lower version.
Configuration complexity: complex.

Logstash
Basic principle: reads data from one cluster and writes it into another.
Network requirement: network connectivity required.
Suitable scenarios: moderate data volumes; near-real-time data transfer.
Configuration complexity: medium.

Solution Implementation

Step 1: Migrate existing data across versions:

Cluster cloning:

Option 1: Clone the cluster via snapshot

Export a backup with the native ES snapshot mechanism and restore it into an intermediate clone ES cluster.

  1. Use the snapshot API to create a snapshot repository in the self-managed Elasticsearch cluster.

  2. Create a snapshot of the indices to be migrated and store it in the repository you created.

  3. In the Kibana console of the intermediate clone Elasticsearch cluster, use the snapshot API to create a snapshot repository identical to the one in the self-managed Elasticsearch cluster.

  4. Restore the backed-up snapshot of the self-managed Elasticsearch cluster from the repository into the intermediate clone Elasticsearch cluster to complete the data migration.

Note: When creating a snapshot in self-managed Elasticsearch, all open indices are backed up by default. If you do not want to back up system indices, such as those beginning with .kibana, .security, or .monitoring, specify the indices to back up when creating the snapshot (see the curl sketch below).

Snapshots: You cannot restore snapshots from later Elasticsearch versions into a cluster running an earlier Elasticsearch version. For example, you cannot restore a snapshot taken in 7.6.0 to a cluster running 7.5.0.
Snapshot and restore | Elasticsearch Guide [7.10] | Elastic
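A minimal curl sketch of the repository/snapshot/restore flow, assuming a hypothetical shared-filesystem repository named my_backup at the hypothetical path /mnt/es_backup and an index named twitter; adapt the repository type, names, and addresses to your environment:

# Register a shared-filesystem snapshot repository on the self-managed cluster
curl -H 'Content-Type: application/json' -XPUT http://<YourEsHost>:9200/_snapshot/my_backup -d '{
  "type": "fs",
  "settings": { "location": "/mnt/es_backup" }
}'

# Snapshot only the indices to migrate (system indices are excluded by listing indices explicitly)
curl -H 'Content-Type: application/json' -XPUT 'http://<YourEsHost>:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true' -d '{
  "indices": "twitter"
}'

# On the intermediate clone cluster, register the same repository, then restore the snapshot
curl -XPOST http://<CloneEsHost>:9200/_snapshot/my_backup/snapshot_1/_restore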

Option 2: Restore a clone cluster from disk snapshots

Use the disk snapshot feature of the MaaS cloud platform: first take snapshots of the disks of the ES cluster to be migrated, then clone new disks from those snapshots and start the clone ES cluster on them (remember to change the default configuration or isolate the network, so that the restored nodes do not automatically rejoin the production cluster; a configuration sketch follows).
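A hedged sketch of the isolation step, assuming the configuration file lives at /etc/elasticsearch/elasticsearch.yml and the clone nodes have the hypothetical addresses 10.0.0.11 and 10.0.0.12; the key points are a new cluster.name and a discovery list that contains only the clone nodes:

# Rename the cluster and restrict discovery before starting the clone nodes
sed -i 's/^cluster.name:.*/cluster.name: es-clone/' /etc/elasticsearch/elasticsearch.yml
sed -i 's/^discovery.seed_hosts:.*/discovery.seed_hosts: ["10.0.0.11", "10.0.0.12"]/' /etc/elasticsearch/elasticsearch.yml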

Downgrade migration (elasticsearch-dump/reindex):

Option 1: elasticsearch-dump (recommended)

The open-source elasticsearch-dump tool can migrate the settings, mapping, and data of indices on the intermediate clone Elasticsearch instance to the managed ES instance.

# Install node.js
wget https://nodejs.org/dist/v16.18.0/node-v16.18.0-linux-x64.tar.xz
tar -xf node-v16.18.0-linux-x64.tar.xz
# Set the environment variable: add the export line below to ~/.bash_profile, then reload it
vim ~/.bash_profile
export PATH=$PATH:/root/node-v16.18.0-linux-x64/bin/
source ~/.bash_profile

# Install elasticsearch-dump
npm install elasticdump -g


# Migrate the settings of a given index
elasticdump --input=http://"<UserName>:<YourPassword>"@<YourEsHost>/<YourEsIndex> --output=http://"<OtherName>:<OtherPassword>"@<OtherEsHost>/<OtherEsIndex> --type=settings

# Migrate the mapping of a given index
elasticdump --input=http://"<UserName>:<YourPassword>"@<YourEsHost>/<YourEsIndex> --output=http://"<OtherName>:<OtherPassword>"@<OtherEsHost>/<OtherEsIndex> --type=mapping

# Migrate the data of a given index
elasticdump --input=http://"<UserName>:<YourPassword>"@<YourEsHost>/<YourEsIndex> --output=http://"<OtherName>:<OtherPassword>"@<OtherEsHost>/<OtherEsIndex> --type=data

<YourEsHost>/<OtherEsHost>: access address of the source or target managed ES instance.

<UserName>/<OtherName>: access user name of the source or target managed ES instance; defaults to elastic.

<YourPassword>/<OtherPassword>: access password of the source or target managed ES instance.

<YourEsIndex>/<OtherEsIndex>: name of the source or target index.

https://github.com/elasticsearch-dump/elasticsearch-dump
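elasticdump moves 100 documents per batch by default; for large indices the --limit flag raises the batch size. A hedged example using the same placeholders as above:

# Optional: raise the batch size from the default of 100 documents per operation
elasticdump --input=http://"<UserName>:<YourPassword>"@<YourEsHost>/<YourEsIndex> --output=http://"<OtherName>:<OtherPassword>"@<OtherEsHost>/<OtherEsIndex> --type=data --limit=1000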

Option 2: reindex migration

First configure a whitelist on the managed Elasticsearch cluster: edit the elasticsearch.yml file, then restart the cluster (this was already configured during environment preparation; a sketch follows). In the target cluster (managed Elasticsearch), the target index must be created manually in advance; reindex checks for it and reports an error if it does not exist. The host addresses below must be changed to the actual ones.
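A minimal sketch of the whitelist entry, assuming a hypothetical source address 10.0.0.1:9200 and the configuration path /etc/elasticsearch/elasticsearch.yml; restart the cluster for the setting to take effect:

# Whitelist the source cluster for reindex-from-remote, then restart the cluster
echo 'reindex.remote.whitelist: "10.0.0.1:9200"' >> /etc/elasticsearch/elasticsearch.yml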

Reindex migration procedure:

  1. In the Kingsoft Cloud cluster YML configuration, set reindex.remote.whitelist: <address of the old ES cluster> (note that public and private network addresses differ).

  2. List the indices of the current ES cluster (adjust the user name, password, and cluster address to your environment):

curl -u 'XXXXXX:XXXXX+'  -XGET XX-XX-XXXXXXXXXX.public.elasticsearch.XXXXXX.com:XXXX/_cat/indices
  3. Get the mapping of the current ES index:

curl -u 'XXXXXX:XXXXX+'  -XGET es-cn-XX-XX-XXXXXXXXXX.public.elasticsearch.XXXXXX.com:XXXX/twitter/_mapping?pretty
{
  "twitter" : {
    "mappings" : {
      "properties" : {
        "address" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "age" : {
          "type" : "long"
        },
        "city" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "country" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "location" : {
          "properties" : {
            "lat" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "lon" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            }
          }
        },
        "message" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "province" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "uid" : {
          "type" : "long"
        },
        "user" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}
  4. Sync the mapping to the Kingsoft Cloud ES cluster:

curl -H 'Content-Type: application/json' -u 'elastic:XXXXX+'  -XPUT http://XXX.XX.XX.XXX:XXXX/twitter -d '{
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
          "address": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "age": {
            "type": "long"
          },
          "city": {
            "type": "text"
          },
          "country": {
            "type": "text"
          },
          "location": {
            "type": "geo_point"
          },
          "message": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "province": {
            "type": "text"
          },
          "uid": {
            "type": "long"
          },
          "user": {
            "type": "text"
          }
        }
      }
    }'
  5. Run reindex on the Kingsoft Cloud ES cluster:

curl -H 'Content-Type: application/json' -XPOST http://XXX.XX.XX.XXX:XXXX/_reindex -d '{
  "source": {
    "index": "twitter",
    "remote": {
      "host": "http://XX-XX-XXXXXXXXXXXXXXXX.XXXXXXXX.com:9200",
      "username": "XXXXX",
      "password": "XXXXXX"
    },
    "query": {
      "match_all": {}
    }
  },
  "dest": {
    "index": "twitter"
  }
}'
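For long-running migrations, progress can be followed through the tasks API on the target cluster; a hedged sketch with the same masked address:

# Follow the progress of running reindex tasks
curl -XGET 'http://XXX.XX.XX.XXX:XXXX/_tasks?detailed=true&actions=*reindex'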

Migration script

# -*- coding: utf-8 -*-
# Migrate ES indices to a different cluster.
# Fill in the source cluster address, user, and password below.
# On the target cluster, set reindex.remote.whitelist: <source cluster address>.
# Index names and mappings are kept unchanged during migration; indices that
# already exist in the target cluster under the same name are not handled.
import requests
import json
from elasticsearch import client
from elasticsearch import Elasticsearch
from elasticsearch import exceptions
import time
import math
import threadpool

class Elastic(object):
    def __init__(self, ip ="XXX.X.X.X", port=XXXX, user=None, password=None):

        if user is not None:
            self.es = Elasticsearch([ip], http_auth=(user, password), port=port)
        else:
            self.es = Elasticsearch([ip], port=port)

        self.indiceClient = client.IndicesClient(self.es)
        self.cat = client.CatClient(self.es)

    def get_indices(self):
        return self.cat.indices()

    def get_mapping(self, index):
        return self.indiceClient.get_mapping(index).get(index)

    def create_index(self, index, mapping, shards=1, replicas=1):
        if not self.index_exists(index):
            mapping["settings"] = { "number_of_shards": shards, "number_of_replicas": replicas, "refresh_interval": "60S"}
            return self.indiceClient.create(index, body=mapping)

    def index_exists(self, index):
        return self.indiceClient.exists(index)

    def delete_index(self, index):
        try:
            return self.indiceClient.delete(index)
        except exceptions.NotFoundError:
            return True

    def stat_index(self, index):
        return self.indiceClient.stats(index)

    def get_doc_count(self, index):
        try:
            ret = self.stat_index(index)
            return ret["_all"]["primaries"]["docs"]["count"]
        except:
            return -99

def reindex(destHost, sourceHost, index, destIndex=None, user=None, password=None, destUser=None, destPassword=None):
    url = "http://%s:XXXX/_reindex" % destHost
    sourceEs = 'http://%s:XXXX' % sourceHost
    headers = { "Content-Type": "application/json"}
    if destIndex is None:
        destIndex = index
    if user is None:
        remote = {
          "host": sourceEs
        }
    else:
        remote = {
          "host": sourceEs,
          "username": user,
          "password": password
        }
    body =  {
      "source": {
        "index": index,
        # batch size per request; reindex-from-remote buffers each batch on heap
        "size": 10000,
        # note: reindex from remote does not support slicing, so no "slice" block here
        "remote": remote,
        "query": {
          "match_all": {}
        }
      },
      "dest": {
        "index": destIndex
      }
    }
    if destUser is not None:
        r = requests.post(url=url, auth=(destUser, destPassword), headers=headers, data=json.dumps(body))
    else:
        r = requests.post(url=url, headers=headers, data=json.dumps(body))
    if r.status_code == 200:
        pass
    else:
        raise Exception("reindex failed, %s, %s" % (index, r.text))

def get_shards(store_size):
    # Derive a primary shard count from the _cat/indices store size string,
    # targeting roughly 20GB per shard.
    try:
        if store_size.find("mb")>0 or store_size.find("kb")>=0:
            return 1
        elif store_size.find("gb") >0:
            size = float(store_size.replace("gb", ""))
            shards = math.ceil(size / 20) + 1
            return shards
        elif store_size.find("tb") >0:
            size = float(store_size.replace("tb", ""))
            shards = math.ceil(size * 1000 / 20) + 1
            return shards
        return 1
    except Exception as e:
        return 1


class Index(object):
    def __init__(self, index_name, new_index=None, doc_count=0, store_size="10gb"):
        self.index_name = index_name
        if new_index is not None:
            self.new_index = new_index
        else:
            self.new_index = index_name
        self.doc_count = int(doc_count)
        self.shards = get_shards(store_size)

def test():
    name = "klog_logs"
    doc_count = "XXXXXXXXX"
    store_size = "100gb"
    # pass by keyword so doc_count/store_size do not shift into the new_index slot
    index = Index(name, doc_count=doc_count, store_size=store_size)
    print(index.shards)

# test()

def get_indices(es, new_name=False):
    index_map = {}
    indices = es.get_indices()
    for item in indices.split('\n'):
        fields = item.split()
        if len(fields) < 2:
            continue
        old_index = fields[2]
        if new_name:
            new_index = old_index + "_new"
        else:
            new_index = old_index
        store_size = fields[-1]
        doc_count = fields[-4]
        if old_index.find('.security')>=0 or old_index.find('.kibana')>=0:
            continue
        index = Index(old_index, new_index, doc_count, store_size)
        index_map[old_index] = index
    return index_map

def make_report(old_index_map, new_index_map):
    for k, index in old_index_map.items():
        if index.new_index in new_index_map:
            index2 = new_index_map[index.new_index]
            if index.doc_count == index2.doc_count:
                print('reindex %s succeed, doc_count: %s' % (index.new_index, index.doc_count))
            else:
                print('reindex %s failed, old index doc_count %s, new index doc_count: %s' % (index.new_index, index.doc_count, index2.doc_count))
        else:
            print('reindex %s failed, no such index' % (index.new_index))


def do_reindex(destHost, sourceHost, sourceES, destES, index, user=None, password=None, destUser=None, destPassword=None):
    old_index = index.index_name
    new_index = index.new_index
    shards = index.shards
    for i in range(5):
        try:
            print("start reindex %s, called %s times" % (old_index, i+1))
            t1 = int(time.time())
            mapping = sourceES.get_mapping(old_index)
            destES.create_index(new_index, mapping, shards=shards, replicas=0)
            reindex(destHost, sourceHost, old_index, new_index, user, password, destUser, destPassword)
            t2 = int(time.time())
            print("end reindex %s succeed, new index %s, cost %sS," % (old_index, new_index, (t2 - t1)))
            break
        except Exception as e:
            print("Error reindex %s, %s" % (old_index, e))
            time.sleep(2*i)
            continue

def go():
    #### Example: adjust the source and target cluster addresses as needed
    # sourceHost = 'XXX-XX-XXXXXXXXXXXXXXXX.XXXXXX.com'
    user = 'XXXXXXX'   # source cluster user
    password = 'XXXXXXXX' # source cluster password
    # destHost = 'XXX.XX.XX.XXX'
    # user = None
    # password = None
    sourceHost = 'XX.XX.XX.XX' # source cluster address
    destHost = 'XX.XX.XX.XX'   # target cluster address
    destUser = user          # target cluster user
    destPassword = password  # target cluster password

    if user is not None:
        sourceES = Elastic(sourceHost, 9200, user, password)
    else:
        sourceES = Elastic(sourceHost, 9200)

    if destUser is not None:
        destES = Elastic(destHost, 9200, destUser, destPassword)
    else:
        destES = Elastic(destHost, 9200)
    index_rename = False  # if True, rename migrated indices by appending the suffix _new
    old_index_map = get_indices(sourceES, index_rename)
    task_pool = threadpool.ThreadPool(5)
    request_list = []
    args = []
    for k, index in old_index_map.items():
        args.append(([destHost, sourceHost, sourceES, destES, index, user, password, destUser, destPassword], None))
    request_list = threadpool.makeRequests(do_reindex, args)
    for req in request_list:
        task_pool.putRequest(req)
    task_pool.poll()
    task_pool.wait()
    # new_index_map = get_indices(destES)
    # print("---" * 10 + "summary report" + "---" * 10)
    # time.sleep(120)
    # make_report(old_index_map, new_index_map)

# curl XX.XX.XX.XX:XXXX/_cat/indices?v -u 'XXXXXXX:XXXXXXXX'| grep new|awk '{print "curl -u 'XXXXXXX:XXXXXXXXX'  -XDELETE XX.XX.XX.XX:XXXX/" $3}'

go()
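The script depends on three third-party packages; assuming a 7.x-compatible elasticsearch client (the http_auth/port argument style used above was removed in the 8.x client), they might be installed as follows:

# Install the script's dependencies; pin the elasticsearch client below 8
pip install requests "elasticsearch<8" threadpool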

Step 2: Dual-write incremental data:

  1. When a snapshot backup is taken, the backup time position can be looked up. When a snapshot is created, Elasticsearch creates a new Lucene commit point on each shard. A commit point is a consistent view of the Lucene index that contains a manifest of the index data files. Find the corresponding timestamp from the commit point; Logstash can then resume consuming from that recorded time position.

  2. If the business already specifies the _id field when writing to ES, rather than letting ES generate it, the timestamp can be ignored: writing the same document to ES again simply overwrites it by default (see the sketch after this list).

  3. If whole time-based indices are migrated, point the Kafka consumer at the data for the corresponding time range and re-consume it.
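A hedged sketch of item 2, with a hypothetical index twitter and document id 1001: because the _id is explicit, replaying the same write overwrites the existing document instead of creating a duplicate:

# Writing with an explicit _id: repeating this call overwrites the same document
curl -H 'Content-Type: application/json' -XPUT http://XXX.XX.XX.XXX:XXXX/twitter/_doc/1001 -d '{
  "user": "XXXXX",
  "message": "replayed incremental write"
}'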

Step 3: Data verification and business cutover

Data consistency verification:

Data verification: compare the number of indices, the index names, and the document counts (if incremental data keeps arriving, the two clusters cannot be fully caught up at any single moment); a count sketch follows below.

Business verification: run a test workload that simulates user queries and check whether the business results returned by both clusters are consistent.
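A minimal sketch of the count check, assuming the masked cluster addresses used earlier:

# Compare the index list and per-index document counts on the old and new clusters
curl -u 'XXXXXXX:XXXXXXXX' -XGET 'http://XX.XX.XX.XX:XXXX/_cat/indices?v'
curl -u 'XXXXXXX:XXXXXXXX' -XGET 'http://XX.XX.XX.XX:XXXX/_cat/count/twitter?v'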

Business gray release and rollback:

Define the cutover strategy according to how the business connects to ES and how the business is deployed.

ES connection method: if the business connects through a domain name and DNS resolution can be controlled, traffic can be switched to the new ES cluster gradually, by percentage.

Business deployment: if the frontend business is deployed across multiple environments, change the ES connection address one environment at a time, starting with edge businesses; switch the next environment only after the current one has been observed to be stable.

Rollback strategy: keep the old and new clusters running side by side; at any moment, if the new cluster fails or behaves abnormally, queries can be switched back to the old cluster. The duration of impact depends on how quickly the connection can be switched.
