写点什么

长安链源码分析之交易过程分析(8)

作者:
  • 2022-10-25
    湖南
  • 本文字数:1705 字

    阅读完需:约 6 分钟

本文已参与「开源摘星计划」,欢迎正在阅读的你加入。活动链接:https://github.com/weopenprojects/WeOpen-Star


1.创建交易池

func NewTxPoolImpl(	nodeId string,	chainId string,	txFilter protocol.TxFilter,	chainStore protocol.BlockchainStore,	msgBus msgbus.MessageBus,	chainConf protocol.ChainConf,	singer protocol.SigningMember,	ac protocol.AccessControlProvider,	netService protocol.NetService,	log protocol.Logger,	monitorEnabled bool,	poolConfig map[string]interface{}) (protocol.TxPool, error) {	// chainId and nodeId should not be nil	if len(chainId) == 0 {		return nil, fmt.Errorf("no chainId in create txpool")	}	if len(nodeId) == 0 {		return nil, fmt.Errorf("no nodeId in create txpool")	}	// set pool config	//交易池配置设置	TxPoolConfig = &txPoolConfig{}	if err := mapstructure.Decode(poolConfig, TxPoolConfig); err != nil {		return nil, err	}	MonitorEnabled = monitorEnabled	var (		ticker    = defaultFlushTicker		addChSize = defaultChannelSize	)	// set addChSize	//设置交易池添加通道大小,默认是10000	if TxPoolConfig.AddTxChannelSize > 0 {		addChSize = int(TxPoolConfig.AddTxChannelSize)	}	// set cacheFlushTicker	//设置缓存刷新间隔时间,默认是2s	if TxPoolConfig.CacheFlushTicker > 0 {		ticker = int(TxPoolConfig.CacheFlushTicker)	}	// create txPoolImpl	//创建一个交易池队列	queue := newQueue(chainConf, log)	ctx, cancel := context.WithCancel(context.Background())	//创建一个交易池	txPoolQueue := &txPoolImpl{		nodeId:      nodeId,		chainId:     chainId,		queue:       queue,		cache:       newTxCache(),		addTxsCh:    make(chan *mempoolTxs, addChSize),		ctx:         ctx,		cancel:      cancel,		flushTicker: ticker,		recover:     newTxRecover(nodeId, queue, msgBus, log),		ac:          ac,		log:         log,		msgBus:      msgBus,		chainConf:   chainConf,		txFilter:    txFilter,		blockStore:  chainStore,	}	.....	return txPoolQueue, nil}
复制代码


2.启动交易池服务

// Start start the txPool servicefunc (pool *txPoolImpl) Start() (err error) {	// should not be started again	if !atomic.CompareAndSwapInt32(&pool.stat, 0, 1) {		pool.log.Errorf(commonErrors.ErrTxPoolHasStarted.String())		return commonErrors.ErrTxPoolHasStarted	}	// should not be nil	//交易池的Msgbus 模块必须存在	if pool.msgBus != nil {		pool.msgBus.Register(msgbus.RecvTxPoolMsg, pool)	} else {		panic("should not be happen, msgBus in single pool should not be nil")	}	// replay tx batches in wal	//重放交易池 缓存在wal中	if err = pool.replayTxs(); err != nil {		atomic.StoreInt32(&pool.stat, 0)		pool.log.Errorf("replay dumped txs failed, err:%v", err)		return err	}	// start addTxLoop	//协程 启动交易池的添加交易循环方法	go pool.addTxLoop()	pool.log.Infof("start single pool success")	return}
复制代码


3.添加交易循环方法

// addTxLoop listen addTxsCh and add transaction to txCache//监听添加交易的通道,然后把交易加入到交易缓存中func (pool *txPoolImpl) addTxLoop() {	//默认是2秒	flushTicker := time.NewTicker(time.Duration(pool.flushTicker) * time.Second)	defer flushTicker.Stop()	for {		select {		// receive tx from addTxsCh			//收到交易		case memTxs := <-pool.addTxsCh:			pool.flushOrAddTxsToCache(memTxs)		// flush cache ticker		//每隔两秒执行一次		case <-flushTicker.C:			//			if pool.cache.isFlushByTime() && pool.cache.txCount() > 0 {				pool.flushCommonTxToQueue(nil)			}		// stop goroutine		//停止		case <-pool.ctx.Done():			return		}	}}
复制代码


用户头像

关注

还未添加个人签名 2018-05-04 加入

还未添加个人简介

评论

发布
暂无评论
长安链源码分析之交易过程分析(8)_李_InfoQ写作社区