默认情况下,当使用允许重播的连接池时,将启用嗅探连接,唯一允许这样做的连接池是嗅探连接池(Sniffing connection pool
)
通过询问Elasticsearch集群本身来强制刷新连接池已知的健康节点非常方便,嗅探器通过询问它当前知道的每个节点来获取节点,直到有一个节点作出响应。
以下示例中,在端口9200-9204上开启了5个已知节点,我们认为其中9202,9203,9204是符合主节点条件的节点。当在9201端口上进行搜索时,会抛出引发嗅探的Once
失败:
var audit = new Auditor(() => VirtualClusterWith
.Nodes(5)
.MasterEligible(9202, 9203, 9204)
.ClientCalls(r => r.SucceedAlways())
.ClientCalls(r => r.OnPort(9201).Fails(Once))
.Sniff(p => p.SucceedAlways(VirtualClusterWith
.Nodes(3)
.MasterEligible(9200, 9202)
.ClientCalls(r => r.OnPort(9201).Fails(Once))
.ClientCalls(r => r.SucceedAlways())
.Sniff(s => s.SucceedAlways(VirtualClusterWith
.Nodes(3, 9210)
.MasterEligible(9210, 9212)
.ClientCalls(r => r.SucceedAlways())
.Sniff(r => r.SucceedAlways())
))
))
.SniffingConnectionPool()
.Settings(s => s.DisablePing().SniffOnStartup(false))
);
audit = await audit.TraceCalls(
/** */
new ClientCall {
{ HealthyResponse, 9200 },
{ pool => pool.Nodes.Count.Should().Be(5) }
},
new ClientCall {
{ BadResponse, 9201},
{ SniffOnFail },
{ SniffSuccess, 9202},
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Count.Should().Be(3) }
},
new ClientCall {
{ BadResponse, 9201},
{ SniffOnFail },
{ SniffSuccess, 9200},
{ HealthyResponse, 9210},
{ pool => pool.Nodes.Count.Should().Be(3) }
},
new ClientCall { { HealthyResponse, 9211 } },
new ClientCall { { HealthyResponse, 9212 } },
new ClientCall { { HealthyResponse, 9210 } },
new ClientCall { { HealthyResponse, 9211 } },
new ClientCall { { HealthyResponse, 9212 } },
new ClientCall { { HealthyResponse, 9210 } },
new ClientCall { { HealthyResponse, 9211 } },
new ClientCall { { HealthyResponse, 9212 } },
new ClientCall { { HealthyResponse, 9210 } }
);
在Ping失败后嗅探(Sniffing after ping failure)
这里对测试集群的设置与前面的设置完全相同,只是启用了Ping(默认为true)并使Ping失败:
var audit = new Auditor(() => VirtualClusterWith
.Nodes(5)
.MasterEligible(9202, 9203, 9204)
.Ping(r => r.OnPort(9201).Fails(Once))
.Ping(r => r.SucceedAlways())
.ClientCalls(c=>c.SucceedAlways())
.Sniff(p => p.SucceedAlways(VirtualClusterWith
.Nodes(3)
.MasterEligible(9200, 9202)
.Ping(r => r.OnPort(9201).Fails(Once))
.Ping(r => r.SucceedAlways())
.ClientCalls(c=>c.SucceedAlways())
.Sniff(s => s.SucceedAlways(VirtualClusterWith
.Nodes(3, 9210)
.MasterEligible(9210, 9211)
.Ping(r => r.SucceedAlways())
.Sniff(r => r.SucceedAlways())
.ClientCalls(c=>c.SucceedAlways())
))
))
.SniffingConnectionPool()
.Settings(s => s.SniffOnStartup(false))
);
audit = await audit.TraceCalls(
new ClientCall {
{ PingSuccess, 9200 },
{ HealthyResponse, 9200 },
{ pool => pool.Nodes.Count.Should().Be(5) }
},
new ClientCall {
{ PingFailure, 9201},
{ SniffOnFail }, // 断言对第一个已知的主节点9202进行嗅探
{ SniffSuccess, 9202},
{ PingSuccess, 9200},
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Count.Should().Be(3) } // 连接池现在应该有三个节点
},
new ClientCall {
{ PingFailure, 9201},
{ SniffOnFail }, // 断言对更新后的集群中的第一个主节点进行嗅探
{ SniffSuccess, 9200},
{ PingSuccess, 9210},
{ HealthyResponse, 9210},
{ pool => pool.Nodes.Count.Should().Be(3) }
},
new ClientCall {
{ PingSuccess, 9211 },
{ HealthyResponse, 9211 }
},
new ClientCall {
{ PingSuccess, 9212 },
{ HealthyResponse, 9212 }
},
new ClientCall { { HealthyResponse, 9210 } }, // 嗅探器返回新节点后,已经ping通了9210
new ClientCall { { HealthyResponse, 9211 } },
new ClientCall { { HealthyResponse, 9212 } },
new ClientCall { { HealthyResponse, 9210 } }
);
客户端发布节点地址
var audit = new Auditor(() => VirtualClusterWith
.Nodes(2)
.MasterEligible(9200)
.ClientCalls(c=>c.SucceedAlways())
.Ping(r => r.OnPort(9200).Fails(Once))
.Ping(c=>c.SucceedAlways())
.Sniff(p => p.SucceedAlways(VirtualClusterWith
.Nodes(10)
.MasterEligible(9200, 9202, 9201)
.PublishAddress("10.0.12.1")
.ClientCalls(c=>c.SucceedAlways())
.Ping(c=>c.SucceedAlways())
))
.SniffingConnectionPool()
.Settings(s => s.SniffOnStartup(false))
);
void HostAssert(Audit a, string host, int expectedPort)
{
a.Node.Uri.Host.Should().Be(host);
a.Node.Uri.Port.Should().Be(expectedPort);
}
void SniffUrlAssert(Audit a, string host, int expectedPort)
{
HostAssert(a, host, expectedPort);
var sniffUri = new UriBuilder(a.Node.Uri)
{
Path = RequestPipeline.SniffPath,
Query = "flat_settings=true&timeout=2s"
}.Uri;
sniffUri.PathEquals(a.Path, nameof(SniffUrlAssert));
}
audit = await audit.TraceCalls(
new ClientCall {
{ PingFailure, a => HostAssert(a, "localhost", 9200)},
{ SniffOnFail },
{ SniffSuccess, a => SniffUrlAssert(a, "localhost", 9200)},
{ PingSuccess, a => HostAssert(a, "10.0.12.1", 9200)},
{ HealthyResponse, a => HostAssert(a, "10.0.12.1", 9200)},
{ pool => pool.Nodes.Count.Should().Be(10) } // 连接池现在应该有10个节点
}
);
发表评论
登录用户才能发表评论, 请 登 录 或者 注册