Elasticsearch.Net使用

1223 发布于: 2021-03-26 读完约需 8 分钟

前言

Elasticsearch.NetElasticsearch官方提供的用于.NET平台的低阶客户端,它不依赖其它任何第三方库,也不关心开发者如何构建请求及如何处理响应。

使用Elasticsearch.Net连接Elasticsearch服务

要连接到Elasticsearch服务,需要实例化一个Elasticsearch.Net的客户端对象:

var lowlevelClient = new ElasticLowLevelClient();

通常情况下,如果Elasticsearch运行在远程服务器,你可能需要向客户端传递额外的配置选项,比如Elasticsearch的地址。这时可以使用ConnectionConfiguration,创建一个ConnectionConfiguration实例,并为客户端设置合适的配置参数即可,如下:

var settings = new ConnectionConfiguration(new Uri("http://example.com:9200"))
    .RequestTimeout(TimeSpan.FromMinutes(2));

var lowlevelClient = new ElasticLowLevelClient(settings);

在上面的示例中,通过RequestTimeout()方法设置了当前Elasticsearch.Net客户端实例的自定义请求超时时间为2分钟,ConnectionConfiguration上还有许多其他配置选项,如:

  • 配置Basic Authentication的身份认证请求
  • 配置客户端是否通过代理发送请求
  • 配置客户端是否应该启用HTTP压缩支持

连接池(Connection pools)

ConnectionConfiguration并不局限于为Elasticsearch传递单个地址。Elasticsearch.Net提供了几种不同类型的连接池,每种连接池都有不同的特征。

下面的示例使用了一个SniffingConnectionPool连接池,它的包含集群中三个Elasticsearch节点的地址,客户端将使用这种类型的连接池来维护集群中可用节点的列表,它可以以轮询的方式向这些节点发送请求。

var uris = new[]
{
    new Uri("http://localhost:9200"),
    new Uri("http://localhost:9201"),
    new Uri("http://localhost:9202"),
};

var connectionPool = new SniffingConnectionPool(uris);
var settings = new ConnectionConfiguration(connectionPool);

var lowlevelClient = new ElasticLowLevelClient(settings);

索引(Indexing)

Elasticsearch.Net客户端配置好并连接到Elasticsearch远程服务后,我们需要将一些数据写入集群中进行测试。假如有如下的C#实体模型(Plain Old CLR Object (POCO)):

public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

之后便可以用这个Person实例在Elasticsearch中创建索引数据,同步或者异步方式调用创建索引的API可是非常简单的,如下:

using Elasticsearch.Net;
using System;
using System.Text;
using System.Threading.Tasks;
using ElasticDocSample.Models;

namespace ElasticDocSample
{
    class Program
    {
        static void Main(string[] args)
        {
            Index();
            Console.ReadKey();
        }

        static async Task Index()
        {
            var settings = new ConnectionConfiguration(new Uri("http://192.168.7.91:9200"))
                .RequestTimeout(TimeSpan.FromMinutes(2));
            var lowlevelClient = new ElasticLowLevelClient(settings);
            var person = new Person
            {
                FirstName = "Rector",
                LastName = "Liu"
            };
            var indexResponse = lowlevelClient.Index<BytesResponse>("people", "1", PostData.Serializable(person));
            var responseBytes = indexResponse.Body;
            Console.WriteLine($"同步索引的响应结果为:{Encoding.UTF8.GetString(responseBytes)}");

            var asyncIndexResponse =
                await client.IndexAsync<StringResponse>("people", "2", PostData.Serializable(person));
            var asyncResponseString = asyncIndexResponse.Body;
            Console.WriteLine($"异步索引的响应结果为:{asyncResponseString}");
        }
    }
}

Elasticsearch.Net中所有可用方法均同时对外公开为同步和异步两种版本,后者使用惯用的*Async后缀作为异步方法名。

运行结果为:

同步索引的响应结果为:{"_index":"people","_type":"_doc","_id":"1","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":0,"_p
rimary_term":1}
异步索引的响应结果为:{"_index":"people","_type":"_doc","_id":"2","_version":1,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":1,"_p
rimary_term":1}

查看Elasticsearch服务中的索引people,如图:

image-20210325145142400

说明Elasticsearch.Net与远程Elasticsearch服务连接和操作均正常。

以上示例中,无论是同步方法还是异步方法,我们向Elasticsearch服务器发送的都是使用PostData.Serializable()方法序列化后的SerializableData数据对象。

对于API的请求数据体,你可以将发送如(匿名)对象,字节[],字符串,流等数据。此外,对于采用多行json的API,你也可以发送一个对象列表或字节列表来帮助你格式化。这些数据都会被PostData封装,你可以使用该类上的静态方法以任何形式发送数据。

PostData提供的静态方法有:

image-20210325150141831

lowlevelClient.Index<BytesResponse>中的BytesResponse是一个泛型参数,它表明了响应结果的类型,比如这里的返回类型我们指定为BytesResponse,而lowlevelClient.IndexAsync<StringResponse>的响应返回类型为StringResponse

批量创建索引文档(Bulk indexing)

在实际开发场景中,很有可能需要一次批量写入多个索引文档,Elasticsearch提供了Bulk API来实现批量写入索引文档的功能。

为了测试方便,这里我创建一个客户端实例帮助类CodeDefaultElasticClient.cs

using System;
using Elasticsearch.Net;

namespace ElasticDocSample.Util
{
    public class CodeDefaultElasticClientFactory
    {
        /// <summary>
        /// 获取一个低阶的Elastic客户端实例
        /// </summary>
        public static ElasticLowLevelClient GetLowLevelClient
        {
            get
            {
                var settings = new ConnectionConfiguration(new Uri("http://elastic:123456@192.168.7.91:9200"))
                    .RequestTimeout(TimeSpan.FromMinutes(2));
                var lowLevelClient = new ElasticLowLevelClient(settings);
                return lowLevelClient;
            }
        }
    }
}

批量写入索引文档测试代码:

/// <summary>
/// 批量写入索引文档示例
/// </summary>
/// <returns></returns>
public static async Task BulkIndex()
{
    var people = new object[]
    {
        new { index = new { _index = "people", _type = "person", _id = "1"  }},
        new { FirstName = "Martijn", LastName = "Laarman" },
        new { index = new { _index = "people", _type = "person", _id = "2"  }},
        new { FirstName = "Greg", LastName = "Marzouka" },
        new { index = new { _index = "people", _type = "person", _id = "3"  }},
        new { FirstName = "Russ", LastName = "Cam" },
    };
    var client = CodeDefaultElasticClientFactory.GetLowLevelClient;
    var indexResponse = await client.BulkAsync<StringResponse>(PostData.MultiJson(people));
    Console.WriteLine($"批量写入索引文档的响应结果为:{indexResponse.Body}");
}

Elasticsearch.Net客户端将单独序列化每个元素,并根据Bulk API的要求使用换行符\n连接。请参考Elasticsearch批量API文档了解更多细节和支持的操作。

运行结果为:

批量写入索引文档的响应结果为:{"took":1367,"errors":false,"items":[{"index":{"_index":"people","_type":"person","_id":"1","_version":1,"result":"created","_shard
s":{"total":2,"successful":2,"failed":0},"_seq_no":0,"_primary_term":1,"status":201}},{"index":{"_index":"people","_type":"person","_id":"2","_version":1,"resul
t":"created","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":1,"_primary_term":1,"status":201}},{"index":{"_index":"people","_type":"person","_id":"3"
,"_version":1,"result":"created","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":2,"_primary_term":1,"status":201}}]}

搜索文档(Searching)

上面我们已经为一些文档建立了索引,现在可以开始搜索它们了。

Elasticsearch查询DSL可以在请求中使用匿名类型表示,如下:

/// <summary>
/// 搜索
/// </summary>
public static async void SearchAsync()
{
    var client = CodeDefaultElasticClientFactory.GetLowLevelClient;
    var searchResponse = await client.SearchAsync<StringResponse>("people", PostData.Serializable(new
                                                                                                  {
                                                                                                      from = 0,
                                                                                                      size = 10,
                                                                                                      query = new
                                                                                                      {
                                                                                                          match = new
                                                                                                          {
                                                                                                              FirstName = new
                                                                                                              {
                                                                                                                  query = "Martijn"
                                                                                                              }
                                                                                                          }
                                                                                                      }
                                                                                                  }));
    var successful = searchResponse.Success;
    Console.WriteLine($"搜索是否成功:{successful}");
    var responseJson = searchResponse.Body;
    Console.WriteLine($"搜索响应JSON为:{responseJson}");
}

运行结果为:

搜索是否成功:True
搜索响应JSON为:{"took":13,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":
0.9808291,"hits":[{"_index":"people","_type":"person","_id":"1","_score":0.9808291,"_source":{"FirstName":"Martijn","LastName":"Laarman"}}]}}

responseJson是响应返回的JSON字符串,Elasticsearch查询的端点为/people/person/_search,你可以通过分别更改请求中的索引和类型参数,实现搜索多个索引和类型。

字符串作为请求体也是支持的,如下:

/// <summary>
/// 搜索(字符串作为请求体)
/// </summary>
public static async void SearchPostStringAsync()
{
    var client = CodeDefaultElasticClientFactory.GetLowLevelClient;
    var searchResponse = await client.SearchAsync<StringResponse>("people", @"
            {
                ""from"": 0,
                ""size"": 10,
                ""query"": {
                    ""match"": {
                        ""FirstName"": {
                            ""query"": ""Martijn""
                        }
                    }
                }
            }");
    var successful = searchResponse.Success;
    Console.WriteLine($"搜索是否成功:{successful}");
    var responseJson = searchResponse.Body;
    Console.WriteLine($"搜索响应JSON为:{responseJson}");
}

如你所见,使用字符串可能比使用匿名类型稍微麻烦一些,因为需要转义双引号,但有时它还是很有用的。

Elasticsearch.Net不提供类型化对象来表示响应,如果需要将强类型对象作为响应结果,你应该考虑使用NEST,它将所有请求和响应映射成指定的强类型。您可以使用Elasticsearch.Net来处理强类型,但是需要开发者来配置Elasticsearch.Net以便反序列化成你期望的类型类型。你可以实现IElasticsearchSerializer接口并通过配置ConnectionConfiguration来完成。

错误处理(Handling Errors)

默认情况下,Elasticsearch.Net的配置为:如果返回的HTTP响应状态码不在200-300范围内则不会抛出异常,对于特定的请求也不不会返回预期的响应状态码,例如:检查是否存在索引会返回404。

低阶客户端调用的响应提供了许多可用于确定调用是否成功的属性。

var searchResponse = lowlevelClient.Search<BytesResponse>("people", PostData.Serializable(new { match_all = new {} }));

var success = searchResponse.Success; // 响应在200范围内,或者是给定请求的预期响应
var successOrKnownError = searchResponse.SuccessOrKnownError; // 响应成功,或者响应代码在400-599之间,指示不能重试请求
var exception = searchResponse.OriginalException; // 如果响应不成功,将捕获原始的异常信息

如果掌握了这些错误的细节,你就可以决定在应用程序中应该如何处理错误或者异常。

默认的不抛出异常的行为可以通过ConnectionConfiguration.ThrowExceptions()来更改:

var settings = new ConnectionConfiguration(new Uri("http://example.com:9200"))
    .ThrowExceptions();

var lowlevelClient = new ElasticLowLevelClient(settings);

如果需要更细粒度的控制,可以使用ConnectionConfiguration.OnRequestCompleted()方法来配置:

var settings = new ConnectionConfiguration(new Uri("http://example.com:9200"))
    .OnRequestCompleted(apiCallDetails =>
    {
        if (apiCallDetails.HttpStatusCode == 418)
        {
            throw new TimeForACoffeeException();
        }
    });

var lowlevelClient = new ElasticLowLevelClient(settings);

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册