写点什么

关于 RocketMQ 事务方面 Demo

作者:
  • 2021 年 12 月 07 日
  • 本文字数:1168 字

    阅读完需:约 4 分钟

关于RocketMQ事务方面Demo

试想的场景是,订单发生半消息给 broker,但是由于创建订单成功之后就宕机了导致没有二次提交,所以 broker 可以使用回查机制判断是否需要提交。

package main
import ( "context" "fmt" "os" "strconv" "time"
"github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer")
type OrderListener struct { // localTrans *sync.Map // transactionIndex int32}
func NewOrderListener() *OrderListener { return &OrderListener{ // localTrans: new(sync.Map), }}
//执行事务函数,需要返回消息状态,发生half消息后自动调用func (dl *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState { fmt.Println("执行本地事务 :订单创建中....") fmt.Println("执行本地事务 :订单创建过程中宕机,返回的是unknow") return primitive.UnknowState
}
//回查函数
func (dl *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState { fmt.Println("购物车开始回查事务,订单已经创建成功,执行二阶段提交") state := primitive.CommitMessageState switch state { case 1: fmt.Printf(" : %v\n", msg) return primitive.CommitMessageState case 2: fmt.Printf("checkLocalTransaction ROLLBACK_MESSAGE: %v\n", msg) return primitive.RollbackMessageState case 3: fmt.Printf("checkLocalTransaction unknow: %v\n", msg) return primitive.UnknowState default: fmt.Printf("checkLocalTransaction default COMMIT_MESSAGE: %v\n", msg) return primitive.CommitMessageState }
}
func main() { p, _ := rocketmq.NewTransactionProducer( NewOrderListener(), producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), )
err := p.Start() if err != nil { fmt.Printf("start producer error: %s\n", err.Error()) os.Exit(1) }
for i := 0; i < 5; i++ { res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("Order", []byte("创建订单 : "+strconv.Itoa(i)))) if err != nil { fmt.Printf("send message error: %s\n", err) } else { fmt.Println("发送状态:", res.Status) fmt.Println("消息id:", res.MsgID) fmt.Printf("send message success: result=%s\n", res.String()) } }
//sleep 暂时不关闭 等待回查 time.Sleep(5 * time.Minute) err = p.Shutdown() if err != nil { fmt.Printf("shutdown producer error: %s", err.Error()) }}
复制代码


发布于: 4 小时前阅读数: 5
用户头像

关注

爱技术,记录学习过程 分享分享生活 2021.03.24 加入

喜欢数据结构与操作系统,努力学习中。 目前自学go 希望以后能成为一个合格的go程序员。

评论

发布
暂无评论
关于RocketMQ事务方面Demo