嗅探角色检测

826 发布于: 2021-03-25 读完约需 3 分钟

当嗅探集群状态时,我们会检测每个节点的角色,例如:它是否符合主节点的条件,是否是数据节点等等。然后,我们可以根据这些信息选择相应的节点进行API调用。

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(10)
    .Sniff(s => s.Fails(Always))
    .Sniff(s => s.OnPort(9202)
        .Succeeds(Always, VirtualClusterWith.Nodes(8).MasterEligible(9200, 9201, 9202))
    )
    .SniffingConnectionPool()
    .AllDefaults()
)
{
    AssertPoolBeforeStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(10);
        pool.Nodes.Where(n => n.MasterEligible).Should().HaveCount(10);
    },
    AssertPoolAfterStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(8);
        pool.Nodes.Where(n => n.MasterEligible).Should().HaveCount(3);
    }
};

await audit.TraceStartup();

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(10)
    .Sniff(s => s.Fails(Always))
    .Sniff(s => s.OnPort(9202)
        .Succeeds(Always, VirtualClusterWith.Nodes(8).StoresNoData(9200, 9201, 9202))
    )
    .SniffingConnectionPool()
    .AllDefaults()
)
{
    AssertPoolBeforeStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(10);
        pool.Nodes.Where(n => n.HoldsData).Should().HaveCount(10);
    },

    AssertPoolAfterStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(8);
        pool.Nodes.Where(n => n.HoldsData).Should().HaveCount(5);
    }
};
await audit.TraceStartup();

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(10)
    .Sniff(s => s.SucceedAlways()
        .Succeeds(Always, VirtualClusterWith.Nodes(8).StoresNoData(9200, 9201, 9202).HttpDisabled(9201))
    )
    .SniffingConnectionPool()
    .AllDefaults()
)
{
    AssertPoolBeforeStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(10);
        pool.Nodes.Where(n => n.HoldsData).Should().HaveCount(10);
        pool.Nodes.Where(n => n.HttpEnabled).Should().HaveCount(10);
        pool.Nodes.Should().OnlyContain(n => n.Uri.Host == "localhost");
    },

    AssertPoolAfterStartup = (pool) =>
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(7, "we filtered the node that has no http enabled");
        pool.Nodes.Should().NotContain(n=>n.Uri.Port == 9201);
        pool.Nodes.Where(n => n.HoldsData).Should().HaveCount(5);
    }
};
await audit.TraceStartup();

在本例中,我们创建了一个虚拟集群,其中包含一个嗅探连接池,该连接池为创建了已知的主节。当客户端嗅探启动时,我们看到集群总共有20个节点。其中,符合条件的主节点没有存储数据。

var masterNodes = new[] {9200, 9201, 9202};
var totalNodesInTheCluster = 20;
//
var audit = new Auditor(() => VirtualClusterWith
    .MasterOnlyNodes(masterNodes.Length)
    .ClientCalls(r => r.SucceedAlways())
    .Sniff(s => s.SucceedAlways()
        .Succeeds(Always, VirtualClusterWith
            .Nodes(totalNodesInTheCluster)
            .StoresNoData(masterNodes)
            .MasterEligible(masterNodes)
            .ClientCalls(r => r.SucceedAlways())
        )
    )
    .SniffingConnectionPool()
    .Settings(s=>s.DisablePing())
)
{
    AssertPoolBeforeStartup = pool => // 在嗅探之前,断言只看到3个主节点
    {
        pool.Should().NotBeNull();
        pool.Nodes.Should().HaveCount(3, "we seeded 3 master only nodes at the start of the application");
        pool.Nodes.Where(n => n.HoldsData).Should().HaveCount(0, "none of which hold data");
    },
    AssertPoolAfterStartup = (pool) => // 嗅探之后,断言集群中现在有20个节点
    {
        pool.Should().NotBeNull();
        var nodes = pool.CreateView().ToList();
        nodes.Count().Should().Be(20, "Master nodes are included in the registration of nodes since we still favor sniffing on them");
    }
};

在第一个API调用之前,在9200上发生嗅探之后,断言后续的API调用将命中第一个非主节点

audit = await audit.TraceStartup(new ClientCall
{
    { SniffSuccess, 9200},
    { HealthyResponse, 9203} // 来自9203的响应正常,它不是符合条件的主节点
});

为了验证客户端在向虚拟集群发出请求时的行为是否如我们所预期的那样,进行1000次不同的客户端调用,并断言每个调用都没有发送到任何已知的主节点

var seenNodes = new HashSet<int>();
foreach (var _ in Enumerable.Range(0, 1000))
{
    audit = await audit.TraceCalls(
        new ClientCall {{HealthyResponse, (a) =>
        {
            var port = a.Node.Uri.Port;
            masterNodes.Should().NotContain(port);
            seenNodes.Add(port);
        }}}
    );
}

seenNodes.Should().HaveCount(totalNodesInTheCluster - masterNodes.Length);

节点谓词(Node predicates)

可以在ConnectionSettings上指定谓词,该谓词可用于确定可以对集群中的哪些节点执行API调用操作。

作为示例,我们创建一个带有嗅探连接池的虚拟集群,该连接池开启了20个节点。当客户端嗅探启动时,我们看到集群总共20个节点,但是我们现在可以从集群响应中了解节点的实际配置设置。

var totalNodesInTheCluster = 20;
var setting = "node.attr.rack_id";
var value = "rack_one";
var nodesInRackOne = new[] {9204, 9210, 9213};

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(totalNodesInTheCluster)
    .ClientCalls(r => r.SucceedAlways())
    .Sniff(s => s.SucceedAlways()
        .Succeeds(Always, VirtualClusterWith
            .Nodes(totalNodesInTheCluster)
            .HasSetting(setting, value, nodesInRackOne)
            .ClientCalls(r => r.SucceedAlways())
        )
    )
    .SniffingConnectionPool()
    .Settings(s=>s
        .DisablePing() // 为测试方便,此处禁用了Ping
        .NodePredicate(node => // 我们只想对rack_one中的节点执行API调用
            node.Settings.ContainsKey(setting) &&
            node.Settings[setting].ToString() == value
        )
    )
)
{
    AssertPoolAfterStartup = pool => // 在启动时嗅探之后,断言客户端会对包含rack_one中的三个节点进行API调用
    {
        pool.Should().NotBeNull();
        var nodes = pool.CreateView().ToList();
        nodes.Count(n => n.Settings.ContainsKey(setting)).Should().Be(3, "only three nodes are in rack_one");
    }
};

设置好集群后,断言嗅探在第一个API调用之前发生在9200,并且该API调用命中rack_one中的第一个节点

audit = await audit.TraceStartup(new ClientCall
{
    { SniffSuccess, 9200},
    { HealthyResponse, 9204}
});

为了证明客户端工作正常,执行1000个不同的客户端调用,并断言每个调用都只发送到rack_one中的一个节点,并遵守连接设置上的节点谓词设置:

var seenNodes = new HashSet<int>();
foreach (var _ in Enumerable.Range(0, 1000))
{
    audit = await audit.TraceCalls(
        new ClientCall {{HealthyResponse, (a) =>
        {
            var port = a.Node.Uri.Port;
            nodesInRackOne.Should().Contain(port);
            seenNodes.Add(port);
        }}}
    );
}

seenNodes.Should().HaveCount(nodesInRackOne.Length);

作为节点谓词的另一个示例,我们设置一个带有错误节点谓词的虚拟集群,即过滤掉客户端API调用目标的所有节点的谓词

var totalNodesInTheCluster = 20;

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(totalNodesInTheCluster)
    .Sniff(s => s.SucceedAlways()
        .Succeeds(Always, VirtualClusterWith.Nodes(totalNodesInTheCluster))
    )
    .SniffingConnectionPool()
    .Settings(s => s
        .DisablePing()
        .NodePredicate(node => false) // 拒绝所有节点的谓词
    )
);

现在,当进行客户端调用时,审计跟踪指示启动时的嗅探成功,但是后续的API调用失败,因为节点谓词过滤掉了执行API调用的目标的所有节点

await audit.TraceUnexpectedElasticsearchException(new ClientCall
{
    { SniffOnStartup }, // 审计跟踪指示在启动时第一次进行嗅探
    { SniffSuccess }, // 嗅探成功,因为在嗅探时节点谓词被忽略
    { NoNodesAttempted } // 当尝试执行实际的API调用时,谓词会阻止尝试任何节点
}, e =>
{
    e.FailureReason.Should().Be(PipelineFailure.Unexpected);

    Func<string> debug = () => e.DebugInformation;
    debug.Invoking(s => s.Invoke()).Should().NotThrow();
});

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册