在选择节点时,连接池将尝试跳过所有被标记为非存活(dead
)的节点。
创建视图(CreateView)
CreateView
方法是以无锁线程安全的方式实现的,这意味着每个被调用者都会返回自己的游标,以在内部节点列表上前进。这样做是为了保证每个失败的请求都尝试所有节点,而不会影响全局游标。
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
var pool = new StaticConnectionPool(seeds, randomize: false);
for (var i = 0; i < 20; i++)
{
var node = pool.CreateView().First();
node.Uri.Port.Should().Be(9200);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9201);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9202);
}
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
seeds.First().MarkDead(DateTime.Now.AddDays(1));
var pool = new StaticConnectionPool(seeds, randomize: false);
for (var i = 0; i < 20; i++)
{
var node = pool.CreateView().First();
node.Uri.Port.Should().Be(9201);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9202);
}
在再次标记第一个节点为存活节点之后,我们期望它再次被选中:
seeds.First().MarkAlive();
for (var i = 0; i < 20; i++)
{
var node = pool.CreateView().First();
node.Uri.Port.Should().Be(9201);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9202);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9200);
}
var dateTimeProvider = new TestableDateTimeProvider();
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
seeds.First().MarkDead(dateTimeProvider.Now().AddDays(1));
var pool = new StaticConnectionPool(seeds, randomize: false, dateTimeProvider: dateTimeProvider);
for (var i = 0; i < 20; i++)
{
var node = pool.CreateView().First();
node.Uri.Port.Should().Be(9201);
node = pool.CreateView().First();
node.Uri.Port.Should().Be(9202);
}
如果将时钟向前调两天,模拟时间为明天,那么被标记为死亡的节点应该被复活:
dateTimeProvider.ChangeTime(d => d.AddDays(2));
var n = pool.CreateView().First();
n.Uri.Port.Should().Be(9201);
n = pool.CreateView().First();
n.Uri.Port.Should().Be(9202);
n = pool.CreateView().First();
n.Uri.Port.Should().Be(9200);
n.IsResurrected.Should().BeTrue();
一个有2个节点的集群,其中第二个节点ping失败
var audit = new Auditor(() => VirtualClusterWith
.Nodes(4)
.ClientCalls(p => p.Succeeds(Always))
.ClientCalls(p => p.OnPort(9201).FailAlways())
.ClientCalls(p => p.OnPort(9203).FailAlways())
.StaticConnectionPool()
.Settings(p=>p.DisablePing())
);
await audit.TraceCalls(
new ClientCall {
{ HealthyResponse, 9200}, // 第一次调用9200成功
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(0) }
},
new ClientCall {
{ BadResponse, 9201}, // 第二次调用9201,因为它是第一次使用所以执行ping操作但失败了,所以换到节点9202
{ HealthyResponse, 9202},
/** 最终我们断言连接池中有一个节点被标记成了非存活 */
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
},
new ClientCall {
{ BadResponse, 9203}, // 下一次调用命中9203节点,结果也失败,所以应该结束调用
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
new ClientCall {
{ HealthyResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
new ClientCall {
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
new ClientCall {
{ HealthyResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
},
new ClientCall {
{ HealthyResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
}
);
一个有2个节点的集群,其中第二个节点ping失败
var audit = new Auditor(() => VirtualClusterWith
.Nodes(4)
.ClientCalls(p => p.Fails(Always))
.StaticConnectionPool()
.Settings(p=>p.DisablePing())
);
await audit.TraceCalls(
new ClientCall {
{ BadResponse, 9200}, // 所有调用均失败
{ BadResponse, 9201},
{ BadResponse, 9202},
{ BadResponse, 9203},
{ MaxRetriesReached },
{ FailedOverAllNodes },
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead }, // 在所有注册的节点都被标记为“非存活”后,我们希望每次都对一个“非存活”节点进行抽样,以快速查看集群是否恢复,而不希望重试所有4个节点
{ Resurrection, 9201},
{ BadResponse, 9201},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9202},
{ BadResponse, 9202},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9203},
{ BadResponse, 9203},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
},
new ClientCall {
{ AllNodesDead },
{ Resurrection, 9200},
{ BadResponse, 9200},
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
}
);
发表评论
登录用户才能发表评论, 请 登 录 或者 注册