长安链源码分析之交易过程分析(8)
作者:李
- 2022-10-25 湖南
本文字数:1705 字
阅读完需:约 6 分钟
本文已参与「开源摘星计划」,欢迎正在阅读的你加入。活动链接:https://github.com/weopenprojects/WeOpen-Star
1.创建交易池
func NewTxPoolImpl(
nodeId string,
chainId string,
txFilter protocol.TxFilter,
chainStore protocol.BlockchainStore,
msgBus msgbus.MessageBus,
chainConf protocol.ChainConf,
singer protocol.SigningMember,
ac protocol.AccessControlProvider,
netService protocol.NetService,
log protocol.Logger,
monitorEnabled bool,
poolConfig map[string]interface{}) (protocol.TxPool, error) {
// chainId and nodeId should not be nil
if len(chainId) == 0 {
return nil, fmt.Errorf("no chainId in create txpool")
}
if len(nodeId) == 0 {
return nil, fmt.Errorf("no nodeId in create txpool")
}
// set pool config
//交易池配置设置
TxPoolConfig = &txPoolConfig{}
if err := mapstructure.Decode(poolConfig, TxPoolConfig); err != nil {
return nil, err
}
MonitorEnabled = monitorEnabled
var (
ticker = defaultFlushTicker
addChSize = defaultChannelSize
)
// set addChSize
//设置交易池添加通道大小,默认是10000
if TxPoolConfig.AddTxChannelSize > 0 {
addChSize = int(TxPoolConfig.AddTxChannelSize)
}
// set cacheFlushTicker
//设置缓存刷新间隔时间,默认是2s
if TxPoolConfig.CacheFlushTicker > 0 {
ticker = int(TxPoolConfig.CacheFlushTicker)
}
// create txPoolImpl
//创建一个交易池队列
queue := newQueue(chainConf, log)
ctx, cancel := context.WithCancel(context.Background())
//创建一个交易池
txPoolQueue := &txPoolImpl{
nodeId: nodeId,
chainId: chainId,
queue: queue,
cache: newTxCache(),
addTxsCh: make(chan *mempoolTxs, addChSize),
ctx: ctx,
cancel: cancel,
flushTicker: ticker,
recover: newTxRecover(nodeId, queue, msgBus, log),
ac: ac,
log: log,
msgBus: msgBus,
chainConf: chainConf,
txFilter: txFilter,
blockStore: chainStore,
}
.....
return txPoolQueue, nil
}
复制代码
2.启动交易池服务
// Start start the txPool service
func (pool *txPoolImpl) Start() (err error) {
// should not be started again
if !atomic.CompareAndSwapInt32(&pool.stat, 0, 1) {
pool.log.Errorf(commonErrors.ErrTxPoolHasStarted.String())
return commonErrors.ErrTxPoolHasStarted
}
// should not be nil
//交易池的Msgbus 模块必须存在
if pool.msgBus != nil {
pool.msgBus.Register(msgbus.RecvTxPoolMsg, pool)
} else {
panic("should not be happen, msgBus in single pool should not be nil")
}
// replay tx batches in wal
//重放交易池 缓存在wal中
if err = pool.replayTxs(); err != nil {
atomic.StoreInt32(&pool.stat, 0)
pool.log.Errorf("replay dumped txs failed, err:%v", err)
return err
}
// start addTxLoop
//协程 启动交易池的添加交易循环方法
go pool.addTxLoop()
pool.log.Infof("start single pool success")
return
}
复制代码
3.添加交易循环方法
// addTxLoop listen addTxsCh and add transaction to txCache
//监听添加交易的通道,然后把交易加入到交易缓存中
func (pool *txPoolImpl) addTxLoop() {
//默认是2秒
flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second)
defer flushTicker.Stop()
for {
select {
// receive tx from addTxsCh
//收到交易
case memTxs := <-pool.addTxsCh:
pool.flushOrAddTxsToCache(memTxs)
// flush cache ticker
//每隔两秒执行一次
case <-flushTicker.C:
//
if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 {
pool.flushCommonTxToQueue(nil)
}
// stop goroutine
//停止
case <-pool.ctx.Done():
return
}
}
}
复制代码
划线
评论
复制
发布于: 刚刚阅读数: 4
李
关注
还未添加个人签名 2018-05-04 加入
还未添加个人简介
评论