跳过非存活节点

818 发布于: 2021-03-25 读完约需 1 分钟

在选择节点时,连接池将尝试跳过所有被标记为非存活(dead)的节点。

创建视图(CreateView)

CreateView方法是以无锁线程安全的方式实现的,这意味着每个被调用者都会返回自己的游标,以在内部节点列表上前进。这样做是为了保证每个失败的请求都尝试所有节点,而不会影响全局游标。

var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
var pool = new StaticConnectionPool(seeds, randomize: false);
for (var i = 0; i < 20; i++)
{
    var node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9200);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9201);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9202);
}

var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
seeds.First().MarkDead(DateTime.Now.AddDays(1));
var pool = new StaticConnectionPool(seeds, randomize: false);
for (var i = 0; i < 20; i++)
{
    var node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9201);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9202);
}

在再次标记第一个节点为存活节点之后,我们期望它再次被选中:

seeds.First().MarkAlive();
for (var i = 0; i < 20; i++)
{
    var node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9201);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9202);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9200);
}

var dateTimeProvider = new TestableDateTimeProvider();
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
seeds.First().MarkDead(dateTimeProvider.Now().AddDays(1));
var pool = new StaticConnectionPool(seeds, randomize: false, dateTimeProvider: dateTimeProvider);
for (var i = 0; i < 20; i++)
{
    var node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9201);
    node = pool.CreateView().First();
    node.Uri.Port.Should().Be(9202);
}

如果将时钟向前调两天,模拟时间为明天,那么被标记为死亡的节点应该被复活:

dateTimeProvider.ChangeTime(d => d.AddDays(2));
var n = pool.CreateView().First();
n.Uri.Port.Should().Be(9201);
n = pool.CreateView().First();
n.Uri.Port.Should().Be(9202);
n = pool.CreateView().First();
n.Uri.Port.Should().Be(9200);
n.IsResurrected.Should().BeTrue();

一个有2个节点的集群,其中第二个节点ping失败

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(4)
    .ClientCalls(p => p.Succeeds(Always))
    .ClientCalls(p => p.OnPort(9201).FailAlways())
    .ClientCalls(p => p.OnPort(9203).FailAlways())
    .StaticConnectionPool()
    .Settings(p=>p.DisablePing())
);

await audit.TraceCalls(
    new ClientCall {
        { HealthyResponse, 9200}, // 第一次调用9200成功
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(0) }
    },
    new ClientCall {
        { BadResponse, 9201}, // 第二次调用9201,因为它是第一次使用所以执行ping操作但失败了,所以换到节点9202
        { HealthyResponse, 9202},
        /** 最终我们断言连接池中有一个节点被标记成了非存活 */
        { pool =>  pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
    },
    new ClientCall {
        { BadResponse, 9203}, // 下一次调用命中9203节点,结果也失败,所以应该结束调用
        { HealthyResponse, 9200},
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
    },
    new ClientCall {
        { HealthyResponse, 9202},
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
    },
    new ClientCall {
        { HealthyResponse, 9200},
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
    },
    new ClientCall {
        { HealthyResponse, 9202},
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
    },
    new ClientCall {
        { HealthyResponse, 9200},
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
    }
);

一个有2个节点的集群,其中第二个节点ping失败

var audit = new Auditor(() => VirtualClusterWith
    .Nodes(4)
    .ClientCalls(p => p.Fails(Always))
    .StaticConnectionPool()
    .Settings(p=>p.DisablePing())
);

await audit.TraceCalls(
    new ClientCall {
        { BadResponse, 9200}, // 所有调用均失败
        { BadResponse, 9201},
        { BadResponse, 9202},
        { BadResponse, 9203},
        { MaxRetriesReached },
        { FailedOverAllNodes },
        { pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
    },
    new ClientCall {
        { AllNodesDead }, // 在所有注册的节点都被标记为“非存活”后,我们希望每次都对一个“非存活”节点进行抽样,以快速查看集群是否恢复,而不希望重试所有4个节点
        { Resurrection, 9201},
        { BadResponse, 9201},
        { pool =>  pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
    },
    new ClientCall {
        { AllNodesDead },
        { Resurrection, 9202},
        { BadResponse, 9202},
        { pool =>  pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
    },
    new ClientCall {
        { AllNodesDead },
        { Resurrection, 9203},
        { BadResponse, 9203},
        { pool =>  pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
    },
    new ClientCall {
        { AllNodesDead },
        { Resurrection, 9200},
        { BadResponse, 9200},
        { pool =>  pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
    }
);

版权声明:本作品系原创,版权归码友网所有,如未经许可,禁止任何形式转载,违者必究。

发表评论

登录用户才能发表评论, 请 登 录 或者 注册