长安链源码分析之交易过程分析(8)
作者:李
- 2022-10-25 湖南
本文字数:1705 字
阅读完需:约 6 分钟
本文已参与「开源摘星计划」,欢迎正在阅读的你加入。活动链接:https://github.com/weopenprojects/WeOpen-Star
1.创建交易池
func NewTxPoolImpl( nodeId string, chainId string, txFilter protocol.TxFilter, chainStore protocol.BlockchainStore, msgBus msgbus.MessageBus, chainConf protocol.ChainConf, singer protocol.SigningMember, ac protocol.AccessControlProvider, netService protocol.NetService, log protocol.Logger, monitorEnabled bool, poolConfig map[string]interface{}) (protocol.TxPool, error) { // chainId and nodeId should not be nil if len(chainId) == 0 { return nil, fmt.Errorf("no chainId in create txpool") } if len(nodeId) == 0 { return nil, fmt.Errorf("no nodeId in create txpool") } // set pool config //交易池配置设置 TxPoolConfig = &txPoolConfig{} if err := mapstructure.Decode(poolConfig, TxPoolConfig); err != nil { return nil, err } MonitorEnabled = monitorEnabled var ( ticker = defaultFlushTicker addChSize = defaultChannelSize ) // set addChSize //设置交易池添加通道大小,默认是10000 if TxPoolConfig.AddTxChannelSize > 0 { addChSize = int(TxPoolConfig.AddTxChannelSize) } // set cacheFlushTicker //设置缓存刷新间隔时间,默认是2s if TxPoolConfig.CacheFlushTicker > 0 { ticker = int(TxPoolConfig.CacheFlushTicker) } // create txPoolImpl //创建一个交易池队列 queue := newQueue(chainConf, log) ctx, cancel := context.WithCancel(context.Background()) //创建一个交易池 txPoolQueue := &txPoolImpl{ nodeId: nodeId, chainId: chainId, queue: queue, cache: newTxCache(), addTxsCh: make(chan *mempoolTxs, addChSize), ctx: ctx, cancel: cancel, flushTicker: ticker, recover: newTxRecover(nodeId, queue, msgBus, log), ac: ac, log: log, msgBus: msgBus, chainConf: chainConf, txFilter: txFilter, blockStore: chainStore, } ..... return txPoolQueue, nil}复制代码
2.启动交易池服务
// Start start the txPool servicefunc (pool *txPoolImpl) Start() (err error) { // should not be started again if !atomic.CompareAndSwapInt32(&pool.stat, 0, 1) { pool.log.Errorf(commonErrors.ErrTxPoolHasStarted.String()) return commonErrors.ErrTxPoolHasStarted } // should not be nil //交易池的Msgbus 模块必须存在 if pool.msgBus != nil { pool.msgBus.Register(msgbus.RecvTxPoolMsg, pool) } else { panic("should not be happen, msgBus in single pool should not be nil") } // replay tx batches in wal //重放交易池 缓存在wal中 if err = pool.replayTxs(); err != nil { atomic.StoreInt32(&pool.stat, 0) pool.log.Errorf("replay dumped txs failed, err:%v", err) return err } // start addTxLoop //协程 启动交易池的添加交易循环方法 go pool.addTxLoop() pool.log.Infof("start single pool success") return}复制代码
3.添加交易循环方法
// addTxLoop listen addTxsCh and add transaction to txCache//监听添加交易的通道,然后把交易加入到交易缓存中func (pool *txPoolImpl) addTxLoop() { //默认是2秒 flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second) defer flushTicker.Stop() for { select { // receive tx from addTxsCh //收到交易 case memTxs := <-pool.addTxsCh: pool.flushOrAddTxsToCache(memTxs) // flush cache ticker //每隔两秒执行一次 case <-flushTicker.C: // if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 { pool.flushCommonTxToQueue(nil) } // stop goroutine //停止 case <-pool.ctx.Done(): return } }}复制代码
划线
评论
复制
发布于: 刚刚阅读数: 4
李
关注
还未添加个人签名 2018-05-04 加入
还未添加个人简介









评论