// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package eth implements the Ethereum protocol.
package eth

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"math/big"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/accounts"
	"github.com/ethereum/go-ethereum/accounts/keystore"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/filtermaps"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/state/pruner"
	"github.com/ethereum/go-ethereum/core/txpool"
	"github.com/ethereum/go-ethereum/core/txpool/blobpool"
	"github.com/ethereum/go-ethereum/core/txpool/legacypool"
	"github.com/ethereum/go-ethereum/core/txpool/locals"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/core/vm"
	"github.com/ethereum/go-ethereum/eth/downloader"
	"github.com/ethereum/go-ethereum/eth/ethconfig"
	"github.com/ethereum/go-ethereum/eth/gasprice"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	// "github.com/ethereum/go-ethereum/eth/protocols/snap" // Disabled for XDC compatibility
	"github.com/ethereum/go-ethereum/eth/tracers"
	"github.com/ethereum/go-ethereum/eth/hooks"
	"github.com/ethereum/go-ethereum/consensus/XDPoS"
	engine_v2 "github.com/ethereum/go-ethereum/consensus/XDPoS/engines/engine_v2"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/internal/ethapi"
	"github.com/ethereum/go-ethereum/internal/shutdowncheck"
	"github.com/ethereum/go-ethereum/internal/version"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/miner"
	"github.com/ethereum/go-ethereum/node"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/dnsdisc"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/rpc"
	gethversion "github.com/ethereum/go-ethereum/version"
)

// Tuning parameters for the peer discovery sources that setupDiscovery feeds
// into the discovery mixer.
const (
	// discmixTimeout is the fairness knob for the discovery mixer. When looking for
	// peers, we'll wait this long for a single source of candidates before moving on
	// and trying other sources. If this timeout expires, the source will be skipped in
	// this round, but it will continue to fetch in the background and will have a
	// chance with a new timeout in the next rounds, giving it overall more time but a
	// proportionally smaller share.
	// We expect a normal source to produce ~10 candidates per second.
	discmixTimeout = 100 * time.Millisecond

	// discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery
	// source. It is useful to avoid the negative effects of potential longer timeouts
	// in the discovery, keeping dial progress while waiting for the next batch of
	// candidates.
	discoveryPrefetchBuffer = 32

	// maxParallelENRRequests is the maximum number of parallel ENR requests that can be
	// performed by a disc/v4 source.
	maxParallelENRRequests = 16
)

// Config contains the configuration options of the ETH protocol.
//
// Deprecated: use ethconfig.Config instead.
type Config = ethconfig.Config

// Ethereum implements the Ethereum full node service.
//
// Instances are assembled by New, which also registers the object with the
// node as a lifecycle; Start and Stop then drive the background services.
type Ethereum struct {
	// core protocol objects
	config         *ethconfig.Config
	txPool         *txpool.TxPool
	blobTxPool     *blobpool.BlobPool
	// localTxTracker is only created when config.TxPool.NoLocals is false;
	// it stays nil otherwise.
	localTxTracker *locals.TxTracker
	blockchain     *core.BlockChain

	handler *handler
	// discmix mixes the peer discovery sources added by setupDiscovery.
	discmix *enode.FairMix
	dropper *dropper

	// DB interfaces
	chainDb ethdb.Database // Block chain database

	eventMux       *event.TypeMux
	engine         consensus.Engine
	accountManager *accounts.Manager

	filterMaps      *filtermaps.FilterMaps
	// closeFilterMaps carries the shutdown request for updateFilterMapsHeads:
	// Stop sends a channel on it and the goroutine closes that channel on exit.
	closeFilterMaps chan chan struct{}

	APIBackend *EthAPIBackend

	miner    *miner.Miner
	// xdcAgent is only set by New when the consensus engine is *XDPoS.XDPoS.
	xdcAgent *miner.XdcAgent // XDPoS V2 sealing agent (#02)
	gasPrice *big.Int

	// networkID is config.NetworkId, or the chain ID when that is zero.
	networkID     uint64
	netRPCService *ethapi.NetAPI

	p2pServer *p2p.Server

	lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)

	shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully
}

// New creates a new Ethereum object (including the initialisation of the common Ethereum object),
// whose lifecycle will be managed by the provided node.
//
// In order, it:
//   - sanitizes config (sync/history mode, miner gas price, cache split in archive mode);
//   - opens the "chaindata" database and resolves the state scheme;
//   - loads the chain config, creates the consensus engine and applies the
//     XDC / XDPoS per-network constants;
//   - creates the BlockChain and, for an XDPoS engine, attaches the V1/V2 hooks
//     and wires the V2 engine (finality, reward and penalty hooks);
//   - sets up the log index, transaction pools, protocol handler, miner and,
//     for XDPoS, the sealing agent;
//   - registers APIs, protocols and the lifecycle on stack.
//
// config is modified in place by the sanitizing steps. On failure the returned
// *Ethereum is nil.
//
// NOTE(review): error paths after the database has been opened return without
// closing chainDb here — presumably the node closes it on shutdown; confirm.
func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
	// Ensure configuration values are compatible and sane
	if !config.SyncMode.IsValid() {
		return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
	}
	if !config.HistoryMode.IsValid() {
		return nil, fmt.Errorf("invalid history mode %d", config.HistoryMode)
	}
	if config.Miner.GasPrice == nil || config.Miner.GasPrice.Sign() <= 0 {
		log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice)
		config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
	}
	// In hash-scheme archive mode the dirty cache allowance is redistributed to
	// the clean cache (and the snapshot cache, when enabled) and then zeroed.
	if config.NoPruning && config.TrieDirtyCache > 0 && config.StateScheme == rawdb.HashScheme {
		if config.SnapshotCache > 0 {
			config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
			config.SnapshotCache += config.TrieDirtyCache * 2 / 5
		} else {
			config.TrieCleanCache += config.TrieDirtyCache
		}
		config.TrieDirtyCache = 0
	}
	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

	dbOptions := node.DatabaseOptions{
		Cache:             config.DatabaseCache,
		Handles:           config.DatabaseHandles,
		AncientsDirectory: config.DatabaseFreezer,
		EraDirectory:      config.DatabaseEra,
		MetricsNamespace:  "eth/db/chaindata/",
	}
	chainDb, err := stack.OpenDatabaseWithOptions("chaindata", dbOptions)
	if err != nil {
		return nil, err
	}
	scheme, err := rawdb.ParseStateScheme(config.StateScheme, chainDb)
	if err != nil {
		return nil, err
	}
	// Try to recover offline state pruning only in hash-based.
	if scheme == rawdb.HashScheme {
		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
			log.Error("Failed to recover state", "error", err)
		}
	}

	// Here we determine genesis hash and active ChainConfig.
	// We need these to figure out the consensus parameters and to set up history pruning.
	chainConfig, _, err := core.LoadChainConfig(chainDb, config.Genesis)
	if err != nil {
		return nil, err
	}
	engine, err := ethconfig.CreateConsensusEngine(chainConfig, chainDb)
	if err != nil {
		return nil, err
	}
	// Set networkID to chainID by default.
	networkID := config.NetworkId
	if networkID == 0 {
		networkID = chainConfig.ChainID.Uint64()
	}

	// Set XDC-specific constants based on chain ID (TIPSigning, TIPTRC21Fee, etc.)
	common.CopyXDCConstants(chainConfig.ChainID.Uint64())

	// Set XDPoS network-specific constants (fork blocks, V2 switch, etc.)
	XDPoS.SetNetworkConstants(chainConfig.ChainID.Uint64())

	// Validate sync-from-block against XDPoS V2 switch block for XDC networks
	if config.SyncFromBlock > 0 && chainConfig.XDPoS != nil && chainConfig.XDPoS.V2 != nil && chainConfig.XDPoS.V2.SwitchBlock != nil {
		v2Switch := chainConfig.XDPoS.V2.SwitchBlock.Uint64()
		if config.SyncFromBlock < v2Switch {
			log.Warn("Sync-from-block is before XDPoS V2 switch block — V1 consensus blocks will still be processed",
				"syncFromBlock", config.SyncFromBlock,
				"v2SwitchBlock", v2Switch,
				"chainID", chainConfig.ChainID)
		} else if config.SyncFromBlock == v2Switch {
			log.Info("Sync-from-block is at XDPoS V2 switch block — V1 consensus will be skipped, switch block itself uses V1 format",
				"syncFromBlock", config.SyncFromBlock,
				"v2SwitchBlock", v2Switch)
		} else {
			log.Info("Sync-from-block is after XDPoS V2 switch block — only V2 consensus will be processed",
				"syncFromBlock", config.SyncFromBlock,
				"v2SwitchBlock", v2Switch)
		}
	}

	// Assemble the Ethereum object.
	eth := &Ethereum{
		config:          config,
		chainDb:         chainDb,
		eventMux:        stack.EventMux(),
		accountManager:  stack.AccountManager(),
		engine:          engine,
		networkID:       networkID,
		gasPrice:        config.Miner.GasPrice,
		p2pServer:       stack.Server(),
		discmix:         enode.NewFairMix(discmixTimeout),
		shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
	}
	bcVersion := rawdb.ReadDatabaseVersion(chainDb)
	var dbVer = "<nil>"
	if bcVersion != nil {
		dbVer = fmt.Sprintf("%d", *bcVersion)
	}
	log.Info("Initialising Ethereum protocol", "network", networkID, "dbversion", dbVer)

	// Create BlockChain object.
	if !config.SkipBcVersionCheck {
		if bcVersion != nil && *bcVersion > core.BlockChainVersion {
			return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, version.WithMeta, core.BlockChainVersion)
		} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
			if bcVersion != nil { // only print warning on upgrade, not on init
				log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
			}
			rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
		}
	}
	var (
		options = &core.BlockChainConfig{
			TrieCleanLimit:          config.TrieCleanCache,
			NoPrefetch:              config.NoPrefetch,
			TrieDirtyLimit:          config.TrieDirtyCache,
			ArchiveMode:             config.NoPruning,
			TrieTimeLimit:           config.TrieTimeout,
			SnapshotLimit:           config.SnapshotCache,
			Preimages:               config.Preimages,
			StateHistory:            config.StateHistory,
			TrienodeHistory:         config.TrienodeHistory,
			NodeFullValueCheckpoint: config.NodeFullValueCheckpoint,
			StateScheme:             scheme,
			ChainHistoryMode:        config.HistoryMode,
			TxLookupLimit:           int64(min(config.TransactionHistory, math.MaxInt64)),
			SyncFromBlock:           config.SyncFromBlock,
			VmConfig: vm.Config{
				EnablePreimageRecording: config.EnablePreimageRecording,
				EnableWitnessStats:      config.EnableWitnessStats,
				StatelessSelfValidation: config.StatelessSelfValidation,
			},
			// Enables file journaling for the trie database. The journal files will be stored
			// within the data directory. The corresponding paths will be either:
			// - DATADIR/triedb/merkle.journal
			// - DATADIR/triedb/verkle.journal
			TrieJournalDirectory: stack.ResolvePath("triedb"),
			StateSizeTracking:    config.EnableStateSizeTracking,
			SlowBlockThreshold:   config.SlowBlockThreshold,
		}
	)
	if config.VMTrace != "" {
		traceConfig := json.RawMessage("{}")
		if config.VMTraceJsonConfig != "" {
			traceConfig = json.RawMessage(config.VMTraceJsonConfig)
		}
		t, err := tracers.LiveDirectory.New(config.VMTrace, traceConfig)
		if err != nil {
			// %w keeps the tracer's error in the chain for errors.Is / errors.As.
			return nil, fmt.Errorf("failed to create tracer %s: %w", config.VMTrace, err)
		}
		options.VmConfig.Tracer = t
	}
	// Override the chain config with provided settings.
	var overrides core.ChainOverrides
	if config.OverrideOsaka != nil {
		overrides.OverrideOsaka = config.OverrideOsaka
	}
	if config.OverrideBPO1 != nil {
		overrides.OverrideBPO1 = config.OverrideBPO1
	}
	if config.OverrideBPO2 != nil {
		overrides.OverrideBPO2 = config.OverrideBPO2
	}
	if config.OverrideVerkle != nil {
		overrides.OverrideVerkle = config.OverrideVerkle
	}
	options.Overrides = &overrides

	eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options)
	if err != nil {
		return nil, err
	}

	// Attach XDPoS consensus hooks if using XDPoS engine
	if xdposEngine, ok := eth.engine.(*XDPoS.XDPoS); ok {
		log.Info("Attaching XDPoS V1 + V2 consensus hooks")
		hooks.AttachConsensusV1Hooks(xdposEngine, eth.blockchain, chainConfig) // #72
		hooks.AttachConsensusV2Hooks(xdposEngine, eth.blockchain, chainConfig)
		// Wire the V2 engine for full VerifyHeader validation (#94)
		if chainConfig.XDPoS != nil && chainConfig.XDPoS.V2 != nil {
			v2Engine := engine_v2.New(chainConfig, chainDb, nil, nil)
			// Wire highestCommitBlock to blockchain finality (#95):
			// Each time the 3-chain rule commits a block, notify geth's blockchain so that
			// eth_getFinalizedBlock / eth_getBlockByNumber("finalized") returns the real BFT-finalized head.
			bc := eth.blockchain // capture for closure
			v2Engine.HookCommitBlock = func(header *types.Header) {
				bc.SetFinalized(header)
				log.Debug("XDPoS V2: SetFinalized called", "number", header.Number, "hash", header.Hash())
			}
			xdposEngine.SetEngineV2(v2Engine)
			// Wire V2 engine's reward hook to delegate to the wrapper hook installed by
			// AttachConsensusV2Hooks above. AttachConsensusV2Hooks ran before SetEngineV2,
			// so its `if adaptor.EngineV2 != nil` guard skipped the V2 wiring — without
			// this assignment, V2.Finalize at every epoch switch (block 3600, 4500, …)
			// finds x.HookReward == nil and silently skips reward distribution.
			// Canonical: XDPoSChain eth/hooks/engine_v2_hooks.go:270.
			xdposCapture := xdposEngine
			v2Engine.SetHookReward(func(chain consensus.ChainReader, state *state.StateDB, parentState *state.StateDB, header *types.Header) (map[string]interface{}, error) {
				return xdposCapture.HookReward(chain, state, parentState, header)
			})
			// Fix #146: wire EngineV2.HookPenalty for V2 epoch masternode penalty calculation.
			// Without this, calcMasternodes skips all penalties (nil check at engine_v2/engine.go:802).
			// Ported from XinFinOrg/XDPoSChain eth/hooks/engine_v2_hooks.go + XFN-114 nil guard.
			// The hook never waits for missing data: when a header on the walk-back
			// path is unavailable it returns an error immediately.
			v2EngineCapture := v2Engine
			v2Engine.HookPenalty = func(chain consensus.ChainReader, number *big.Int, parentHash common.Hash, candidates []common.Address) ([]common.Address, error) {
				start := time.Now()
				parentNumber := number.Uint64() - 1
				currentHash := parentHash
				listBlockHash := []common.Hash{parentHash}
				statMiners := make(map[common.Address]int)
				// Walk backward collecting coinbases until epoch switch.
				// listBlockHash[k] ends up holding the hash of block number-1-k.
				for {
					ph := chain.GetHeader(parentHash, parentNumber)
					if ph == nil {
						// XFN-114: nil guard — block not yet available, skip penalty
						log.Error("[V2 HookPenalty] fail to get parent header", "number", parentNumber, "hash", parentHash)
						return []common.Address{}, fmt.Errorf("[V2 HookPenalty] header not found at number %d", parentNumber)
					}
					isEpochSwitch, _, err := v2EngineCapture.IsEpochSwitch(ph)
					if err != nil {
						log.Error("[V2 HookPenalty] IsEpochSwitch error", "err", err)
						return []common.Address{}, err
					}
					if isEpochSwitch {
						break
					}
					// A missing key reads as zero, so a plain increment covers both
					// the first and the repeated sighting of a coinbase.
					statMiners[ph.Coinbase]++
					parentNumber--
					parentHash = ph.ParentHash
					listBlockHash = append(listBlockHash, parentHash)
				}
				// Determine masternodes from epoch switch header of currentHash
				preMasternodes := v2EngineCapture.GetMasternodesByHash(chain, currentHash)
				preMastFirst, preMastLast := "<empty>", "<empty>"
				if len(preMasternodes) > 0 {
					preMastFirst = preMasternodes[0].Hex()
					preMastLast = preMasternodes[len(preMasternodes)-1].Hex()
				}
				log.Debug("V2 HookPenalty", "block", number, "preMasternodes", len(preMasternodes), "statMiners", len(statMiners),
					"preMastFirst", preMastFirst,
					"preMastLast", preMastLast)
				for signer, total := range statMiners {
					log.Debug("V2 HookPenalty statMiners", "miner", signer.Hex(), "total", total)
				}
				// Penalise block producers below the per-epoch minimum, then every
				// previous masternode that produced no block at all.
				penalties := []common.Address{}
				for signer, total := range statMiners {
					if total < common.MinimunMinerBlockPerEpoch {
						log.Info("[V2 HookPenalty] node did not create enough blocks", "addr", signer.Hex(), "total", total)
						penalties = append(penalties, signer)
					}
				}
				for _, addr := range preMasternodes {
					if _, exist := statMiners[addr]; !exist {
						log.Info("[V2 HookPenalty] node created no blocks", "addr", addr.Hex())
						penalties = append(penalties, addr)
					}
				}
				// Comeback mechanism — aligned with v2.6.8: always active once past comebackHeight
				var comebackHeight uint64
				if chainConfig.XDPoS.V2 != nil && chainConfig.XDPoS.V2.SwitchBlock != nil {
					comebackHeight = (uint64(common.LimitPenaltyEpochV2)+1)*chainConfig.XDPoS.Epoch + chainConfig.XDPoS.V2.SwitchBlock.Uint64()
				}
				// penComebacks: previously penalised addresses that are candidates again.
				penComebacks := []common.Address{}
				if number.Uint64() > comebackHeight {
					pens := v2EngineCapture.GetPreviousPenaltyByHash(chain, currentHash, common.LimitPenaltyEpochV2)
					for _, p := range pens {
						for _, addr := range candidates {
							if p == addr {
								penComebacks = append(penComebacks, p)
								break
							}
						}
					}
				}
				// Loop for each block to check missing sign with comeback nodes
				mapBlockHash := map[common.Hash]bool{}
				startRange := common.RangeReturnSigner - 1
				if startRange >= len(listBlockHash) {
					startRange = len(listBlockHash) - 1
				}
				for i := startRange; i >= 0; i-- {
					if len(penComebacks) == 0 {
						break
					}
					blockNumber := number.Uint64() - uint64(i) - 1
					bhash := listBlockHash[i]
					if blockNumber%uint64(common.MergeSignRange) == 0 {
						mapBlockHash[bhash] = true
					}
					signingTxs, ok := xdposEngine.GetCachedSigningTxs(bhash)
					if !ok {
						blk := chain.GetBlock(bhash, blockNumber)
						if blk != nil {
							signingTxs = xdposEngine.CacheSigner(bhash, blk.Transactions())
						}
					}
					for _, tx := range signingTxs {
						data := tx.Data()
						if len(data) < 32 {
							continue
						}
						// The signed block hash is the trailing 32 bytes of the payload.
						blkHash := common.BytesToHash(data[len(data)-32:])
						from, err := types.Sender(types.LatestSignerForChainID(tx.ChainId()), tx)
						if err != nil {
							continue
						}
						if mapBlockHash[blkHash] {
							// A comeback node that signed a checkpoint block is cleared.
							for j, addr := range penComebacks {
								if from == addr {
									penComebacks = append(penComebacks[:j], penComebacks[j+1:]...)
									break
								}
							}
						}
					}
				}
				// Remaining comeback nodes stay penalised; avoid duplicate entries.
				for _, comeback := range penComebacks {
					already := false
					for _, p := range penalties {
						if p == comeback {
							already = true
							break
						}
					}
					if !already {
						penalties = append(penalties, comeback)
					}
				}
				for i, p := range penalties {
					log.Info("[V2 HookPenalty] Final penalty", "index", i, "addr", p.Hex())
				}
				log.Info("[V2 HookPenalty] Done", "block", number, "elapsed", common.PrettyDuration(time.Since(start)))
				return penalties, nil
			}
			log.Info("XDPoS V2 engine wired for full header verification (#94), finality (#95), and V2 penalty (#146)")
		}
	}

	// Initialize filtermaps log index.
	fmConfig := filtermaps.Config{
		History:        config.LogHistory,
		Disabled:       config.LogNoHistory,
		ExportFileName: config.LogExportCheckpoints,
		HashScheme:     scheme == rawdb.HashScheme,
	}
	chainView := eth.newChainView(eth.blockchain.CurrentBlock())
	historyCutoff, _ := eth.blockchain.HistoryPruningCutoff()
	var finalBlock uint64
	if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
		finalBlock = fb.Number.Uint64()
	}
	filterMaps, err := filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
	if err != nil {
		return nil, err
	}
	eth.filterMaps = filterMaps
	eth.closeFilterMaps = make(chan chan struct{})

	// TxPool
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
	}
	legacyPool := legacypool.New(config.TxPool, eth.blockchain)

	if config.BlobPool.Datadir != "" {
		config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
	}
	eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth)

	eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool})
	if err != nil {
		return nil, err
	}

	if !config.TxPool.NoLocals {
		rejournal := config.TxPool.Rejournal
		if rejournal < time.Second {
			log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
			rejournal = time.Second
		}
		eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
		stack.RegisterLifecycle(eth.localTxTracker)
	}

	// Permit the downloader to use the trie cache allowance during fast sync
	cacheLimit := options.TrieCleanLimit + options.TrieDirtyLimit + options.SnapshotLimit
	if eth.handler, err = newHandler(&handlerConfig{
		NodeID:         eth.p2pServer.Self().ID(),
		Database:       chainDb,
		Chain:          eth.blockchain,
		TxPool:         eth.txPool,
		Network:        networkID,
		Sync:           config.SyncMode,
		SyncFromBlock:  config.SyncFromBlock,
		BloomCache:     uint64(cacheLimit),
		EventMux:       eth.eventMux,
		RequiredBlocks: config.RequiredBlocks,
	}); err != nil {
		return nil, err
	}

	eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())

	eth.miner = miner.New(eth, config.Miner, eth.engine)
	eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
	eth.miner.SetPrioAddresses(config.TxPool.Locals)

	// Wire XDPoS V2 sealing agent (#02).  The agent polls the V2 engine for
	// round-leadership and drives Prepare → FinalizeAndAssemble → Seal when
	// this node is the leader.  It is only relevant for XDPoS chains; PoW/PoS
	// chains use the payload-builder pipeline instead.
	if _, isXDPoS := eth.engine.(*XDPoS.XDPoS); isXDPoS {
		eth.xdcAgent = miner.NewXdcAgent(eth.miner, eth.engine, eth.blockchain)
		log.Info("XdcAgent: constructed", "chain", chainConfig.ChainID)
	}

	eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
	if eth.APIBackend.allowUnprotectedTxs {
		log.Info("Unprotected transactions allowed")
	}
	eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice)

	// Start the RPC service
	eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, networkID)

	// Register the backend on the node
	stack.RegisterAPIs(eth.APIs())
	stack.RegisterProtocols(eth.Protocols())
	stack.RegisterLifecycle(eth)

	// Successful startup; push a marker and check previous unclean shutdowns.
	eth.shutdownTracker.MarkStartup()

	return eth, nil
}

func makeExtraData(extra []byte) []byte {
	if len(extra) == 0 {
		// create default extradata
		extra, _ = rlp.EncodeToBytes([]interface{}{
			uint(gethversion.Major<<16 | gethversion.Minor<<8 | gethversion.Patch),
			"geth",
			runtime.Version(),
			runtime.GOOS,
		})
	}
	if uint64(len(extra)) > params.MaximumExtraDataSize {
		log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize)
		extra = nil
	}
	return extra
}

// APIs returns the collection of RPC services the ethereum package offers.
// The list is built from the generic ethapi services for the API backend,
// followed by the consensus engine's own APIs when the engine is XDPoS, and
// finally the services implemented locally in this package.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
	services := ethapi.GetAPIs(s.APIBackend)

	// Consensus engine APIs (xdpos) are only available on an XDPoS engine.
	xdposEngine, isXDPoS := s.engine.(*XDPoS.XDPoS)
	if isXDPoS {
		services = append(services, xdposEngine.APIs(s.blockchain)...)
	}

	// Local services, in registration order.
	services = append(services,
		rpc.API{Namespace: "miner", Service: NewMinerAPI(s)},
		rpc.API{Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux)},
		rpc.API{Namespace: "admin", Service: NewAdminAPI(s)},
		rpc.API{Namespace: "debug", Service: NewDebugAPI(s)},
		rpc.API{Namespace: "net", Service: s.netRPCService},
	)
	return services
}

// ResetWithGenesisBlock forwards gb to the underlying BlockChain's
// ResetWithGenesisBlock method.
func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
	s.blockchain.ResetWithGenesisBlock(gb)
}

// Miner returns the miner instance created in New.
func (s *Ethereum) Miner() *miner.Miner {
	return s.miner
}

// AccountManager returns the account manager obtained from the node.
func (s *Ethereum) AccountManager() *accounts.Manager {
	return s.accountManager
}

// BlockChain returns the blockchain object backing this service.
func (s *Ethereum) BlockChain() *core.BlockChain {
	return s.blockchain
}

// TxPool returns the transaction pool.
func (s *Ethereum) TxPool() *txpool.TxPool {
	return s.txPool
}

// BlobTxPool returns the blob transaction sub-pool.
func (s *Ethereum) BlobTxPool() *blobpool.BlobPool {
	return s.blobTxPool
}

// Engine returns the consensus engine in use.
func (s *Ethereum) Engine() consensus.Engine {
	return s.engine
}

// ChainDb returns the chain database.
func (s *Ethereum) ChainDb() ethdb.Database {
	return s.chainDb
}

// IsListening always reports true.
func (s *Ethereum) IsListening() bool {
	return true // Always listening
}

// Downloader returns the downloader owned by the protocol handler.
func (s *Ethereum) Downloader() *downloader.Downloader {
	return s.handler.downloader
}

// Synced reports the current value of the handler's synced flag.
func (s *Ethereum) Synced() bool {
	return s.handler.synced.Load()
}

// SetSynced asks the handler to enable its synced features.
func (s *Ethereum) SetSynced() {
	s.handler.enableSyncedFeatures()
}

// ArchiveMode reports whether the node was configured with NoPruning.
func (s *Ethereum) ArchiveMode() bool {
	return s.config.NoPruning
}

// Protocols returns all the currently configured network protocols to start.
// Only the eth protocol is returned; the snap protocol is intentionally not
// registered.
func (s *Ethereum) Protocols() []p2p.Protocol {
	// XDC uses eth/62, eth/63 which are not compatible with snap protocol.
	// Snap sync requires eth/67+ so it is disabled for XDC networks. The
	// upstream registration is kept here for reference:
	//
	//	if s.config.SnapshotCache > 0 {
	//		protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler))...)
	//	}
	backend := (*ethHandler)(s.handler)
	return eth.MakeProtocols(backend, s.networkID, s.discmix)
}

// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
//
// It returns an error only when peer discovery cannot be set up. On XDPoS
// chains with a configured pending fee recipient it additionally authorizes
// the sealing key with the engine and launches the XDPoS V2 sealing agent
// together with its results consumer.
func (s *Ethereum) Start() error {
	if err := s.setupDiscovery(); err != nil {
		return err
	}

	// Regularly update shutdown marker
	s.shutdownTracker.Start()

	// Start the networking layer
	s.handler.Start(s.p2pServer.MaxPeers)

	// Start the connection manager; it is told the node is syncing for as
	// long as Synced reports false.
	stillSyncing := func() bool { return !s.Synced() }
	s.dropper.Start(s.p2pServer, stillSyncing)

	// start log indexer
	s.filterMaps.Start()
	go s.updateFilterMapsHeads()

	// Everything below concerns the XDPoS V2 sealing agent (#02) and its
	// results consumer (#03). The agent only exists on XDPoS chains.
	if s.xdcAgent == nil {
		return nil
	}
	// Without a configured recipient the agent would fail the V2 Prepare
	// check (header.Coinbase == signer) every tick, so it is not started.
	if s.miner == nil || s.miner.Config() == nil || s.miner.Config().PendingFeeRecipient == (common.Address{}) {
		log.Info("XdcAgent: not started — no PendingFeeRecipient configured (use --miner.pending.feeRecipient)")
		return nil
	}
	etherbase := s.miner.Config().PendingFeeRecipient

	// Give the engine a signing function for etherbase. Without one, every
	// Seal returns ErrUnauthorized because signFn is nil.
	if xdposEngine, ok := s.engine.(*XDPoS.XDPoS); ok {
		signer := accounts.Account{Address: etherbase}

		// Locate the keystore backend, if the account manager has one.
		var ks *keystore.KeyStore
		if found := s.accountManager.Backends(keystore.KeyStoreType); len(found) > 0 {
			ks, _ = found[0].(*keystore.KeyStore)
		}

		if ks == nil {
			log.Warn("XDPoS: keystore backend not found — sealing will fail with 'unauthorized'", "address", etherbase)
		} else {
			// Geth 1.17 deprecated --unlock and --password (no-ops post-merge).
			// For XDPoS sealing the keystore wallet must be unlocked at startup,
			// so the passphrase is read from the XDC_MINER_PASSWORD env var and
			// passed to keystore.Unlock directly. Mirrors the dev-mode unlock in
			// cmd/utils/flags.go:1991.
			if secret := os.Getenv("XDC_MINER_PASSWORD"); secret != "" {
				unlockErr := ks.Unlock(signer, secret)
				if unlockErr == nil {
					log.Info("XDPoS: keystore unlocked for sealing", "address", etherbase)
				} else {
					log.Warn("XDPoS: keystore unlock failed", "address", etherbase, "err", unlockErr)
				}
			}
			// IMPORTANT: wallet.SignData must NOT be used here. It applies
			// keccak256(data) before signing — but the V1 engine already
			// passes sigHash(header) (already keccak-hashed) as `data`,
			// and the verifier ecrecovers against sigHash directly. Using
			// SignData would double-hash and every block would fail
			// "unauthorized" recovery. The hash is therefore signed directly
			// via keystore.SignHash, matching v2.6.8 eth/backend.go behavior.
			xdposEngine.Authorize(etherbase, func(_ accounts.Account, _ string, data []byte) ([]byte, error) {
				return ks.SignHash(signer, data)
			})
			log.Info("XDPoS: signer authorized for sealing (direct SignHash)", "address", etherbase)
		}
	}

	s.xdcAgent.Start()
	go s.drainXdcAgentResults()
	log.Info("XdcAgent: started sealing loop", "etherbase", etherbase)
	return nil
}

// newChainView builds a filtermaps chain view of s.blockchain ending at head.
// A nil head yields a nil view.
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView {
	if head != nil {
		number, hash := head.Number.Uint64(), head.Hash()
		return filtermaps.NewChainView(s.blockchain, number, hash)
	}
	return nil
}

// updateFilterMapsHeads keeps the log indexer's target in step with the chain
// head. It reacts to chain events and block-processing events, re-reads the
// current block after ten seconds without any event, and exits when Stop hands
// it a channel through closeFilterMaps (which it closes as acknowledgement).
// It is run as a goroutine from Start.
func (s *Ethereum) updateFilterMapsHeads() {
	var (
		headCh  = make(chan core.ChainEvent, 10)
		procCh  = make(chan bool, 10)
		headSub = s.blockchain.SubscribeChainEvent(headCh)
		procSub = s.blockchain.SubscribeBlockProcessingEvent(procCh)
	)
	defer func() {
		headSub.Unsubscribe()
		procSub.Unsubscribe()
		// Discard whatever is still buffered in either channel.
		for {
			select {
			case <-headCh:
			case <-procCh:
			default:
				return
			}
		}
	}()

	// current is the last head that was accepted as a new target candidate.
	var current *types.Header
	retarget := func(candidate *types.Header) {
		// XDC: Skip FilterMaps log indexing during bulk sync — saves CPU and disk I/O.
		// Log indices will be built after sync completes.
		if candidate == nil || core.XdcBulkSyncMode.Load() {
			return
		}
		// Nothing to do when the head hash did not change.
		if current != nil && candidate.Hash() == current.Hash() {
			return
		}
		current = candidate
		view := s.newChainView(current)
		if view == nil {
			return // Skip filter maps update when chain view is unavailable (e.g., fresh node at block 0)
		}
		cutoff, _ := s.blockchain.HistoryPruningCutoff()
		var finalized uint64
		if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
			finalized = fb.Number.Uint64()
		}
		s.filterMaps.SetTarget(view, cutoff, finalized)
	}
	retarget(s.blockchain.CurrentBlock())

	for {
		select {
		case event := <-headCh:
			retarget(event.Header)
		case processing := <-procCh:
			s.filterMaps.SetBlockProcessing(processing)
		case <-time.After(time.Second * 10):
			retarget(s.blockchain.CurrentBlock())
		case done := <-s.closeFilterMaps:
			close(done)
			return
		}
	}
}

// setupDiscovery starts the ENR updater and registers every available peer
// discovery source with the discovery mixer: DNS lists for eth and snap (when
// URLs are configured), then discv4 and discv5 (when the p2p server exposes
// them). It fails only if a DNS iterator cannot be created.
func (s *Ethereum) setupDiscovery() error {
	eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())

	// DNS-based sources: eth nodes first, then snap nodes. Both share a client.
	dnsClient := dnsdisc.NewClient(dnsdisc.Config{})
	for _, urls := range [][]string{s.config.EthDiscoveryURLs, s.config.SnapDiscoveryURLs} {
		if len(urls) == 0 {
			continue
		}
		source, err := dnsClient.NewIterator(urls...)
		if err != nil {
			return err
		}
		s.discmix.AddSource(source)
	}

	// DHT nodes from discv4: random nodes, ENR-resolved in parallel, filtered
	// for this chain and buffered.
	if s.p2pServer.DiscoveryV4() != nil {
		random := s.p2pServer.DiscoveryV4().RandomNodes()
		resolveENR := func(_ context.Context, n *enode.Node) *enode.Node {
			// RequestENR does not yet support context. It will simply time out.
			// If the ENR can't be resolved, RequestENR will return nil. We don't
			// care about the specific error here, so we ignore it.
			resolved, _ := s.p2pServer.DiscoveryV4().RequestENR(n)
			return resolved
		}
		resolved := enode.AsyncFilter(random, resolveENR, maxParallelENRRequests)
		matching := enode.Filter(resolved, eth.NewNodeFilter(s.blockchain))
		s.discmix.AddSource(enode.NewBufferIter(matching, discoveryPrefetchBuffer))
	}

	// DHT nodes from discv5: random nodes, filtered for this chain and buffered.
	if s.p2pServer.DiscoveryV5() != nil {
		nodeFilter := eth.NewNodeFilter(s.blockchain)
		matching := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), nodeFilter)
		s.discmix.AddSource(enode.NewBufferIter(matching, discoveryPrefetchBuffer))
	}

	return nil
}

// Stop implements node.Lifecycle, terminating all internal goroutines used by the
// Ethereum protocol.
//
// Components are shut down in a fixed order: peer-facing services, the XDPoS
// sealing agent (if any), the filter-maps updater and indexer, the transaction
// pool, the blockchain, the consensus engine, the shutdown tracker, and last
// the chain database and event mux. It always returns nil.
func (s *Ethereum) Stop() error {
	// Stop all the peer-related stuff first.
	s.discmix.Close()
	s.dropper.Stop()
	s.handler.Stop()

	// Stop XDPoS V2 sealing agent (#02) before shutting down chain/engine.
	if s.xdcAgent != nil {
		s.xdcAgent.Stop()
		log.Info("XdcAgent: stopped")
	}

	// Then stop everything else.
	// closeFilterMaps is unbuffered, so the send blocks until
	// updateFilterMapsHeads picks it up; the receive then waits for that
	// goroutine to close ch, i.e. to acknowledge that it is exiting.
	ch := make(chan struct{})
	s.closeFilterMaps <- ch
	<-ch
	s.filterMaps.Stop()
	s.txPool.Close()
	s.blockchain.Stop()
	s.engine.Close()

	// Clean shutdown marker as the last thing before closing db
	s.shutdownTracker.Stop()

	s.chainDb.Close()
	s.eventMux.Stop()

	return nil
}

// drainXdcAgentResults consumes sealed blocks from the XdcAgent, inserts each
// one into the local BlockChain and, on success, sends it to every peer in the
// handler's peer set (#03). It returns when the agent's results channel is
// closed, or immediately if no agent exists. Start runs it as a goroutine.
//
// NOTE(review): the peer map is iterated here without any locking visible in
// this file — confirm whether the peer set requires its lock to be held.
// NOTE(review): the value returned by peer.Send (if any) is not inspected.
func (s *Ethereum) drainXdcAgentResults() {
	if s.xdcAgent == nil {
		return
	}
	results := s.xdcAgent.Results()
	for sealed := range results {
		log.Info("XdcAgent: sealed block", "number", sealed.NumberU64(), "hash", sealed.Hash(), "txs", len(sealed.Transactions()))

		// Insert into local chain; a block that fails to import is not broadcast.
		_, insertErr := s.blockchain.InsertChain([]*types.Block{sealed})
		if insertErr != nil {
			log.Error("XdcAgent: InsertChain failed", "number", sealed.NumberU64(), "hash", sealed.Hash(), "err", insertErr)
			continue
		}

		// Broadcast to peers
		if s.handler == nil {
			continue
		}
		// geth-1.17's handler does not have BroadcastBlock; use the peer set
		// directly via p2p.Send with NewBlockMsg (eth/62+ protocol).
		connected := s.handler.peers.peers
		for _, p := range connected {
			// Each peer gets its own packet with a fixed TD of 1.
			packet := &eth.NewBlockPacket{Block: sealed, TD: big.NewInt(1)}
			p.Send(eth.NewBlockMsg, packet)
		}
		log.Debug("XdcAgent: broadcasted block", "number", sealed.NumberU64(), "peers", len(s.handler.peers.peers))
	}
	log.Info("XdcAgent: results consumer exited")
}
