请求管道介绍
当使用默认的ITranssport
实现时,所有请求都在RequestPipeline
的上下文中执行。
当在ITransport
上调用Request()
或者RequestAsync()
方法时,整个请求将会被延迟到一个使用using
块包裹的一个新的请求管道实例中去处理,如下:
var pipeline = new RequestPipeline(
settings,
DateTimeProvider.Default,
new RecyclableMemoryStreamFactory(),
new SearchRequestParameters());
pipeline.GetType().Should().Implement<IDisposable>();
一个ITransporp
不能直接实例化一个RequestPipeline
对象,而是使用插入式的IRequestPipelineFactory
来创建的,如下:
var requestPipelineFactory = new RequestPipelineFactory();
var requestPipeline = requestPipelineFactory.Create(
settings,
DateTimeProvider.Default,
new RecyclableMemoryStreamFactory(),
new SearchRequestParameters());
requestPipeline.Should().BeOfType<RequestPipeline>();
requestPipeline.GetType().Should().Implement<IDisposable>();
在实例化Elasticsearch.Net
或者NEST
客户端时,你可以将自己的IRequestPipeline
实现传递给传输对象,从执行你自己定制的请求管道请求,如下:
var transport = new Transport<IConnectionSettingsValues>(
settings,
requestPipelineFactory,
DateTimeProvider.Default,
new RecyclableMemoryStreamFactory());
var client = new ElasticClient(transport);
下面,我们会探索一下请求管道的一些特征。
初次使用嗅探(Sniffing)
以下代码片段使用三个不同的连接池分别创建了三个管道:
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));
看一下它们第一次使用时的表现:
singleNodePipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
staticPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeTrue();
我们看到,只有嗅探连接池在第一次使用时支持嗅探,因为它支持重播。但是,可以在嗅探连接池的ConnectionSettings
中禁用在启动时嗅探的功能,如下:
sniffingPipeline = CreatePipeline(
uris => new SniffingConnectionPool(uris),
s => s.SniffOnStartup(false)); // 禁用在启动时嗅探功能
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
等待第一次嗅探
所有线程都会在启动时等待嗅探完成,等待请求超时时间。SemaphoreSlim
用于阻塞线程直到嗅探完成,之后等待线程会适当地释放SemaphoreSlim
。
下面的例子可以来演示这一点。首先,我们配置一个自定义的IConnection
实现,它将在一秒钟后返回一个已知的200响应:
var inMemoryConnection = new WaitingInMemoryConnection(
TimeSpan.FromSeconds(1),
responseBody);
接首,使用我们自定义的连接创建一个嗅探连接池(Sniffing connection pool),并为请求在客户端设置一个超时时间:
var sniffingPipeline = CreatePipeline(
uris => new SniffingConnectionPool(uris),
connection: inMemoryConnection,
settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
现在,我们创建一个一次只允许一个线程进入的SemaphoreSlim
,并启动三个不同的线程去执行嗅探任务。
第一个任务将在启动时嗅探成功,其余两个任务将在无异常情形下退出,SemaphoreSlim
也会被释放,以便其他任务再次进行嗅探时使用:
var semaphoreSlim = new SemaphoreSlim(1, 1);
var task1 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task2 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var task3 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
var exception = Record.Exception(() => Task.WaitAll(task1, task2, task3));
exception.Should().BeNull();
semaphoreSlim.CurrentCount.Should().Be(1);
嗅探连接故障
只有支持重播的连接池才会加入SniffsOnConnectionFailure()
,以下示例仅仅是演示嗅探连接池:
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));
var staticPipeline = CreatePipeline(uris => new StaticConnectionPool(uris));
var sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris));
singleNodePipeline.SniffsOnConnectionFailure.Should().BeFalse();
staticPipeline.SniffsOnConnectionFailure.Should().BeFalse();
sniffingPipeline.SniffsOnConnectionFailure.Should().BeTrue();
你也可以在ConnectionSettings
配置中禁用嗅探连接故障的功能,如下:
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnConnectionFault(false));
sniffingPipeline.SniffsOnConnectionFailure.Should().BeFalse();
嗅探过期的集群
支持重播的连接池将在一段时间后自动嗅探,以确保它获取集群的最新状态。
现在,我们用不同的连接池设置三个请求管道和一个允许更改时间日期的时间提供程序:
var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris =>
new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);
var staticPipeline = CreatePipeline(uris =>
new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
var sniffingPipeline = CreatePipeline(uris =>
new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
在请求管道上,嗅探连接池将嗅探其对集群的理解何时过期:
singleNodePipeline.SniffsOnStaleCluster.Should().BeFalse();
staticPipeline.SniffsOnStaleCluster.Should().BeFalse();
sniffingPipeline.SniffsOnStaleCluster.Should().BeTrue();
开始时,所有的请求管道都有一个新的集群状态视图,即不是过期的
singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();
sniffingPipeline.StaleClusterState.Should().BeFalse();
现在,如果我们将时间更改到两小时后:
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
那些不支持重播的连接池永远不会失效:
singleNodePipeline.StaleClusterState.Should().BeFalse();
staticPipeline.StaleClusterState.Should().BeFalse();
但是使用支持重播的嗅探连接池的请求管道会发出信号,表明它对集群状态的理解已经过期:
sniffingPipeline.StaleClusterState.Should().BeTrue();
重试
请求管道还会检查多次重试的总时间是否超过请求超时时间:
var dateTime = new TestableDateTimeProvider();
var singleNodePipeline = CreatePipeline(uris =>
new SingleNodeConnectionPool(uris.First(), dateTime), dateTimeProvider: dateTime);
var staticPipeline = CreatePipeline(uris =>
new StaticConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
var sniffingPipeline = CreatePipeline(uris =>
new SniffingConnectionPool(uris, dateTimeProvider: dateTime), dateTimeProvider: dateTime);
singleNodePipeline.IsTakingTooLong.Should().BeFalse();
staticPipeline.IsTakingTooLong.Should().BeFalse();
sniffingPipeline.IsTakingTooLong.Should().BeFalse();
现在,将时间更改成两小时以后:
dateTime.ChangeTime(d => d.Add(TimeSpan.FromHours(2)));
那些不支持重播的连接池永远不会失效:
singleNodePipeline.IsTakingTooLong.Should().BeTrue();
staticPipeline.IsTakingTooLong.Should().BeTrue();
但是使用支持重播的嗅探连接池的请求管道会发出信号,表明它对集群状态的理解已经过期:
sniffingPipeline.IsTakingTooLong.Should().BeTrue();
请求管道暴露了它启动的时间,我们断言它是在2小时前开始的:
(dateTime.Now() - singleNodePipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - staticPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
(dateTime.Now() - sniffingPipeline.StartedOn).Should().BePositive().And.BeCloseTo(TimeSpan.FromHours(2));
发表评论
登录用户才能发表评论, 请 登 录 或者 注册