主页 > imtoken官网钱包app > 以太坊源码交易池启动流程分析
以太坊源码交易池启动流程分析
从以太坊的main函数入手,该源码在项目工程中的 cmd/geth/main.go 中,代码如下:
func main() {
if err := app.Run(os.Args); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
在默认情况下, app.Run(os.Args)调用geth,其代码如下:
func geth(ctx *cli.Context) error {
node := makeFullNode(ctx)
startNode(ctx, node)
node.Wait()
return nil
}
geth函数主要实现了以下功能:
1.配置全节点Node的配置文件,创建相关服务
2.启动一个Node节点
3.等待节点的退出信号
下面主要分析功能1,makeFullNode的代码如下:
func makeFullNode(ctx *cli.Context) *node.Node {
stack, cfg := makeConfigNode(ctx) //配置节点相关信息
utils.RegisterEthService(stack, &cfg.Eth) //注册相关服务
if ctx.GlobalBool(utils.DashboardEnabledFlag.Name) {
utils.RegisterDashboardService(stack, &cfg.Dashboard, gitCommit)
}
// Whisper must be explicitly enabled by specifying at least 1 whisper flag or in dev mode
shhEnabled := enableWhisper(ctx)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DeveloperFlag.Name)
if shhEnabled || shhAutoEnabled {
if ctx.GlobalIsSet(utils.WhisperMaxMessageSizeFlag.Name) {
cfg.Shh.MaxMessageSize = uint32(ctx.Int(utils.WhisperMaxMessageSizeFlag.Name))
}
if ctx.GlobalIsSet(utils.WhisperMinPOWFlag.Name) {
cfg.Shh.MinimumAcceptedPOW = ctx.Float64(utils.WhisperMinPOWFlag.Name)
}
utils.RegisterShhService(stack, &cfg.Shh)
}
// Add the Ethereum Stats daemon if requested.
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, cfg.Ethstats.URL)
}
return stack
}
在utils.RegisterEthService(stack, &cfg.Eth)注册服务中会创建一个 eth.New 实例并初始化相关模块,下面进入 utils.RegisterEthService 函数看代码:
func RegisterEthService(stack *node.Node, cfg *eth.Config) {
var err error
if cfg.SyncMode == downloader.LightSync { //
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
return les.New(ctx, cfg)
})
} else { //
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
fullNode, err := eth.New(ctx, cfg) //创建一个node服务实例
if fullNode != nil && cfg.LightServ > 0 {
ls, _ := les.NewLesServer(fullNode, cfg)
fullNode.AddLesServer(ls)
}
return fullNode, err
})
}
if err != nil {
Fatalf("Failed to register the Ethereum service: %v", err)
}
}
注:以太坊在 eth/downloader/modes.go 定义了数据的三种同步模式:
const (
FullSync SyncMode = iota // 同步完整的区块信息
FastSync // 快速同步header,然后再跟进header同步全部内容
LightSync // 只下载header并在之后终止,轻节点
)
一般不使用轻节点去同步数据,接下来就是 eth.New(ctx, cfg) 来创建一个服务实例,代码如下:
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if config.SyncMode == downloader.LightSync {
return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
}
if !config.SyncMode.IsValid() {
return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
}
chainDb, err := CreateDB(ctx, config, "chaindata")
if err != nil {
return nil, err
}
chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
return nil, genesisErr
}
log.Info("Initialised chain configuration", "config", chainConfig)
。。。。。。
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) //初始化交易池
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
eth.APIBackend = &EthAPIBackend{eth, nil}
gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
}
eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams)
return eth, nil
}
中间代码太多就省略部分,其主要是创建DB实例、创建创世块及链配置、创建共识引擎,创建Bloom服务,加载完整链,初始化交易池等操作
通过上面的源码分析,我们找到了整个程序启动加载时初始化交易池的入口core.NewTxPool。 几个密切相关的数据结构:
type TxPoolConfig struct {
NoLocals bool // 是否应该禁用本地事务处理
Journal string // 本地事务日志,保存相关交易数据的文件名;系统每隔一定时间将交易数据回写到本地文件
Rejournal time.Duration // 重新生成本地事务日志的时间间隔
PriceLimit uint64 // 最低gas价格,以接受入池
PriceBump uint64 // 更换//现有交易的最低价格增幅
AccountSlots uint64 // 每个帐户最多在pending队列的交易数
GlobalSlots uint64 //penging队列最大支持的交易数
AccountQueue uint64 // 每个帐户最多在queue队列的交易数
GlobalQueue uint64 // queue队列最大支持的交易数
Lifetime time.Duration //queue 队列超过多少小时交易不处理,剔除
}
针对TxPoolConfig系统的默认数值如下:
var DefaultTxPoolConfig = TxPoolConfig{
Journal: "transactions.rlp",
Rejournal: time.Hour,
PriceLimit: 1,
PriceBump: 10,
AccountSlots: 16,
GlobalSlots: 4096,
AccountQueue: 64,
GlobalQueue: 1024,
Lifetime: 3 * time.Hour,
}
type TxPool struct {
config TxPoolConfig //配置信息
chainconfig *params.ChainConfig //链配置
chain blockChain //当前的链
gasPrice *big.Int //最低的gas价格
txFeed event.Feed //通过txFedd订阅TxPool的消息
scope event.SubscriptionScope //提供了同时取消多个订阅的功能
chainHeadCh chan ChainHeadEvent //当有了新的区块的产生会收到消息,订阅区块头消息
chainHeadSub event.Subscription //区块头消息订阅器
signer types.Signer //对事物进行签名处理
mu sync.RWMutex //读写互斥锁
currentState *state.StateDB //当前区块链头部的状态
pendingState *state.ManagedState //挂起状态跟踪虚拟nonces
currentMaxGas uint64 // 目前交易的费用上限
locals *accountSet //一套豁免驱逐规则的本地交易
journal *txJournal //本地事务日志备份到磁盘
pending map[common.Address]*txList //等待队列
queue map[common.Address]*txList //排队但不可处理的事务
beats map[common.Address]time.Time //每个已知帐户的最后一次心跳
all map[common.Hash]*types.Transaction //所有允许查询的事务
priced *txPricedList //所有按价格排序的交易
wg sync.WaitGroup // for shutdown sync //关闭同步
homestead bool //家园版本判断
}
下面重点介绍启动TxPool的代码:
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
ctx.ResolvePath(配置.TxPool.Journal)
geth运行节点时将Journal的绝对路径转换为相对路径。 一般文件默认命名为:transactions.rlp
该文件存储本地账户待定队列交易数据和队列队列交易数据。
core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain)
为了真正加载交易池的代码以太坊交易系统,下面对代码进行详细分析:
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()
// Create the transaction pool with its initial settings
pool := &TxPool{
config: config,
chainconfig: chainconfig,
chain: chain,
signer: types.NewEIP155Signer(chainconfig.ChainId),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
all: make(map[common.Hash]*types.Transaction),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
pool.locals = newAccountSet(pool.signer)
pool.priced = newTxPricedList(&pool.all)
pool.reset(nil, chain.CurrentBlock().Header())
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
//本地文件中加载local交易
if err := pool.journal.load(pool.AddLocal); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
// Subscribe events from blockchain
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
// Start the event loop and return
pool.wg.Add(1)
go pool.loop()
return pool
}
1.config = (&config).sanitize()
验证TxPoolConfig的相关配置信息,如果配置不正确,恢复代码的默认配置,代码很简单,自己查看
2.pool := &TxPool{
……
}
实例化一个TxPool对象,具体字段定义请参考上面
3.游泳池。 locals = newAccountSet(pool.signer)
设置本地白名单账号存储数据结构,即本地账号; 排队在后续交易池中有优先权
4.pool.priced = newTxPricedList(&pool.all)
为交易池中所有按价格排序的交易设置存储数据结构
5.游泳池。 重置(无以太坊交易系统,链。CurrentBlock()。标头())
重置交易池,稍后单独分析
6. pool.journal.load() 和 pool.journal.rotate()
在本地文件中加载本地交易,函数实现后面会单独分析
7. pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)