重建文档索引

1143 发布于: 2021-05-10 读完约需 4 分钟

概述

在实现的应用场景中,有时需要将文档从一个索引重新索引到另一个索引。NEST客户端提供了两种不同的方法来重建索引文档,分别是ReindexOnServerReindex

重建索引

在NEST中与Elasticsearch重建索引(reindex)对应是是ReindexOnServer方法(同时,也提供了同名的异步方法ReindexOnServerAsync),最简单的用法是定义源索引和目标索引,然后等待操作完成:

var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
    )
    .Destination(d => d
        .Index("destination_index")
    )
    .WaitForCompletion() // 在响应返回前等待重建索引处理完成
);

在上面的示例中,Elasticsearch将等待索引重建完成,然后再返回一个响应给客户端。因此,在使用WaitForCompletion时,确保NEST客户端配置了足够长的请求超时时间。

当然,在NEST中你还可以使用异步的方式来重建索引,NEST异步重建会返回一个任务响应结果,你可以使用这个任务响应结果来获取当前重建索引任务的运行结果,甚至你还可以取消正在进行中的重建索引任务。

var reindexResponse = client.ReindexOnServer(r => r
        .Source(s => s
            .Index("source_index")
        )
        .Destination(d => d
            .Index("destination_index")
        )
        .WaitForCompletion(false) 
);

var taskId = reindexResponse.Task; 
var taskResponse = client.Tasks.GetTask(taskId);

while (!taskResponse.Completed) 
{
    Thread.Sleep(TimeSpan.FromSeconds(20)); 
    taskResponse = client.Tasks.GetTask(taskId);
}

var completedReindexResponse = taskResponse.GetResponse<ReindexOnServerResponse>();

使用重建索引参数

NEST中的重建索引方法还提供了一些参数,以便更细粒度地控制重建索引细节,比如:

  • 在源索引上运行指定的查询,以便仅仅重新索引与查询匹配的文档的子集
  • 只从源文档中选择字段的子集,以便重新索引到目标索引中
  • 对要索引到目标索引中的文档运行预处理管道

以下示例演示了上面描述的参数设置:

var reindexResponse = client.ReindexOnServer(r => r
        .Source(s => s
            .Index("source_index")
            .Query<Person>(q => q // 指定查询
                .Term(m => m
                    .Field(f => f.FirstName)
                    .Value("Russ")
                )
            )
            .Source<Person>(so => so // 仅将查询到的文档中的指定字段进行重建索引
                .Field(f => f.FirstName)
                .Field(f => f.LastName)
            )
        )
        .Destination(d => d
            .Index("destination_index")
            .Pipeline("my_reindex_pipeline") // 当重建索引时运行预处理管道
        )
);

使用观察者模式重建索引

在NEST中,除了ReindexOnServer方法外,还提供了Reindex方法,该方法使用观察者设计模式来设置重索引操作,并允许观察者在重建索引过程中注册额外的满足业务需要的操作。

ReindexOnServer方法是调用Elasticsearch的reindexAPI,整个索引的重建过程完全在Elasticsearch服务端进行的,但Reindex方法并不是这样,而是:

  • 从Elasticsearch源索引中通过网络获取批量的索引文档
  • 允许在客户端执行修改
  • 将修改后的文档批量索引到目标索引

这种方法比ReindexOnServer更灵活,代价是对Elasticsearch的请求更多,网络流量也更高。这两种方法都有其用途,因此应该选择最适合需求的一种。

Reindex建立在ScrollAllObservableBulkAllObservable之上,分别从Elasticsearch获取文档和将文档索引到Elasticsearch中,下面的示例演示了Reindex的简单使用:

var slices = Environment.ProcessorCount; 

var reindexObserver = client.Reindex<Person>(r => r
    .ScrollAll("5s", slices, s => s 
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b 
        .Index("destination_index")
    )
)
.Wait(TimeSpan.FromMinutes(15), response => 
{
    // 使用BulkResponse处理业务逻辑,比如:累加索引的文档数
});

使用Reindex时可以创建索引。例如,可以检索源索引设置,并将其用作目标索引的索引设置的基础:

var getIndexResponse = client.Indices.Get("source_index"); 
var indexSettings = getIndexResponse.Indices["source_index"];

var lastNameProperty = indexSettings.Mappings.Properties["lastName"]; 

if (lastNameProperty is TextProperty textProperty) 
{
    if (textProperty.Fields == null)
        textProperty.Fields = new Properties();

    textProperty.Fields.Add("keyword", new KeywordProperty());
}

var reindexObserver = client.Reindex<Person>(r => r
    .CreateIndex(c => c
        .InitializeUsing(indexSettings) 
    )
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b
        .Index("destination_index")
    )
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // 使用BulkResponse处理业务逻辑,比如:累加索引的文档数
});

Reindex有一个重载,它接受一个关于源文档应该如何映射到目标文档的函数。

var reindexObservable = client.Reindex<Person, Person>(
    person => person, 
    r => r
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b
        .Index("destination_index")
    )
);

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new ReindexObserver(
    onNext: response =>
    {
        // 你的业务逻辑,比如将页数打印到控制台
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

reindexObservable.Subscribe(observer); 

waitHandle.WaitOne(); 

exceptionDispatchInfo?.Throw();

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册