关于 RocketMQ 事务方面 Demo
作者:友
- 2021 年 12 月 07 日
本文字数:1168 字
阅读完需:约 4 分钟
试想的场景是,订单发生半消息给 broker,但是由于创建订单成功之后就宕机了导致没有二次提交,所以 broker 可以使用回查机制判断是否需要提交。
package main
import (
"context"
"fmt"
"os"
"strconv"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type OrderListener struct {
// localTrans *sync.Map
// transactionIndex int32
}
func NewOrderListener() *OrderListener {
return &OrderListener{
// localTrans: new(sync.Map),
}
}
//执行事务函数,需要返回消息状态,发生half消息后自动调用
func (dl *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
fmt.Println("执行本地事务 :订单创建中....")
fmt.Println("执行本地事务 :订单创建过程中宕机,返回的是unknow")
return primitive.UnknowState
}
//回查函数
func (dl *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("购物车开始回查事务,订单已经创建成功,执行二阶段提交")
state := primitive.CommitMessageState
switch state {
case 1:
fmt.Printf(" : %v\n", msg)
return primitive.CommitMessageState
case 2:
fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg)
return primitive.RollbackMessageState
case 3:
fmt.Printf("checkLocalTransaction unknow: %v\n", msg)
return primitive.UnknowState
default:
fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg)
return primitive.CommitMessageState
}
}
func main() {
p, _ := rocketmq.NewTransactionProducer(
NewOrderListener(),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
os.Exit(1)
}
for i := 0; i < 5; i++ {
res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("Order",
[]byte("创建订单 : "+strconv.Itoa(i))))
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Println("发送状态:", res.Status)
fmt.Println("消息id:", res.MsgID)
fmt.Printf("send message success: result=%s\n", res.String())
}
}
//sleep 暂时不关闭 等待回查
time.Sleep(5 * time.Minute)
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
}
复制代码
划线
评论
复制
发布于: 4 小时前阅读数: 5
版权声明: 本文为 InfoQ 作者【友】的原创文章。
原文链接:【http://xie.infoq.cn/article/04c4545691dbd19b36b46770a】。文章转载请联系作者。
友
关注
爱技术,记录学习过程 分享分享生活 2021.03.24 加入
喜欢数据结构与操作系统,努力学习中。 目前自学go 希望以后能成为一个合格的go程序员。
评论