前言
这是搬运的自己 2 年前写的内容了,由于最近实在是有点忙,就一直没有写新的内容。
目前.net 生态里好像还没有像 spring cloud 那种云原生开发框架的集大成者出现(可能我见识短浅),所以,我们在搭建微服务架构时需要的开发组件,得自己安装。首先,我们要有一个服务注册中心,可能目前比较常用的是 consul 和 zookeeper,他们都支持集群部署,对容器的支持也很好,这里我用的是 consul。
环境搭建
1.安装 consul
先到 consul 的官网https://www.consul.io/去下载对应开发环境的软件,然后开发环境下启动 consul
看到图片中标红的提示,说明 consul 已经正常启动了,consul 启动后,会有一个占用 8500 端口的 http 服务随之启动,可以打开浏览器http://127.0.0.1:8500/,查看 consul 的配置界面。
2.创建项目,接入 consul
创建一个.net core mvc 项目,使其注册到 consul 中心我们新建一个控制器,当做 consul 的心跳检查机制(具体可查看 consul 文档)的访问接口,
[Route("api/[controller]")]
[ApiController]
public class HealthController : ControllerBase
{
[HttpGet]
public IActionResult Get()
{
return Ok("ok");
}
}
复制代码
然后在项目里添加 consul 引用,可在包管理控制台直接输入”install-package consul”,也可以在 nuget 包管理器搜索 consul 安装修改一下 program.cs
public static IHostBuilder CreateHostBuilder(string[] args)
{
var config = new ConfigurationBuilder()
.AddCommandLine(args)
.Build();
String ip = string.IsNullOrEmpty(config["ip"]) ? "127.0.0.1" : config["ip"];
String port = config["port"];
RestTools.StaticParams.service_id = Guid.NewGuid().ToString();//服务编号,提前生成的目的是作为key,标记ip和端口存到redis
if (port == "0" || string.IsNullOrEmpty(port))
{
port = RestTools.Tools.GetRandAvailablePort().ToString();//该方法是随机寻找一个没有被占用的端口,也可以自己指定一个
}
redis.StringSet(RestTools.StaticParams.service_id, ip + ":" + port);
return Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>()
.UseUrls($"http://{ip}:{port}");
});
}
private static void ConfigurationOverview(ConsulClientConfiguration obj)
{
obj.Address = new Uri("http://127.0.0.1:8500");
obj.Datacenter = "datacenter";
}
复制代码
然后修改 startup.cs 文件里 Configure 方法,注册相关服务
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
string address = redis.StringGet(RestTools.StaticParams.service_id);
string ip = address.Split(':')[0];
int port = Convert.ToInt32(address.Split(':')[1]);
ConsulClient client = new ConsulClient(ConfigurationOverview);
Task<WriteResult> result = client.Agent.ServiceRegister(new AgentServiceRegistration()
{
ID = "MsgService" + RestTools.StaticParams.service_id,//服务编号,不能重复,用Guid最简单
Name = "MsgService",//服务的名字
Address = ip,//我的ip地址(可以被其他应用访问的地址,本地测试可以用127.0.0.1,机房环境中一定要写自己的内网ip地址)
Port = port,//我的端口
Check = new AgentServiceCheck()
{
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(5),//服务停止多久后反注册
Interval = TimeSpan.FromSeconds(10),//健康检查时间间隔,或者称为心跳间隔
HTTP = $"http://{ip}:{port}/api/health",//健康检查地址,
Timeout = TimeSpan.FromSeconds(5)
}
});
}
复制代码
然后,注意不要使用 iis 或者 iisexpress,以控制台的方式启动项目(kestrel),
然后我们打开刚才的 consul 管理界面,可以看到我们刚注册的服务已经出现了
熔断降级
在控制台程序里,首先引用 polly 和 AspectCore,AspectCore 是一个 AOP 框架,我们可以在项目里引用它,方便的在方法中实现全局拦截,有点像写 webapi 时用到的请求拦截,目的是使系统保持原有清晰的业务逻辑,降低耦合性,提高代码重用性。首先创建一个拦截类。
/// <summary>
/// 熔断框架
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
public class HystrixCommandAttribute : AbstractInterceptorAttribute
{
#region 属性
/// <summary>
/// 最多重试几次,如果为0则不重试
/// </summary>
public int MaxRetryTimes { get; set; } = 0;
/// <summary>
/// 重试间隔的毫秒数
/// </summary>
public int RetryIntervalMilliseconds { get; set; } = 100;
/// <summary>
/// 是否启用熔断
/// </summary>
public bool EnableCircuitBreater { get; set; } = false;
/// <summary>
/// 熔断前出现允许错误几次
/// </summary>
public int ExceptionAllowedBeforeBreaking { get; set; } = 3;
/// <summary>
/// 熔断多长时间(毫秒 )
/// </summary>
public int MillisecondsOfBreak { get; set; } = 1000;
/// <summary>
/// 执行超过多少毫秒则认为超时(0表示不检测超时)
/// </summary>
public int TimeOutMilliseconds { get; set; } = 0;
/// <summary>
/// 缓存多少毫秒(0表示不缓存),用“类名+方法名+所有参数ToString拼接”做缓存Key
/// </summary>
public int CacheTTLMilliseconds { get; set; } = 0;
private IAsyncPolicy policy;
//缓存
private static readonly Microsoft.Extensions.Caching.Memory.IMemoryCache memoryCache = new Microsoft.Extensions.Caching.Memory.MemoryCache(new Microsoft.Extensions.Caching.Memory.MemoryCacheOptions());
/// <summary>
/// 降级方法名
/// </summary>
public string FallBackMethod { get; set; }
#endregion
#region 构造函数
/// <summary>
/// 熔断框架
/// </summary>
/// <param name="fallBackMethod">降级方法名</param>
public HystrixCommandAttribute(string fallBackMethod)
{
this.FallBackMethod = fallBackMethod;
}
#endregion
public override async Task Invoke(AspectContext context, AspectDelegate next)
{
//一个HystrixCommand中保持一个policy对象即可
//其实主要是CircuitBreaker要求对于同一段代码要共享一个policy对象
//根据反射原理,同一个方法就对应一个HystrixCommandAttribute,无论几次调用,
//而不同方法对应不同的HystrixCommandAttribute对象,天然的一个policy对象共享
//因为同一个方法共享一个policy,因此这个CircuitBreaker是针对所有请求的。
//Attribute也不会在运行时再去改变属性的值,共享同一个policy对象也没问题
lock (this)
{
if (policy == null)
{
policy = Policy.Handle<Exception>()
.FallbackAsync(async (ctx, t) => await Task.Run(() =>
{
AspectContext aspectContext = (AspectContext)ctx["aspectContext"];
var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallBackMethod);
Object fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
//不能如下这样,因为这是闭包相关,如果这样写第二次调用Invoke的时候context指向的
//还是第一次的对象,所以要通过Polly的上下文来传递AspectContext
//context.ReturnValue = fallBackResult;
aspectContext.ReturnValue = fallBackResult;
}), async (ex, t) => await Task.Run(() => { }));
if (MaxRetryTimes > 0)//重试
{
policy = policy.WrapAsync(Policy.Handle<Exception>().WaitAndRetryAsync(MaxRetryTimes, i => TimeSpan.FromMilliseconds(RetryIntervalMilliseconds)));
}
if (EnableCircuitBreater)//熔断
{
policy = policy.WrapAsync(Policy.Handle<Exception>().CircuitBreakerAsync(ExceptionAllowedBeforeBreaking, TimeSpan.FromMilliseconds(MillisecondsOfBreak)));
}
if (TimeOutMilliseconds > 0)//超时
{
policy = policy.WrapAsync(Policy.TimeoutAsync(() => TimeSpan.FromMilliseconds(TimeOutMilliseconds), Polly.Timeout.TimeoutStrategy.Pessimistic));
}
}
}
//把本地调用的AspectContext传递给Polly,主要给FallBackMethod中使用,避免闭包的坑
Context pollyCtx = new Context();
pollyCtx["aspectContext"] = context;
if (CacheTTLMilliseconds > 0)
{
//用类名+方法名+参数的下划线连接起来作为缓存key
string cacheKey = "HystrixMethodCacheManager_Key_" + context.ServiceMethod.DeclaringType + "." + context.ServiceMethod + string.Join("_", context.Parameters);
//尝试去缓存中获取。如果找到了,则直接用缓存中的值做返回值
if (memoryCache.TryGetValue(cacheKey, out var cacheValue))
{
context.ReturnValue = cacheValue;
}
else
{
//如果缓存中没有,则执行实际被拦截的方法
await policy.ExecuteAsync(ctx => next(context), pollyCtx);
//存入缓存中
using (var cacheEntry = memoryCache.CreateEntry(cacheKey))
{
cacheEntry.Value = context.ReturnValue;
cacheEntry.AbsoluteExpiration = DateTime.Now + TimeSpan.FromMilliseconds(CacheTTLMilliseconds);
}
}
}
else//如果没有启用缓存,就直接执行业务方法
{
await policy.ExecuteAsync(ctx => next(context), pollyCtx);
}
}
}
复制代码
然后我们编写一个 Person 类(aspect 文档里的例子,稍加改造)
public class Person
{
[HystrixCommand(nameof(HelloFallbackAsync), MaxRetryTimes = 3, EnableCircuitBreater = true)]
public virtual async Task<string> HelloAsync(string name)
{
Console.WriteLine("Hello," + name);
#region 抛错
String s = null;
s.ToString();
#endregion
return "ok" + name;
}
[HystrixCommand(nameof(Hello2FallbackAsync))]
public virtual async Task<string> HelloFallbackAsync(string name)
{
Console.WriteLine("降级等级1," + name);
string s = null;
s.ToString();
return "fail_1";
}
public virtual async Task<string> Hello2FallbackAsync(string name)
{
Console.WriteLine("降级等级2," + name);
return "fail_2";
}
[HystrixCommand(nameof(FallAdd))]
public virtual int Add(int i, int j)
{
//string s = null;
//s.ToString();
return i + j;
}
public virtual int FallAdd(int i, int j)
{
return 0;
}
[HystrixCommand(nameof(TestFallBack), CacheTTLMilliseconds = 3000)]
public virtual void Test(int i)
{
Console.WriteLine("Test," + i);
}
public virtual void TestFallBack(int i)
{
Console.WriteLine("Test" + i);
}
}
复制代码
然后在 program.cs 主函数里编写
static void Main(string[] args)
{
ProxyGeneratorBuilder proxyGeneratorBuilder = new ProxyGeneratorBuilder();
using (IProxyGenerator proxyGenerator = proxyGeneratorBuilder.Build())
{
Person p = proxyGenerator.CreateClassProxy<Person>();
Console.WriteLine(p.HelloAsync("Hello World").Result);
Console.WriteLine(p.Add(1, 2));
while (true)
{
Console.WriteLine(p.HelloAsync("Hello World").Result);
Thread.Sleep(100);
}
}
}
复制代码
执行看下效果
在分享一张熔断器的设计图(来自知乎大神)
注册服务
我们用 Ocelot 来做个测试网关,集成前面在 consul 注册的服务。我们新建一个网站项目,然后分别引用 Ocelot 和 Ocelot.Provider.Consul 两个包,然后新建 configuration.json 文件(这个文件里的具体参数解释,Ocelot 的文档里都有,我在注释里简单介绍下)
{
"ReRoutes": [
{
"DownstreamPathTemplate": "/api/sms/{url}",//下游请求链接,也就是转发的地址
"DownstreamScheme": "http",
"DownstreamHostAndPorts": [
{
"Host": "10.185.1.41", //服务的地址
"Port": 39688 //服务的端口
}
],
"UpstreamPathTemplate": "/sms/{url}",//上游地址,也就是通过网关直接访问的地址
"UpstreamHttpMethod": [ "Get", "Post" ],//识别的请求方式
"ServiceName": "MsgService",//在consul里注册的服务名称
"LoadBalancerOptions": {
"Type": "RoundRobin"
},
"UseServiceDiscovery": true
},
{
"DownstreamPathTemplate": "/api/email/{url}",
"DownstreamScheme": "http",
"DownstreamHostAndPorts": [
{
"Host": "localhost",
"Port": 29216
}
],
"UpstreamPathTemplate": "/email/{url}",
"UpstreamHttpMethod": [ "Get", "Post" ],
"ServiceName": "EmailService",
"LoadBalancerOptions": {
"Type": "RoundRobin"
},
"UseServiceDiscovery": true
}
],
"GlobalConfiguration": {
"ServiceDiscoveryProvider": {
"Host": "127.0.0.1",
"Port": 8500
}
}
}
复制代码
然后我们在 program.cs 里修改 CreateHostBuilder
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
}).ConfigureAppConfiguration(conf =>
{
conf.AddJsonFile("configuration.json", optional: false, reloadOnChange: true);
});
复制代码
接着在 startup.cs 里引入 using Ocelot.DependencyInjection;using Ocelot.Middleware;using Ocelot.Provider.Consul;三个命名空间,然后修改 ConfigureServices 和 Configuration
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddOcelot()
.AddConsul();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseAuthorization();
app.UseOcelot().Wait();//不要忘了写Wait
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
复制代码
然后启动项目,因为我们之前注册了 MsgService 服务,同样的方式在注册一个 EmailServices 服务注册完成后,在 consul 配置节目看到如下结果
接着我们可以试一下 Ocelot 为我们提供的网关效果,比如,我在浏览器地址栏输入http://localhost:5000/sms/send?msg=我是转发来的
效果和直接请求服务http://10.185.1.41:39688/api/sms/send?msg=我是转发来的效果是一样的,但是 Ocelot 帮我们做好了转发,所以只需要根据在配置文件里输入的上游请求模板,即可得到从该服务处获取的数据再看下 MsgService 打印的数据结果
呼,就到这吧,真是书到用时方恨少,写不动了。。。还得再多看这些知识。
评论