.net core grpc consul 实现服务注册 服务发现 负载均衡(二)

在上一篇 .net core grpc 实现通信(一) 中,我们实现的grpc通信在.net core中的可行性,但要在微服务中真正使用,还缺少 服务注册,服务发现及负载均衡等,本篇我们将在 .net core grpc 通信 的基础上加上 服务注册,服务发现,负载均衡。

如对.net core grpc 通信不太熟悉的,可以看上一篇 .net core grpc 实现通信(一) ,然后再看本篇。

grpc(https://grpc.io/)是google发布的一个开源、高性能、通用RPC(Remote Procedure Call)框架,使用HTTP/2协议,支持多路复用,并用ProtoBuf作为序列化工具,提供跨语言、跨平台支持。

Consul(https://www.consul.io)是一个分布式,高可用、支持多数据中心的服务注册、发现、健康检查和配置共享的服务软件,由 HashiCorp 公司用 Go 语言开发。

本次服务注册、发现 通过 Consul Api 来实现,开发过程中结合.net core 依赖注入,切面管道思想等。

 

软件版本

.net core:2.0

grpc:1.11.0

Consul:1.1.0

Consul Nuget注册组件:0.7.2.5

 

项目结构

.net core 代码部分:

Snai.GrpcClient 客户端 .net core 2.0控制台程序

Snai.GrpcService.Hosting 服务端宿主,Api服务注册,asp.net core 2.0网站程序

Snai.GrpcService.Impl 协议方法实现  .net standard 2.0类库

Snai.GrpcService.Protocol 生成协议方法 .net standard 2.0类库

Consul:

conf 配置目录,本次用api注册服务,可以删除

data 缓存数据目录,可清空里面内容

dist Consul UI目录,本次用默认的UI,可以删除

consul.exe 注册软件

startup.bat 执行脚本

 

项目实现

 一、服务端

服务端主要包括Grpc服务端,Consul Api服务注册、健康检查等。

新建Snai.GrpcService解决方案,由于这次加入了 Consul Api 服务注册,所以我们先从 Api 服务注册开始。

1、实现 Consul Api 服务注册

新建 Snai.GrpcService.Hosting 基于Asp.net Core 2.0空网站,在 依赖项 右击 管理NuGet程序包 浏览 找到 Consul 版本0.7.2.5安装,用于Api服务注册使用

新建 appsettings.json 配置文件,配置 GrpcService Grpc服务端IP和端口,HealthService健康检测名称、IP和地址,ConsulService ConsulIP和端口,代码如下

{
  "GrpcService": {
    "IP": "localhost",
    "Port": "5031"
  },
  "HealthService": {
    "Name": "GrpcService",
    "IP": "localhost",
    "Port": "5021"
  },
  "ConsulService": {
    "IP": "localhost",
    "Port": "8500"
  }
}

新建Consul目录,用于放Api注册相关代码

Consul目录下新建Entity目录,在Entity目录下新建HealthService.cs,ConsulService.cs类,分别对应HealthService,ConsulService两个配置项,代码如下

HealthService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Snai.GrpcService.Hosting.Consul.Entity
{
    public class HealthService
    {
        public string Name { get; set; }
        public string IP { get; set; }
        public int Port { get; set; }
        
    }
}

 ConsulService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Snai.GrpcService.Hosting.Consul.Entity
{
    public class ConsulService
    {
        public string IP { get; set; }
        public int Port { get; set; }
    }
}

 Consul 目录下新建 AppRregister.cs 类,添加 IApplicationBuilder 扩展方法 RegisterConsul,来调用 Consul Api 实现服务注册,代码如下

using Consul;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Hosting.Consul.Entity;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Snai.GrpcService.Hosting.Consul
{
    public static class AppRregister
    {
        // 服务注册
        public static IApplicationBuilder RegisterConsul(this IApplicationBuilder app, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService)
        {
            var consulClient = new ConsulClient(x => x.Address = new Uri($"http://{consulService.Value.IP}:{consulService.Value.Port}"));//请求注册的 Consul 地址
            var httpCheck = new AgentServiceCheck()
            {
                DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务启动多久后注册
                Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔,或者称为心跳间隔
                HTTP = $"http://{healthService.Value.IP}:{healthService.Value.Port}/health",//健康检查地址
                Timeout = TimeSpan.FromSeconds(5)
            };

            // Register service with consul
            var registration = new AgentServiceRegistration()
            {
                Checks = new[] { httpCheck },
                ID = healthService.Value.Name + "_" + healthService.Value.Port,
                Name = healthService.Value.Name,
                Address = healthService.Value.IP,
                Port = healthService.Value.Port,
                Tags = new[] { $"urlprefix-/{healthService.Value.Name}" }//添加 urlprefix-/servicename 格式的 tag 标签,以便 Fabio 识别
            };

            consulClient.Agent.ServiceRegister(registration).Wait();//服务启动时注册,内部实现其实就是使用 Consul API 进行注册(HttpClient发起)
            lifetime.ApplicationStopping.Register(() =>
            {
                consulClient.Agent.ServiceDeregister(registration.ID).Wait();//服务停止时取消注册
            });

            return app;
        }
    }
}

修改 Startup.cs 代码

加入 Startup(IConfiguration configuration) 构造函数,实现配置注入,如果建的是Web Api或MVC网站,默认是有的

修改 ConfigureServices(IServiceCollection services)  方法,注册全局配置

修改 Configure() 方法,添加健康检查路由地址 app.Map("/health", HealthMap),调用 RegisterConsul 扩展方法实现服务注册

添加 HealthMap(IApplicationBuilder app) 实现health路由。由于只有一个健康检查地址,所以没有建Web Api网站,只建了个空网站

代码如下,注册配置GrpcService 、 注册Rpc服务、启动Rpc服务 后面用到等下讲

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Hosting.Consul;
using Snai.GrpcService.Hosting.Consul.Entity;
using Snai.GrpcService.Impl;

namespace Snai.GrpcService.Hosting
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public void ConfigureServices(IServiceCollection services)
        {
            //注册全局配置
            services.AddOptions();
            services.Configure<Impl.Entity.GrpcService>(Configuration.GetSection(nameof(Impl.Entity.GrpcService)));
            services.Configure<HealthService>(Configuration.GetSection(nameof(HealthService)));
            services.Configure<ConsulService>(Configuration.GetSection(nameof(ConsulService)));

            //注册Rpc服务
            services.AddSingleton<IRpcConfig, RpcConfig>();
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime, IOptions<HealthService> healthService, IOptions<ConsulService> consulService, IRpcConfig rpc)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            // 添加健康检查路由地址
            app.Map("/health", HealthMap);

            // 服务注册
            app.RegisterConsul(lifetime, healthService, consulService);

            // 启动Rpc服务
            rpc.Start();
        }

        private static void HealthMap(IApplicationBuilder app)
        {
            app.Run(async context =>
            {
                await context.Response.WriteAsync("OK");
            });
        }
    }
}

 修改 Program.cs 代码,调置网站地址为 .UseUrls("http://localhost:5021"),代码如下

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

namespace Snai.GrpcService.Hosting
{
    public class Program
    {
        public static void Main(string[] args)
        {
            BuildWebHost(args).Run();
        }

        public static IWebHost BuildWebHost(string[] args) =>
            WebHost.CreateDefaultBuilder(args)
               
.UseUrls("http://localhost:5021")
                .UseStartup<Startup>()
                .Build();
    }
}

 到此 Consul Api 服务注册 已完成,最终项目结构如下:

2、协议编写,将协议生成C#代码

由于在上一篇 .net core grpc 实现通信(一) 有过介绍,这里就简单说下

新建 Snai.GrpcService.Protocol协议类库项目,在 依赖项 右击 管理NuGet程序包 浏览 找到 Grpc.Core 版本1.11.0,Google.Protobuf 版本3.5.1 包下载安装

在根目录下新建msg.proto 文件,编写基于proto3语言的协议代码,用于生成各语言协议,msg.proto 代码如下

syntax = "proto3";

package Snai.GrpcService.Protocol;

service MsgService{
  rpc GetSum(GetMsgNumRequest) returns (GetMsgSumReply){}
}

message GetMsgNumRequest {
  int32 Num1 = 1;
  int32 Num2 = 2;
}

message GetMsgSumReply {
  int32 Sum = 1;
}

新建.net framework 项目类库,引用安装 Grpc.ToolsGoogle.Protobuf.Tools 组件程序包,分别得到 grpc_csharp_plugin.exeprotoc.exe 工具

到package目录下,找到与系统相应的grpc_csharp_plugin.exe、protoc.exe工具,拷到 Snai.GrpcService.Protocol 项目下

Snai.GrpcService.Protocol根目录下新建 ProtocGenerate.cmd 文件,在其中输入以下指令

protoc -I . --csharp_out . --grpc_out . --plugin=protoc-gen-grpc=grpc_csharp_plugin.exe msg.proto

然后直接双击运行,项目下生成了“Msg.cs”和“MsgGrpc.cs”两个文件,这样协议部分的所有工作就完成了,最终项目结构如下:

 

 3、编写协议实现代码

新建 Snai.GrpcService.Impl 实现类库项目,在 依赖项 下载安装Grpc.Core 包,项目引用 Snai.GrpcService.Protocol

新建 Entity 目录,Entity目录下新建 GrpcService.cs 类,对应 Snai.GrpcService.Hosting 项目下 appsettings.json 配置文件的 GrpcService 配置项,代码如下

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Snai.GrpcService.Impl.Entity
{
    public class GrpcService
    {
        public string IP { get; set; }
        public int Port { get; set; }
    }
}

在根目录下新建 RpcService 目录,在 RpcService 目录下新建 MsgServiceImpl.cs 类,继承 MsgService.MsgServiceBase 协议类,实现服务方法,代码如下

using Grpc.Core;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Snai.GrpcService.Impl.RpcService
{
    public class MsgServiceImpl : MsgService.MsgServiceBase
    {
        public override async Task<GetMsgSumReply> GetSum(GetMsgNumRequest request, ServerCallContext context)
        {
            var result = new GetMsgSumReply();

            result.Sum = request.Num1 + request.Num2;

            Console.WriteLine(request.Num1 + "+" + request.Num2 + "=" + result.Sum);

            return result;
        }
    }
}

 在根目录下新建IRpcConfig.cs接口,定义 Start() 用于Rpc启动基方法,代码如下

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcService.Impl
{
    public interface IRpcConfig
    {
        void Start();
    }
}

 在根目录下新建 RpcConfig.cs 类,用于实现 IRpcConfig.cs 接口,启动Rpc服务,代码如下

using Grpc.Core;
using Microsoft.Extensions.Options;
using Snai.GrpcService.Impl.RpcService;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcService.Impl
{
    public class RpcConfig: IRpcConfig
    {
        private static Server _server;
        static IOptions<Entity.GrpcService> GrpcSettings;

        public RpcConfig(IOptions<Entity.GrpcService> grpcSettings)
        {
            GrpcSettings = grpcSettings;
        }

        public void Start()
        {
            _server = new Server
            {
                Services = { MsgService.BindService(new MsgServiceImpl()) },
                Ports = { new ServerPort(GrpcSettings.Value.IP, GrpcSettings.Value.Port, ServerCredentials.Insecure) }
            };
            _server.Start();

            Console.WriteLine($"Grpc ServerListening On Port {GrpcSettings.Value.Port}");
        }
    }
}

 在回到Snai.GrpcService.Hosting项目中,在 Startup.cs 中 ConfigureServices 中注册 GrpcService 配置、注册Rpc服务,在 Configure 中 启动Rpc服务 就是上面说到的蓝色字体标识的,如图

最终项目结构如下:

到此服务端的代码实现已完成,下面我们启动Consul和服务端,验证 Api 注册和Grpc启动。

二、Consul和服务端启动

启动Consul,启动Grpc服务、注册服务到Consul

1、启动Consul

首先下载Consul:https://www.consul.io/downloads.html,本项目是windows下进行测试,得到consul.exe

由于本次用Api注册,用Consul默认自带UI,所以conf和dist可删除

清除Consul/data 内容,新建startup.bat文件,输入下面代码,双击启动Consul,本项目测试时一台机器,所以把 本机IP 改成 127.0.0.1

consul agent -server -datacenter=grpc-consul -bootstrap -data-dir ./data -ui -node=grpc-consul1 -bind 本机IP -client=0.0.0.0

 再在Consul目录下启动另一个cmd命令行窗口,输入命令:consul operator raft list-peers 查看状态查看状态,结果如下

 

打开Consul UI:http://localhost:8500 查看情况

Consul 启动成功。

.net core Ocelot Consul 实现API网关 服务注册 服务发现 负载均衡 中后面 Consul 部分,有 Consul 集群搭建等其他介绍,可以去参考看下。

2、启动服务端,启动Grpc服务、注册服务到Consul

由于客户端要实现负载,所以把 Snai.GrpcService.Hosting 项目生成两次,启动两个一样的服务端,只是端口不同

服务5021 地址为5021: .UseUrls("http://localhost:5021"),GrpcService:5031,如下图

 服务5022 修改地址为5022: .UseUrls("http://localhost:5022"),GrpcService:5032,如下图

启动 服务5021和服务5022两个服务端,如下面

看到 Grpc ServerListening On Port 5031,Grpc ServerListening On Port 5032 说明 Grpc 服务端启动成功

看到 Request starting HTTP/1.1 GET http://localhost:5021/health 说明 Consul 健康检查成功

打开Consul服务查看地址 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,两个GrpcService注册成功,健康检查状态正常

到此,Grpc启动正常,Consul Api服务注册、健康检查都正常,下面开始实现Grpc客户端

三、客户端

客户端主要包括Grpc客户端,Consul Api服务发现、负载均衡等。

新建Snai.GrpcClient 控制台程序,在 依赖项 下载安装Grpc.Core 包,项目引用Snai.GrpcService.Protocol,在依赖项下载安装下面工具组件包

用于读取 json配置:Microsoft.Extensions.Configuration,Microsoft.Extensions.Configuration.Json 

用于依赖注入:Microsoft.Extensions.DependencyInjection

用于注入全局配置:Microsoft.Extensions.Options,Microsoft.Extensions.Options.ConfigurationExtensions

在项目根目录下新建 Utils 目录,在 Utils 目录下新建 HttpHelper.cs 类,用于程序内发送http请求,代码如下

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
/*
 * 参考 pudefu https://www.cnblogs.com/pudefu/p/7581956.html ,在此表示感谢
 */
namespace Snai.GrpcClient.Utils
{
    public class HttpHelper
    {
        /// <summary>
        /// 同步GET请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="headers"></param>
        /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
        /// <returns></returns>
        public static string HttpGet(string url, Dictionary<string, string> headers = null, int timeout = 0)
        {
            using (HttpClient client = new HttpClient())
            {
                if (headers != null)
                {
                    foreach (KeyValuePair<string, string> header in headers)
                    {
                        client.DefaultRequestHeaders.Add(header.Key, header.Value);
                    }
                }
                if (timeout > 0)
                {
                    client.Timeout = new TimeSpan(0, 0, timeout);
                }
                Byte[] resultBytes = client.GetByteArrayAsync(url).Result;
                return Encoding.UTF8.GetString(resultBytes);
            }
        }

        /// <summary>
        /// 异步GET请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="headers"></param>
        /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
        /// <returns></returns>
        public static async Task<string> HttpGetAsync(string url, Dictionary<string, string> headers = null, int timeout = 0)
        {
            using (HttpClient client = new HttpClient())
            {
                if (headers != null)
                {
                    foreach (KeyValuePair<string, string> header in headers)
                    {
                        client.DefaultRequestHeaders.Add(header.Key, header.Value);
                    }
                }
                if (timeout > 0)
                {
                    client.Timeout = new TimeSpan(0, 0, timeout);
                }
                Byte[] resultBytes = await client.GetByteArrayAsync(url);
                return Encoding.Default.GetString(resultBytes);
            }
        }


        /// <summary>
        /// 同步POST请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="postData"></param>
        /// <param name="headers"></param>
        /// <param name="contentType"></param>
        /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
        /// <param name="encoding">默认UTF8</param>
        /// <returns></returns>
        public static string HttpPost(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null)
        {
            using (HttpClient client = new HttpClient())
            {
                if (headers != null)
                {
                    foreach (KeyValuePair<string, string> header in headers)
                    {
                        client.DefaultRequestHeaders.Add(header.Key, header.Value);
                    }
                }
                if (timeout > 0)
                {
                    client.Timeout = new TimeSpan(0, 0, timeout);
                }
                using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8))
                {
                    if (contentType != null)
                    {
                        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType);
                    }
                    using (HttpResponseMessage responseMessage = client.PostAsync(url, content).Result)
                    {
                        Byte[] resultBytes = responseMessage.Content.ReadAsByteArrayAsync().Result;
                        return Encoding.UTF8.GetString(resultBytes);
                    }
                }
            }
        }

        /// <summary>
        /// 异步POST请求
        /// </summary>
        /// <param name="url"></param>
        /// <param name="postData"></param>
        /// <param name="headers"></param>
        /// <param name="contentType"></param>
        /// <param name="timeout">请求响应超时时间,单位/s(默认100秒)</param>
        /// <param name="encoding">默认UTF8</param>
        /// <returns></returns>
        public static async Task<string> HttpPostAsync(string url, string postData, Dictionary<string, string> headers = null, string contentType = null, int timeout = 0, Encoding encoding = null)
        {
            using (HttpClient client = new HttpClient())
            {
                if (headers != null)
                {
                    foreach (KeyValuePair<string, string> header in headers)
                    {
                        client.DefaultRequestHeaders.Add(header.Key, header.Value);
                    }
                }
                if (timeout > 0)
                {
                    client.Timeout = new TimeSpan(0, 0, timeout);
                }
                using (HttpContent content = new StringContent(postData ?? "", encoding ?? Encoding.UTF8))
                {
                    if (contentType != null)
                    {
                        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue(contentType);
                    }
                    using (HttpResponseMessage responseMessage = await client.PostAsync(url, content))
                    {
                        Byte[] resultBytes = await responseMessage.Content.ReadAsByteArrayAsync();
                        return Encoding.UTF8.GetString(resultBytes);
                    }
                }
            }
        }
    }
}

在项目根目录下新建 Consul 目录,在 Consul 目录下新建 Entity 目录,在 Entity 目录下新建 HealthCheck.cs 类,用于接收 Consul Api发现的信息实体,代码如下

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.Consul.Entity
{
    public class HealthCheck
    {
        public string Node { get; set; }
        public string CheckID { get; set; }
        public string Name { get; set; }
        public string Status { get; set; }
        public string Notes { get; set; }
        public string Output { get; set; }
        public string ServiceID { get; set; }
        public string ServiceName { get; set; }
        public string[] ServiceTags { get; set; }
        public dynamic Definition { get; set; }
        public int CreateIndex { get; set; }
        public int ModifyIndex { get; set; }
    }
}

 在 Consul 目录下新建 IAppFind.cs 接口,定义 FindConsul() 用于 Consul 服务发现基方法,代码如下

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.Consul
{
    public interface IAppFind
    {
        IEnumerable<string> FindConsul(string ServiceName);
    }
}

 在 Consul 目录下新建 AppFind.cs 类,用于实现 IAppFind.cs 接口,实现 Consul 服务发现方法,代码如下

using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Snai.GrpcClient.Consul.Entity;
using Snai.GrpcClient.Framework.Entity;
using Snai.GrpcClient.Utils;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Snai.GrpcClient.Consul
{
    /*
     * 服务发现
     * (服务和健康信息)http://localhost:8500/v1/health/service/GrpcService
     * (健康信息)http://localhost:8500/v1/health/checks/GrpcService
     */
    public class AppFind: IAppFind
    {
        static IOptions<GrpcServiceSettings> GrpcSettings;
        static IOptions<ConsulService> ConsulSettings;

        public AppFind(IOptions<GrpcServiceSettings> grpcSettings, IOptions<ConsulService> consulSettings)
        {
            GrpcSettings = grpcSettings;
            ConsulSettings = consulSettings;
        }
        
        public IEnumerable<string> FindConsul(string ServiceName)
        {
            Dictionary<string, string> headers = new Dictionary<string, string>();

            var consul = ConsulSettings.Value;
            string findUrl = $"http://{consul.IP}:{consul.Port}/v1/health/checks/{ServiceName}";

            string findResult = HttpHelper.HttpGet(findUrl, headers, 5);
            if (findResult.Equals(""))
            {
                var grpcServices = GrpcSettings.Value.GrpcServices;
                return grpcServices.Where(g=>g.ServiceName.Equals(ServiceName,StringComparison.CurrentCultureIgnoreCase)).Select(s => s.ServiceID);
            }

            var findCheck = JsonConvert.DeserializeObject<List<HealthCheck>>(findResult);

            return findCheck.Where(g => g.Status.Equals("passing", StringComparison.CurrentCultureIgnoreCase)).Select(g => g.ServiceID);
        }
    }
}

 在项目根目录下新建 LoadBalance 目录,在 LoadBalance 目录下新建 ILoadBalance.cs 接口,定义 GetGrpcService() 用于负载均衡基方法,代码如下

using Snai.GrpcClient.Framework.Entity;
using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.LoadBalance
{
    /*
     * 负载均衡接口
     */
    public interface ILoadBalance
    {
        string GetGrpcService(string ServiceName);
    }
}

  LoadBalance 目录下新建 WeightRoundBalance.cs 类,用于实现 ILoadBalance.cs 接口,实现 GetGrpcService() 负载均衡方法,本次负载均衡实现权重轮询算法,代码如下

using Snai.GrpcClient.Consul;
using Snai.GrpcClient.Utils;
using System;
using System.Collections.Generic;
using System.Text;
using System.Linq;
using Snai.GrpcClient.Framework.Entity;
using Microsoft.Extensions.Options;

namespace Snai.GrpcClient.LoadBalance
{
    /*
     * 权重轮询
     */
    public class WeightRoundBalance : ILoadBalance
    {
        int Balance;
        IOptions<GrpcServiceSettings> GrpcSettings;
        IAppFind AppFind;

        public WeightRoundBalance(IOptions<GrpcServiceSettings> grpcSettings, IAppFind appFind)
        {
            Balance = 0;
            GrpcSettings = grpcSettings;
            AppFind = appFind;
        }

        public string GetGrpcService(string ServiceName)
        {
            var grpcServices = GrpcSettings.Value.GrpcServices;

            var healthServiceID = AppFind.FindConsul(ServiceName);

            if (grpcServices == null || grpcServices.Count() == 0 || healthServiceID == null || healthServiceID.Count() == 0)
            {
                return "";
            }

            //健康的服务
            var healthServices = new List<Framework.Entity.GrpcService>();

            foreach (var service in grpcServices)
            {
                foreach (var health in healthServiceID)
                {
                    if (service.ServiceID.Equals(health, StringComparison.CurrentCultureIgnoreCase))
                    {
                        healthServices.Add(service);
                        break;
                    }
                }
            }

            if (healthServices == null || healthServices.Count() == 0)
            {
                return "";
            }

            //加权轮询
            var services = new List<string>();

            foreach (var service in healthServices)
            {
                services.AddRange(Enumerable.Repeat(service.IP + ":" + service.Port, service.Weight));
            }
            
            var servicesArray = services.ToArray();

            Balance = Balance % servicesArray.Length;
            var grpcUrl = servicesArray[Balance];
            Balance = Balance + 1;

            return grpcUrl;
        }
    }
}

在项目根目录下新建 RpcClient 目录,在 RpcClient 目录下新建 IMsgClient.cs 接口,定义 GetSum() 用于Grpc客户端调用基方法,代码如下

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.RpcClient
{
    public interface IMsgClient
    {
        void GetSum(int num1, int num2);
    }
}

  RpcClient 目录下新建 MsgClient.cs 类,用于实现 IMsgClient.cs 接口,实现 GetSum() 方法用于Grpc客户端调用,代码如下

using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.LoadBalance;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.RpcClient
{
    public class MsgClient: IMsgClient
    {
        ILoadBalance LoadBalance;
        Channel GrpcChannel;
        MsgService.MsgServiceClient GrpcClient;

        public MsgClient(ILoadBalance loadBalance)
        {
            LoadBalance = loadBalance;

            var grpcUrl = LoadBalance.GetGrpcService("GrpcService");

            if (!grpcUrl.Equals(""))
            {
                Console.WriteLine($"Grpc Service:{grpcUrl}");

                GrpcChannel = new Channel(grpcUrl, ChannelCredentials.Insecure);
                GrpcClient = new MsgService.MsgServiceClient(GrpcChannel);
            }
        }

        public void GetSum(int num1, int num2)
        {
            if (GrpcClient != null)
            {
                GetMsgSumReply msgSum = GrpcClient.GetSum(new GetMsgNumRequest
                {
                    Num1 = num1,
                    Num2 = num2
                });

                Console.WriteLine("Grpc Client Call GetSum():" + msgSum.Sum);
            }
            else
            {
                Console.WriteLine("所有负载都挂掉了!");
            }
        }
    }
}

在项目根目录下新建 Framework 目录,在 Framework 目录下新建 Entity 目录,在 Entity 目录下新建 ConsulService.cs 和 GrpcServiceSettings.cs 类,分别对应配置appsettings.json的 ConsulService,GrpcServiceSettings 两个配置项,代码如下

ConsulService.cs

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.Framework.Entity
{
    public class ConsulService
    {
        public string IP { get; set; }
        public int Port { get; set; }
    }
}

 GrpcServiceSettings.cs

using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient.Framework.Entity
{
    public class GrpcServiceSettings
    {
        public List<GrpcService> GrpcServices { get; set; }
    }

    public class GrpcService
    {
        public string ServiceName { get; set; }
        public string ServiceID { get; set; }
        public string IP { get; set; }
        public int Port { get; set; }
        public int Weight { get; set; }
    }
}

Framework 目录下新建 DependencyInitialize.cs 类,定义 AddImplement() 方法用于注册全局配置和类到容器,实现依赖注入,代码如下

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.Consul;
using Snai.GrpcClient.Framework.Entity;
using Snai.GrpcClient.LoadBalance;
using Snai.GrpcClient.RpcClient;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

namespace Snai.GrpcClient.Framework
{
    /*
     *  IServiceCollection 依赖注入生命周期
     *  AddTransient 每次都是全新的
     *  AddScoped    在一个范围之内只有同一个实例(同一个线程,同一个浏览器请求只有一个实例)
     *  AddSingleton 单例
     */
    public static class DependencyInitialize
    {
        /// <summary>
        /// 注册对象
        /// </summary>
        /// <param name="services">The services.</param>
        /*
         * IAppFind AppFind;
         * 构造函数注入使用 IAppFind appFind
         * AppFind = appFind;
         */
        public static void AddImplement(this IServiceCollection services)
        {
            //添加 json 文件路径
            var builder = new ConfigurationBuilder().SetBasePath(Directory.GetCurrentDirectory()).AddJsonFile("appsettings.json");
            //创建配置根对象
            var configurationRoot = builder.Build();

            //注册全局配置
            services.AddConfigImplement(configurationRoot);

            //注册服务发现
            services.AddScoped<IAppFind, AppFind>();

            //注册负载均衡
            if (configurationRoot["LoadBalancer"].Equals("WeightRound", StringComparison.CurrentCultureIgnoreCase))
            {
                services.AddSingleton<ILoadBalance, WeightRoundBalance>();
            }

            //注册Rpc客户端
            services.AddTransient<IMsgClient, MsgClient>();
        }

        /// <summary>
        /// 注册全局配置
        /// </summary>
        /// <param name="services">The services.</param>
        /// <param name="configurationRoot">The configurationRoot.</param>
        /*  
         *  IOptions<GrpcServiceSettings> GrpcSettings;
         *  构造函数注入使用 IOptions<GrpcServiceSettings> grpcSettings
         *  GrpcSettings = grpcSettings;
         */
        public static void AddConfigImplement(this IServiceCollection services, IConfigurationRoot configurationRoot)
        {
            //注册配置对象
            services.AddOptions();
            services.Configure<GrpcServiceSettings>(configurationRoot.GetSection(nameof(GrpcServiceSettings)));
            services.Configure<ConsulService>(configurationRoot.GetSection(nameof(ConsulService)));
        }
    }
}

 在根目录下新建 appsettings.json 配置文件,配置 GrpcServiceSettings 的 GrpcServices 为服务端发布的两个服务5021和5022,LoadBalancer 负载均衡为 WeightRound 权重轮询(如实现其他负载方法可做相应配置,注册负载均衡时也做相应修改),ConsulService Consul的IP和端口,代码如下

{
  "GrpcServiceSettings": {
    "GrpcServices": [
      {
        "ServiceName": "GrpcService",
        "ServiceID": "GrpcService_5021",
        "IP": "localhost",
        "Port": "5031",
        "Weight": "2"
      },
      {
        "ServiceName": "GrpcService",
        "ServiceID": "GrpcService_5022",
        "IP": "localhost",
        "Port": "5032",
        "Weight": "1"
      }
    ]
  },
  "LoadBalancer": "WeightRound",
  "ConsulService": {
    "IP": "localhost",
    "Port": "8500"
  }
}

GrpcServices Grpc服务列表

  ServiceName:服务名称,负载同一服务名称相同

  ServiceID:服务ID,保持唯一

  IP:服务IP

  Port:端口

  Weight:服务权重

 修改 Program.cs 的 Main() 方法,调用 AddImplement(),注册全局配置和类到容器,注入使用 MsgClient 类的 GetSum() 方法,实现 Grpc 调用,代码如下

using Microsoft.Extensions.DependencyInjection;
using Snai.GrpcClient.Framework;
using Snai.GrpcClient.RpcClient;
using System;

namespace Snai.GrpcClient
{
    class Program
    {
        static void Main(string[] args)
        {
            IServiceCollection service = new ServiceCollection();

            //注册对象
            service.AddImplement();

            //注入使用对象
            var provider = service.BuildServiceProvider();

            string exeArg = string.Empty;
            Console.WriteLine("Grpc调用!");
            Console.WriteLine("-c\t调用Grpc服务;");
            Console.WriteLine("-q\t退出服务;");

            while (true)
            {
                exeArg = Console.ReadKey().KeyChar.ToString();
                Console.WriteLine();

                if (exeArg.ToLower().Equals("c", StringComparison.CurrentCultureIgnoreCase))
                {
                    //调用服务
                    var rpcClient = provider.GetService<IMsgClient>();
                    rpcClient.GetSum(10, 2);
                }
                else if (exeArg.ToLower().Equals("q", StringComparison.CurrentCultureIgnoreCase))
                {
                    break;
                }
                else
                {
                    Console.WriteLine("参数异常!");
                }
            }
        }
    }
}

右击项目生成,最终项目结构如下:

 

到此客户端的代码实现已完成,下面运行测试 Grpc+Consul 服务注册、服务发现和负载均衡。

四、运行测试 Grpc+Consul 服务注册、服务发现和负载均衡

 双击 startup.bat 启动 Consul,再启动服务5021和5022,启动成功打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看服务情况

启动 Snai.GrpcClient 客户端

输入 c 调用Grpc服务,调用3次,5031调用2次,5032调用1次,成功实现负载均衡

关掉服务5022,等10秒左右(因为设置健康检查时间间隔10秒),再输入 c 调用Grpc服务,只调用5031

打开 http://localhost:8500/ui/#/grpc-consul/services/GrpcService 查看,5022 状态失败,或消失

Grpc+Consul实现服务注册、服务发现、健康检查和负载均衡已完成

Github源码地址:https://github.com/Liu-Alan/Grpc-Consul

博客地址:https://www.snaill.net/post/2

posted @ 2018-08-19 07:57  蜗牛丨  阅读(10390)  评论(15编辑  收藏  举报