概述
NEST实现了Elasticsearch索引和批量操作API的方法,以便于.NET开发者能够索引单个或者多个文档。除此之外,NEST还为特殊的索引方法提供了一些便捷的方法。
索引单个文档
单个文档可以分别使用同步方法IndexDocument
或者异步方法IndexDocumentAsync
进行索引。使用这两个方法对文档进行索引时比较简单,不需要其他的请求参数,如下:
var person = new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman"
};
var indexResponse = client.IndexDocument(person); // 同步方法的返回值是IndexResponse
if (!indexResponse.IsValid)
{
// 验证索引是否成功
}
// var indexResponseAsync = await client.IndexDocumentAsync(person); //同步方法的返回值是Task<IndexResponse>
索引单个文档(带参数)
如果在对单个文档进行索引时需要携带额外的参数,你可以使用Index()
方法,Index()
方法提供了额外的参数,比如你可以指定文档的索引名称、文档的ID、文档的路由参数等等,这些参数可以让你更好地控制文档的索引方式,如下:
var person = new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman"
};
var indexResponse1 = client.Index(person, i => i.Index("people")); // 这种是基于链式的编程方式
// var indexResponse2 = client.Index(new IndexRequest<Person>(person, "people")); // 这种是基于对象初始化语法的方式
使用IndexMany
对多文档索引
在NEST中,可以使用IndexMany
或者IndexManyAsync
方法对多个文档进行索引。同样的,它们分别是同步操作和异步操作。这两个方法是NEST对应Elasticsearch中_bulk
API的封装。
IndexMany
方法对多文档进行索引时向Elasticsearch服务器发送的是单个HTTP请求,所以在文档集合非常庞大的时候不推荐使用此方法,而是使用BulkAllObservale
。
IndexMany
使用示例如下:
var people = new []
{
new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman"
},
new Person
{
Id = 2,
FirstName = "Stuart",
LastName = "Cam"
},
new Person
{
Id = 3,
FirstName = "Russ",
LastName = "Cam"
}
};
var indexManyResponse = client.IndexMany(people);
if (indexManyResponse.Errors)
{
foreach (var itemWithError in indexManyResponse.ItemsWithErrors)
{
Console.WriteLine($"Failed to index document {itemWithError.Id}: {itemWithError.Error}");
}
}
// 以下是异步方法的示例,返回结果是Task<IBulkResponse>
// var indexManyAsyncResponse = await client.IndexManyAsync(people);
使用Bulk
对多文档索引
如果你需要在多文档索引时进行更细致地操作,则可以使用Bulk
或者BulkAsync
方法来实现。
与IndexMany
方法类似,使用Bulk
对多文档进行索引时也只会发送单个HTTP请求到Elasticsearch服务器。所以,如果同时对大量文档进行Bulk
操作时,应该考虑将大量的文档切分成少量的文档后再进行Bulk
批量操作。
var bulkIndexResponse = client.Bulk(b => b
.Index("people")
.IndexMany(people)
);
// Alternatively, documents can be indexed asynchronously similar to IndexManyAsync
var asyncBulkIndexResponse = await client.BulkAsync(b => b
.Index("people")
.IndexMany(people)
);
你可以通过配置Bulk
上的IndexMany
方法对索引进行更详细地配置,如下:
var bulkIndexResponse = client.Bulk(b => b
.Index("people")
.IndexMany(people, (descriptor, person) => descriptor
.Index(person.Id % 2 == 0
? "even-index"
: "odd-index")
.Pipeline(person.FirstName.StartsWith("M")
? "startswith_m_pipeline"
: "does_not_start_with_m_pipeline")
)
);
使用BulkAllObservable
对多文档索引
使用BulkAllObservable
可以让你专注于索引的操作,而不必担心重试、回退或分块机制。可以使用BulkAll
方法和Wait()
扩展方法进行多文档索引。
BulkAllObservable
提供了在索引失败时自动重试/回退的功能,并可配置在单个HTTP请求中索引的文档数量。在下面的示例中,每个请求将包含1000个文档,这些文档是从原始输入中分块获取到的。在有大量文档的情况下,这可能会导致许多HTTP请求,每个请求包含1000个文档(最后一个请求可能包含更少的文档,这取决于总数)。
var bulkAllObservable = client.BulkAll(people, b => b
.Index("people")
.BackOffTime("30s") // 重试的等待时间
.BackOffRetries(2) // 重试次数
.RefreshOnCompleted()
.MaxDegreeOfParallelism(Environment.ProcessorCount)
.Size(1000) // 每次HTTP请求索引的文档数
)
.Wait(TimeSpan.FromMinutes(15), next =>
{
// 一些业务逻辑,比如将页数打印到控制台等等
});
高级的批量索引功能
BulkAllObservable
提供了许多方法来进一步控制索引细节,比如:
BufferToBulk
用于在将批量请求分派到服务器之前定制批量请求中的各个操作RetryDocumentPredicate
用于指定当文档索引失败后是否需要重试DroppedDocumentCallback
用于决定在文档没有被索引的情况下应该怎么做,即使在重试之后。
下面的示例演示了其中的一些方法,此外还使用BulkAllObserver
订阅批量索引过程,并在发生错误和进程结束时对每个成功的批量响应采取一些操作。
var bulkAllObservable = client.BulkAll(people, b => b
.BufferToBulk((descriptor, buffer) =>
{
foreach (var person in buffer)
{
descriptor.Index<Person>(bi => bi
.Index(person.Id % 2 == 0 ? "even-index" : "odd-index")
.Document(person)
);
}
})
.RetryDocumentPredicate((bulkResponseItem, person) =>
{
return bulkResponseItem.Error.Index == "even-index" && person.FirstName == "Martijn";
})
.DroppedDocumentCallback((bulkResponseItem, person) =>
{
Console.WriteLine($"Unable to index: {bulkResponseItem} {person}");
}));
var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;
var observer = new BulkAllObserver(
onNext: response =>
{
// 一些业务逻辑
},
onError: exception =>
{
exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
waitHandle.Set();
},
onCompleted: () => waitHandle.Set());
bulkAllObservable.Subscribe(observer);
waitHandle.WaitOne();
exceptionDispatchInfo?.Throw();
发表评论
登录用户才能发表评论, 请 登 录 或者 注册