写点什么

微服务实践之分布式定时任务

用户头像
Kevin Wan
关注
发布于: 2021 年 03 月 07 日

作者:Mikael


承接上篇:上篇文章讲到改造 go-zero 生成的 app module 中的 gateway & RPC 。本篇讲讲如何接入 异步任务 以及 log 的使用。

Delay Job


日常任务开放中,我们会有很多异步、批量、定时、延迟任务要处理,go-zero 中有 go-queue,推荐使用 go-queue 去处理,go-queue 本身也是基于 go-zero 开发的,其本身是有两种模式:


  • dq : 依赖于 beanstalkd ,分布式,可存储,延迟、定时设置,关机重启可以重新执行,消息会丢失,使用非常简单,go-queue 中使用了 redis setnx 保证了每个消息只被消费一次,使用场景主要是用来做日常任务使用

  • kq:依赖于 kafka ,这个就不多介绍啦,大名鼎鼎的 kafka ,使用场景主要是做日志用


我们主要说一下 dq,kq 使用也一样的,只是依赖底层不同,如果没使用过 beanstalkd,没接触过 beanstalkd 的可以先 google 一下,使用起来还是挺容易的。


我在 jobs 下使用 goctl 新建了一个 message-job.api 服务


info(	title: //消息任务	desc: // 消息任务	author: "Mikael"	email: "13247629622@163.com")
type BatchSendMessageReq {}
type BatchSendMessageResp {}
service message-job-api { @handler batchSendMessageHandler // 批量发送短信 post batchSendMessage(BatchSendMessageReq) returns(BatchSendMessageResp)}
复制代码


因为不需要使用路由,所以 handler 下的 routes.go 被我删除了,在 handler 下新建了一个 jobRun.go,内容如下:


package handler
import ( "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/threading")

/*** @Description 启动job* @Author Mikael* @Date 2021/1/18 12:05* @Version 1.0**/
func JobRun(serverCtx *svc.ServiceContext) { threading.GoSafe(func() { batchSendMessageHandler(serverCtx) //...many job })}
复制代码


其实 threading.GoSafe 就是 go batchSendMessageHandler(serverCtx) ,封装了一下 goroutine,防止 goroutine panic


然后修改一下启动文件 message-job.go


package main
import ( "flag" "fmt"
"fishtwo/app/jobs/message/internal/config" "fishtwo/app/jobs/message/internal/handler" "fishtwo/app/jobs/message/internal/svc"
"github.com/tal-tech/go-zero/core/conf" "github.com/tal-tech/go-zero/rest")
var configFile = flag.String("f", "etc/message-job-api.yaml", "the config file")
func main() { flag.Parse()
var c config.Config conf.MustLoad(*configFile, &c)
ctx := svc.NewServiceContext(c) server := rest.MustNewServer(c.RestConf) defer server.Stop()
handler.JobRun(ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port) server.Start()}
复制代码


主要是 handler.RegisterHandlers(server, ctx) 修改为 handler.JobRun(ctx)


接下来,我们就可以引入 dq 了,首先在 etc/xxx.yaml 下添加 dqConf


.....
DqConf: Beanstalks: - Endpoint: 127.0.0.1:7771 Tube: tube1 - Endpoint: 127.0.0.1:7772 Tube: tube2 Redis: Host: 127.0.0.1:6379 Type: node
复制代码


我这里本地用不同端口,模拟开了 2 个节点,7771、7772


在 internal/config/config.go 添加配置解析对象


type Config struct {	....	DqConf dq.DqConf}
复制代码


修改 handler/batchsendmessagehandler.go


package handler
import ( "context" "fishtwo/app/jobs/message/internal/logic" "fishtwo/app/jobs/message/internal/svc" "github.com/tal-tech/go-zero/core/logx")
func batchSendMessageHandler(ctx *svc.ServiceContext){ rootCxt:= context.Background() l := logic.NewBatchSendMessageLogic(context.Background(), ctx) err := l.BatchSendMessage() if err != nil{ logx.WithContext(rootCxt).Error("【JOB-ERR】 : %+v ",err) }}
复制代码


修改 logic 下 batchsendmessagelogic.go,写我们的 consumer 消费逻辑


package logic
import ( "context" "fishtwo/app/jobs/message/internal/svc" "fmt" "github.com/tal-tech/go-zero/core/logx")
type BatchSendMessageLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext}
func NewBatchSendMessageLogic(ctx context.Context, svcCtx *svc.ServiceContext) BatchSendMessageLogic { return BatchSendMessageLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, }}

func (l *BatchSendMessageLogic) BatchSendMessage() error { fmt.Println("job BatchSendMessage start")
l.svcCtx.Consumer.Consume(func(body []byte) { fmt.Printf("job BatchSendMessage %s \n" + string(body)) })
fmt.Printf("job BatchSendMessage finish \n") return nil}
复制代码


这样就大功告成了,启动 message-job.go 就 ok 课


go run message-job.go
复制代码


之后我们就可以在业务代码中向 dq 添加任务,它就可以自动消费了


producer.Delay 向 dq 中投递 5 个延迟任务:


	producer := dq.NewProducer([]dq.Beanstalk{		{			Endpoint: "localhost:7771",			Tube:     "tube1",		},		{			Endpoint: "localhost:7772",			Tube:     "tube2",		},	})
for i := 1000; i < 1005; i++ { _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1) if err != nil { fmt.Println(err) } }
复制代码


producer.At 可以指定某个时间执行,非常好用,感兴趣的朋友自己可以研究下。


错误日志


在前面说到 gateway 改造时候,如果眼神好的童鞋,在上面的 httpresult.go 中已经看到了 log 的身影:



我们在来看下 rpc 中怎么处理的



是的,我在每个 rpc 启动的 main 中加入了 grpc 拦截器 https://www.yuque.com/tal-tech/go-zero/ttzlo1,那让我们看看 grpc 拦截器里面做了什么



然后我代码里面使用 github/pkg/errors 这个包去处理错误的,这个包还是很好用的




所以呢:


我们在 grpc 中打印日志 logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v",err)


api 中打印日志 logx.WithContext(r.Context()).Error("【GATEWAY-SRV-ERR】 : %+v ",err)


go-zero 中打印日志,使用 logx.WithContext 会把 trace-id 带入,这样一个请求下来,比如


user-api --> user-srv --> message-srv


那如果 messsage-srv 出错,他们三个是同一个 trace-id ,是不是就可以在 elk 通过输入这个 trace-id 一次性搜索出来这条请求报错堆栈信息呢?当然你也可以接入 jaeger、zipkin、skywalking 等,这个我暂时还没接入。

框架地址

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持我们!👍


发布于: 2021 年 03 月 07 日阅读数: 1097
用户头像

Kevin Wan

关注

保持简单 2017.10.24 加入

go-zero作者

评论

发布
暂无评论
微服务实践之分布式定时任务