写点什么

Golang 数据库事务实践

作者:俞凡
  • 2024-04-02
    上海
  • 本文字数:5834 字

    阅读完需:约 19 分钟

事务是很多业务的基础,本文介绍了如何在 Golang 里实现数据库事务操作,并以一个用户注册场景给出了完整实现。原文: Transactions in Go application



Go 是一种年轻而强大的语言,专为编写小型、简单的服务而创建。但随着时间推移,越来越多复杂应用和系统也在采用 Go 进行开发,这就出现了一些问题:如何处理事务?


为了深入探讨这个问题,我们假设一个简单的业务场景:用户注册


作为一个系统,我希望在注册时创建用户和个人资料


RDBMS/DBMS 的现代 Go 库不像 C# 和 Java 的 Hibernate、Entity Framework 那样强大,因此我们必须自己处理。为了实现用户注册业务场景,我们将创建并评估几种处理事务的方法。


由于每种事务处理方法都必须与 sql.DB 和 sql.Tx 配合使用,因此需要引入接口来封装对数据库的访问。


生成的应用有两个域实体和一个用于访问数据库的 DB 低级接口。


package model
type User struct { Email string}
type Profile struct { Name string}
复制代码


package transaction
type DB interface { QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)}
复制代码


准备工作完成后,就可以采用如下两种方法。

1. 事务感知上下文

工作原理:transaction.Manager启动事务并将其放入上下文。当存储库执行查询时,助手会检查上下文中是否有事务,并使用创建的事务来执行查询,或者如果上下文为空,则不使用事务来执行查询。


为了启动事务,我们需要实体:Manager


package transaction
type Manager interface { Run( ctx context.Context, callback func(ctx context.Context) error, ) error}
复制代码


transaction.Manager 实现


package transaction
import ( "context" "database/sql" "github.com/pkg/errors" "go.uber.org/multierr")
type txKey string
var ctxWithTx = txKey("tx")
type SQLTransactionManager struct { db *sql.DB}
func NewManager(db *sql.DB) *SQLTransactionManager { return &SQLTransactionManager{db: db}}
func (m *SQLTransactionManager) Run( ctx context.Context, callback func(ctx context.Context) error,) (rErr error) { tx, err := m.db.BeginTx(ctx, &sql.TxOptions{}) if err != nil { return errors.WithStack(err) }
defer func() { if rErr != nil { rErr = multierr.Combine(rErr, errors.WithStack(tx.Rollback())) } }()
defer func() { if rec := recover(); rec != nil { if e, ok := rec.(error); ok { rErr = e } else { rErr = errors.Errorf("%s", rec) } } }()
if err = callback(putTxToContext(ctx, tx)); err != nil { return err }
return errors.WithStack(tx.Commit())}
func ExtractTxFromContext(ctx context.Context) (*sql.Tx, bool) { tx := ctx.Value(ctxWithTx)
if t, ok := tx.(*sql.Tx); ok { return t, true }
return nil, false}
func putTxToContext(ctx context.Context, tx *sql.Tx) context.Context { return context.WithValue(ctx, ctxWithTx, tx)}
复制代码


DB 实现


package storage
import ( "brand/transaction/example1/transaction" "context" "database/sql")
type DB struct { db *sql.DB}
func NewDB(db *sql.DB) *DB { return &DB{db: db}}
func (d *DB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { tx, ok := transaction.ExtractTxFromContext(ctx) if !ok { return d.db.QueryRowContext(ctx, query, args...) }
return tx.QueryRowContext(ctx, query, args...)}
func (d *DB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { tx, ok := transaction.ExtractTxFromContext(ctx) if !ok { return d.db.QueryContext(ctx, query, args...) }
return tx.QueryContext(ctx, query, args...)}
func (d *DB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { tx, ok := transaction.ExtractTxFromContext(ctx) if !ok { return d.db.ExecContext(ctx, query, args...) }
return tx.ExecContext(ctx, query, args...)}
func (d *DB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { tx, ok := transaction.ExtractTxFromContext(ctx) if !ok { return d.db.PrepareContext(ctx, query) }
return tx.PrepareContext(ctx, query)}
复制代码


RegistrationService 负责用户注册业务场景


package service
import ( "brand/transaction/example1/model" "brand/transaction/example1/transaction" "context")
type UserRepository interface { Create(ctx context.Context, user *model.User) error}
type ProfileRepository interface { Create(ctx context.Context, user *model.Profile) error}
type RegistrationData struct { Email string Name string}
type RegistrationService struct { transactionManager transaction.Manager userRepository UserRepository profileRepository ProfileRepository}
func NewRegistrationService( transactionManager transaction.Manager, userRepository UserRepository, profileRepository ProfileRepository,) *RegistrationService { return &RegistrationService{ transactionManager: transactionManager, userRepository: userRepository, profileRepository: profileRepository, }}
func (s *RegistrationService) Register(ctx context.Context, data RegistrationData) error { return s.transactionManager.Run(ctx, func(ctx context.Context) error { if err := s.userRepository.Create(ctx, &model.User{ Email: data.Email, }); err != nil { return err }
if err := s.profileRepository.Create(ctx, &model.Profile{ Name: data.Name, }); err != nil { return err }
return nil })}
复制代码


UserProfileRepository的实现


package storage
import ( "brand/transaction" "brand/transaction/example1/model" "context")
type ProfileRepository struct { db transaction.DB}
func NewProfileRepository(db transaction.DB) *ProfileRepository { return &ProfileRepository{db: db}}
func (r *ProfileRepository) Create(ctx context.Context, profile *model.Profile) error { _, err := r.db.ExecContext(ctx, "INSERT ...", profile.Name)
return err}
复制代码


package storage
import ( "brand/transaction" "brand/transaction/example1/model" "context")
type UserRepository struct { db transaction.DB}
func NewUserRepository(db transaction.DB) *UserRepository { return &UserRepository{db: db}}
func (r *UserRepository) Create(ctx context.Context, user *model.User) error { _, err := r.db.ExecContext(ctx, "INSERT ...", user.Email)
return err}
复制代码


优点


  • 简单:存储库会自动使用由 TransactionManager 启动的事务

  • 与存储无关:客户端代码对存储类型一无所知


缺点


  • 不符合 Go 的使用习惯

  • 控制较少:无法防止在事务中启动事务,可能会产生意想不到的副作用,代码审查时必须考虑到这一点

2. 事务感知存储库

工作原理:事务管理器启动事务并将事务放入回调,存储库工厂方法使用事务创建自己。


为了启动事务,我们需要实体:Manager


type Manager interface { Run(  ctx context.Context,  callback func(ctx context.Context, tx *sql.Tx) error, ) error}
复制代码


transaction.Manager 实现


package transaction
import ( "context" "database/sql" "github.com/pkg/errors" "go.uber.org/multierr")
type txKey string
var ctxWithTx = txKey("tx")
type SQLTransactionManager struct { db *sql.DB}
func NewManager(db *sql.DB) *SQLTransactionManager { return &SQLTransactionManager{db: db}}
func (m *SQLTransactionManager) Run( ctx context.Context, callback func(ctx context.Context, tx *sql.Tx) error,) (rErr error) { tx, err := m.db.BeginTx(ctx, &sql.TxOptions{}) if err != nil { return errors.WithStack(err) }
defer func() { if rErr != nil { rErr = multierr.Combine(rErr, errors.WithStack(tx.Rollback())) } }()
defer func() { if rec := recover(); rec != nil { if e, ok := rec.(error); ok { rErr = e } else { rErr = errors.Errorf("%s", rec) } } }()
if err = callback(ctx, tx); err != nil { return err }
return errors.WithStack(tx.Commit())}
复制代码


DB 实现


package storage
import ( "context" "database/sql")
type DB struct { db *sql.DB}
func NewDB(db *sql.DB) *DB { return &DB{db: db}}
func (d *DB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row { return d.db.QueryRowContext(ctx, query, args...)}
func (d *DB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) { return d.db.QueryContext(ctx, query, args...)}
func (d *DB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { return d.db.ExecContext(ctx, query, args...)}
func (d *DB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) { return d.db.PrepareContext(ctx, query)}
复制代码


RegistrationService 负责用户注册业务场景


有两种方法可以创建带有事务的存储库:


  • 存储库带有结构方法 WithTransaction(示例中使用了该方法)

  • 存储库工厂 userRepositoryFactory.CreateFromTransaction(tx)


package service
import ( "brand/transaction/example2/model" "brand/transaction/example2/transaction" "context" "database/sql")
type UserRepository interface { Create(ctx context.Context, user *model.User) error WithTransaction(tx *sql.Tx) UserRepository}
type ProfileRepository interface { Create(ctx context.Context, user *model.Profile) error WithTransaction(tx *sql.Tx) ProfileRepository}
type RegistrationData struct { Email string Name string}
type RegistrationService struct { transactionManager transaction.Manager userRepository UserRepository profileRepository ProfileRepository}
func NewRegistrationService( transactionManager transaction.Manager, userRepository UserRepository, profileRepository ProfileRepository,) *RegistrationService { return &RegistrationService{ transactionManager: transactionManager, userRepository: userRepository, profileRepository: profileRepository, }}
func (s *RegistrationService) Register(ctx context.Context, data RegistrationData) error { return s.transactionManager.Run(ctx, func(ctx context.Context, tx *sql.Tx) error { userRepository := s.userRepository.WithTransaction(tx) profileRepository := s.profileRepository.WithTransaction(tx)
if err := userRepository.Create(ctx, &model.User{ Email: data.Email, }); err != nil { return err }
if err := profileRepository.Create(ctx, &model.Profile{ Name: data.Name, }); err != nil { return err }
return nil })}
复制代码


UserProfileRepository的实现


package storage
import ( "brand/transaction" "brand/transaction/example2/model" "brand/transaction/example2/service" "context" "database/sql")
type ProfileRepository struct { db transaction.DB}
func NewProfileRepository(db transaction.DB) *ProfileRepository { return &ProfileRepository{db: db}}
func (r *ProfileRepository) Create(ctx context.Context, profile *model.Profile) error { _, err := r.db.ExecContext(ctx, "INSERT ...", profile.Name)
return err}
func (r *ProfileRepository) WithTransaction(tx *sql.Tx) service.ProfileRepository { return NewProfileRepository(tx)}
复制代码


package storage
import ( "brand/transaction" "brand/transaction/example2/model" "brand/transaction/example2/service" "context" "database/sql")
type UserRepository struct { db transaction.DB}
func NewUserRepository(db transaction.DB) *UserRepository { return &UserRepository{db: db}}
func (r *UserRepository) Create(ctx context.Context, user *model.User) error { _, err := r.db.ExecContext(ctx, "INSERT ...", user.Email)
return err}
func (r *UserRepository) WithTransaction(tx *sql.Tx) service.UserRepository { return NewUserRepository(tx)}
复制代码


优点


  • 更明确:在注册服务内部创建事务,可避免副作用


缺点


  • 客户端代码知道存储类型

  • 客户端代码负责创建新的存储库


我相信任何一种方法都能使代码更易读、更简单,但建议使用第一种方法,从而可以隐藏存储细节,使我们能够在一个项目中使用多个存储,而无需考虑实现和存储细节。




你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!

发布于: 刚刚阅读数: 4
用户头像

俞凡

关注

公众号:DeepNoMind 2017-10-18 加入

俞凡,Mavenir Systems研发总监,关注高可用架构、高性能服务、5G、人工智能、区块链、DevOps、Agile等。公众号:DeepNoMind

评论

发布
暂无评论
Golang数据库事务实践_golang_俞凡_InfoQ写作社区