索引文档

1130 发布于: 2021-04-23 读完约需 5 分钟

概述

NEST实现了Elasticsearch索引和批量操作API的方法,以便于.NET开发者能够索引单个或者多个文档。除此之外,NEST还为特殊的索引方法提供了一些便捷的方法。

索引单个文档

单个文档可以分别使用同步方法IndexDocument或者异步方法IndexDocumentAsync进行索引。使用这两个方法对文档进行索引时比较简单,不需要其他的请求参数,如下:

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

var indexResponse = client.IndexDocument(person); // 同步方法的返回值是IndexResponse
if (!indexResponse.IsValid)
{
    // 验证索引是否成功
}

// var indexResponseAsync = await client.IndexDocumentAsync(person); //同步方法的返回值是Task<IndexResponse>

索引单个文档(带参数)

如果在对单个文档进行索引时需要携带额外的参数,你可以使用Index()方法,Index()方法提供了额外的参数,比如你可以指定文档的索引名称、文档的ID、文档的路由参数等等,这些参数可以让你更好地控制文档的索引方式,如下:

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

var indexResponse1 = client.Index(person, i => i.Index("people")); // 这种是基于链式的编程方式

// var indexResponse2 = client.Index(new IndexRequest<Person>(person, "people")); // 这种是基于对象初始化语法的方式

使用IndexMany对多文档索引

NEST中,可以使用IndexMany或者IndexManyAsync方法对多个文档进行索引。同样的,它们分别是同步操作和异步操作。这两个方法是NEST对应Elasticsearch中_bulkAPI的封装。

IndexMany方法对多文档进行索引时向Elasticsearch服务器发送的是单个HTTP请求,所以在文档集合非常庞大的时候不推荐使用此方法,而是使用BulkAllObservale

IndexMany使用示例如下:



var people = new []
{
    new Person
    {
        Id = 1,
        FirstName = "Martijn",
        LastName = "Laarman"
    },
    new Person
    {
        Id = 2,
        FirstName = "Stuart",
        LastName = "Cam"
    },
    new Person
    {
        Id = 3,
        FirstName = "Russ",
        LastName = "Cam"
    }
};

var indexManyResponse = client.IndexMany(people); 

if (indexManyResponse.Errors) 
{
    foreach (var itemWithError in indexManyResponse.ItemsWithErrors) 
    {
        Console.WriteLine($"Failed to index document {itemWithError.Id}: {itemWithError.Error}");
    }
}

// 以下是异步方法的示例,返回结果是Task<IBulkResponse>
// var indexManyAsyncResponse = await client.IndexManyAsync(people);

使用Bulk对多文档索引

如果你需要在多文档索引时进行更细致地操作,则可以使用Bulk或者BulkAsync方法来实现。

IndexMany方法类似,使用Bulk对多文档进行索引时也只会发送单个HTTP请求到Elasticsearch服务器。所以,如果同时对大量文档进行Bulk操作时,应该考虑将大量的文档切分成少量的文档后再进行Bulk批量操作。

var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people)
); 

// Alternatively, documents can be indexed asynchronously similar to IndexManyAsync
var asyncBulkIndexResponse = await client.BulkAsync(b => b
    .Index("people")
    .IndexMany(people)
);

你可以通过配置Bulk上的IndexMany方法对索引进行更详细地配置,如下:

var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people, (descriptor, person) => descriptor
        .Index(person.Id % 2 == 0
            ? "even-index"
            : "odd-index") 
        .Pipeline(person.FirstName.StartsWith("M")
            ? "startswith_m_pipeline"
            : "does_not_start_with_m_pipeline") 
    )
);

使用BulkAllObservable对多文档索引

使用BulkAllObservable可以让你专注于索引的操作,而不必担心重试、回退或分块机制。可以使用BulkAll方法和Wait()扩展方法进行多文档索引。

BulkAllObservable提供了在索引失败时自动重试/回退的功能,并可配置在单个HTTP请求中索引的文档数量。在下面的示例中,每个请求将包含1000个文档,这些文档是从原始输入中分块获取到的。在有大量文档的情况下,这可能会导致许多HTTP请求,每个请求包含1000个文档(最后一个请求可能包含更少的文档,这取决于总数)。

var bulkAllObservable = client.BulkAll(people, b => b
    .Index("people")
    .BackOffTime("30s") // 重试的等待时间
    .BackOffRetries(2) // 重试次数
    .RefreshOnCompleted()
    .MaxDegreeOfParallelism(Environment.ProcessorCount)
    .Size(1000) // 每次HTTP请求索引的文档数
)
.Wait(TimeSpan.FromMinutes(15), next => 
{
    // 一些业务逻辑,比如将页数打印到控制台等等
});

高级的批量索引功能

BulkAllObservable提供了许多方法来进一步控制索引细节,比如:

  • BufferToBulk用于在将批量请求分派到服务器之前定制批量请求中的各个操作
  • RetryDocumentPredicate用于指定当文档索引失败后是否需要重试
  • DroppedDocumentCallback用于决定在文档没有被索引的情况下应该怎么做,即使在重试之后。

下面的示例演示了其中的一些方法,此外还使用BulkAllObserver订阅批量索引过程,并在发生错误和进程结束时对每个成功的批量响应采取一些操作。

var bulkAllObservable = client.BulkAll(people, b => b
      .BufferToBulk((descriptor, buffer) => 
      {
          foreach (var person in buffer)
          {
              descriptor.Index<Person>(bi => bi
                  .Index(person.Id % 2 == 0 ? "even-index" : "odd-index") 
                  .Document(person)
              );
          }
      })
      .RetryDocumentPredicate((bulkResponseItem, person) => 
      {
          return bulkResponseItem.Error.Index == "even-index" && person.FirstName == "Martijn";
      })
      .DroppedDocumentCallback((bulkResponseItem, person) => 
      {
          Console.WriteLine($"Unable to index: {bulkResponseItem} {person}");
      }));

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new BulkAllObserver(
    onNext: response =>
    {
        // 一些业务逻辑
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

bulkAllObservable.Subscribe(observer); 

waitHandle.WaitOne(); 

exceptionDispatchInfo?.Throw();

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册