概述
在实现的应用场景中,有时需要将文档从一个索引重新索引到另一个索引。NEST客户端提供了两种不同的方法来重建索引文档,分别是ReindexOnServer
和Reindex
。
重建索引
在NEST中与Elasticsearch重建索引(reindex
)对应是是ReindexOnServer
方法(同时,也提供了同名的异步方法ReindexOnServerAsync
),最简单的用法是定义源索引和目标索引,然后等待操作完成:
var reindexResponse = client.ReindexOnServer(r => r
.Source(s => s
.Index("source_index")
)
.Destination(d => d
.Index("destination_index")
)
.WaitForCompletion() // 在响应返回前等待重建索引处理完成
);
在上面的示例中,Elasticsearch将等待索引重建完成,然后再返回一个响应给客户端。因此,在使用WaitForCompletion
时,确保NEST客户端配置了足够长的请求超时时间。
当然,在NEST中你还可以使用异步的方式来重建索引,NEST异步重建会返回一个任务响应结果,你可以使用这个任务响应结果来获取当前重建索引任务的运行结果,甚至你还可以取消正在进行中的重建索引任务。
var reindexResponse = client.ReindexOnServer(r => r
.Source(s => s
.Index("source_index")
)
.Destination(d => d
.Index("destination_index")
)
.WaitForCompletion(false)
);
var taskId = reindexResponse.Task;
var taskResponse = client.Tasks.GetTask(taskId);
while (!taskResponse.Completed)
{
Thread.Sleep(TimeSpan.FromSeconds(20));
taskResponse = client.Tasks.GetTask(taskId);
}
var completedReindexResponse = taskResponse.GetResponse<ReindexOnServerResponse>();
使用重建索引参数
NEST中的重建索引方法还提供了一些参数,以便更细粒度地控制重建索引细节,比如:
- 在源索引上运行指定的查询,以便仅仅重新索引与查询匹配的文档的子集
- 只从源文档中选择字段的子集,以便重新索引到目标索引中
- 对要索引到目标索引中的文档运行预处理管道
以下示例演示了上面描述的参数设置:
var reindexResponse = client.ReindexOnServer(r => r
.Source(s => s
.Index("source_index")
.Query<Person>(q => q // 指定查询
.Term(m => m
.Field(f => f.FirstName)
.Value("Russ")
)
)
.Source<Person>(so => so // 仅将查询到的文档中的指定字段进行重建索引
.Field(f => f.FirstName)
.Field(f => f.LastName)
)
)
.Destination(d => d
.Index("destination_index")
.Pipeline("my_reindex_pipeline") // 当重建索引时运行预处理管道
)
);
使用观察者模式重建索引
在NEST中,除了ReindexOnServer
方法外,还提供了Reindex
方法,该方法使用观察者设计模式来设置重索引操作,并允许观察者在重建索引过程中注册额外的满足业务需要的操作。
ReindexOnServer
方法是调用Elasticsearch的reindex
API,整个索引的重建过程完全在Elasticsearch服务端进行的,但Reindex
方法并不是这样,而是:
- 从Elasticsearch源索引中通过网络获取批量的索引文档
- 允许在客户端执行修改
- 将修改后的文档批量索引到目标索引
这种方法比
ReindexOnServer
更灵活,代价是对Elasticsearch的请求更多,网络流量也更高。这两种方法都有其用途,因此应该选择最适合需求的一种。
Reindex
建立在ScrollAllObservable
和BulkAllObservable
之上,分别从Elasticsearch获取文档和将文档索引到Elasticsearch中,下面的示例演示了Reindex
的简单使用:
var slices = Environment.ProcessorCount;
var reindexObserver = client.Reindex<Person>(r => r
.ScrollAll("5s", slices, s => s
.Search(ss => ss
.Index("source_index")
)
)
.BulkAll(b => b
.Index("destination_index")
)
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
// 使用BulkResponse处理业务逻辑,比如:累加索引的文档数
});
使用Reindex
时可以创建索引。例如,可以检索源索引设置,并将其用作目标索引的索引设置的基础:
var getIndexResponse = client.Indices.Get("source_index");
var indexSettings = getIndexResponse.Indices["source_index"];
var lastNameProperty = indexSettings.Mappings.Properties["lastName"];
if (lastNameProperty is TextProperty textProperty)
{
if (textProperty.Fields == null)
textProperty.Fields = new Properties();
textProperty.Fields.Add("keyword", new KeywordProperty());
}
var reindexObserver = client.Reindex<Person>(r => r
.CreateIndex(c => c
.InitializeUsing(indexSettings)
)
.ScrollAll("5s", Environment.ProcessorCount, s => s
.Search(ss => ss
.Index("source_index")
)
)
.BulkAll(b => b
.Index("destination_index")
)
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
// 使用BulkResponse处理业务逻辑,比如:累加索引的文档数
});
Reindex
有一个重载,它接受一个关于源文档应该如何映射到目标文档的函数。
var reindexObservable = client.Reindex<Person, Person>(
person => person,
r => r
.ScrollAll("5s", Environment.ProcessorCount, s => s
.Search(ss => ss
.Index("source_index")
)
)
.BulkAll(b => b
.Index("destination_index")
)
);
var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;
var observer = new ReindexObserver(
onNext: response =>
{
// 你的业务逻辑,比如将页数打印到控制台
},
onError: exception =>
{
exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
waitHandle.Set();
},
onCompleted: () => waitHandle.Set());
reindexObservable.Subscribe(observer);
waitHandle.WaitOne();
exceptionDispatchInfo?.Throw();
发表评论
登录用户才能发表评论, 请 登 录 或者 注册