package eth

import (
	"math/big"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
	"github.com/ethereum/go-ethereum/log"
)

const (
	xdcForceSyncCycle  = 1 * time.Second // XDC has 2s block time; re-engage every block
	xdcMinPeers        = 1               // Minimum peers to start syncing
	xdcMinBackoff      = 5 * time.Second // Min backoff for same-peer retry
	xdcMaxBackoff      = 2 * time.Minute // Max backoff for same-peer retry
	xdcSyncTimeout     = 5 * time.Minute // Hard upper-bound for a single sync cycle (safety net)
	// Adaptive stall windows (per #576 short-term tuning).
	// - Bulk: enough slack for batch imports (~200 blocks land in <1s) without
	//   tripping during legitimate bulk-sync work.
	// - Tip:  ~1× block-time (XDC = 2s); when we're already at tip the only
	//   reason head wouldn't advance is the post-target hang. Cancel fast so
	//   forceSync (1s) re-engages with the latest peerHead.
	// nearTip threshold = the peer's reported head is within this many blocks
	// of ours; tuned to be longer than a typical sync round's catchup window.
	xdcSyncStallWindowBulk = 5 * time.Second
	xdcSyncStallWindowTip  = 1 * time.Second
	xdcNearTipThreshold    = 16 // peerHead - localHead ≤ this → use tip window
	// Stuck-flag watchdog: if `syncing` CAS stays true and chain head doesn't
	// advance for this long while we have peers, assume the synchronise
	// goroutine died inside the downloader and force-reset (#588).
	xdcStuckFlagTimeout = 90 * time.Second
)

// xdcSyncer manages pre-merge sync for XDC network
type xdcSyncer struct {
	handler   *handler
	syncing   atomic.Bool
	quitCh    chan struct{}
	newPeerCh chan *eth.Peer

	// Backoff tracking to avoid retrying same stale peer
	mu            sync.Mutex
	peerBackoff   map[string]time.Duration // peer ID -> current backoff duration
	peerLastTry   map[string]time.Time     // peer ID -> last sync attempt time
	lastSyncBlock uint64                   // block number after last successful sync

	// Sync completion signal — goroutine sends here when done
	syncDone chan struct{}
}

// newXDCSyncer creates a new XDC syncer
func newXDCSyncer(h *handler) *xdcSyncer {
	return &xdcSyncer{
		handler:     h,
		quitCh:      make(chan struct{}),
		newPeerCh:   make(chan *eth.Peer, 10),
		peerBackoff: make(map[string]time.Duration),
		peerLastTry: make(map[string]time.Time),
		syncDone:    make(chan struct{}, 1),
	}
}

// start begins the sync loop. Wrapped with a recover so a panic in any
// downstream handler restarts the loop instead of silently killing
// sync forever (see #588).
func (s *xdcSyncer) start() {
	go s.runLoop()
}

// runLoop wraps loop with panic recovery + auto-restart. Without this, a
// panic in synchronise / bestPeer would terminate the loop goroutine and
// leave the node frozen — forceSync stops firing, no `starting
// synchronisation` log appears, peers reconnect but nobody pulls from them.
// The 1s pause prevents tight-loop panicking if the cause is persistent.
func (s *xdcSyncer) runLoop() {
	for {
		select {
		case <-s.quitCh:
			return
		default:
		}
		func() {
			defer func() {
				if r := recover(); r != nil {
					log.Error("xdcSyncer.loop panicked, restarting in 1s", "panic", r)
				}
			}()
			s.loop()
		}()
		// If loop() returned without quit signal it's an unexpected exit —
		// restart anyway. A normal stop closes quitCh and we hit the return above.
		time.Sleep(1 * time.Second)
	}
}

// stop terminates the sync loop
func (s *xdcSyncer) stop() {
	close(s.quitCh)
}

// notifyPeer signals that a new peer is available
func (s *xdcSyncer) notifyPeer(peer *eth.Peer) {
	select {
	case s.newPeerCh <- peer:
	default:
	}
}

// loop is the main sync loop. It dispatches sync attempts as goroutines
// (guarded by s.syncing CAS) so the loop never blocks on a slow downloader.
//
// Three watchdogs (see #588):
//   - forceSync ticker (1s): primary re-engagement.
//   - heartbeat ticker (30s): INFO log of head/peercount/syncing-flag so a
//     future stuck-sync is diagnosable from the log alone.
//   - stuck-flag watchdog (90s): if `syncing` is true but the chain head
//     hasn't advanced AND no sync round has logged completion, force-reset
//     the CAS. Catches the goroutine-died-without-defer case where the flag
//     would otherwise stay latched forever.
func (s *xdcSyncer) loop() {
	forceSync := time.NewTicker(xdcForceSyncCycle)
	defer forceSync.Stop()
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	// Track when sync last made observable progress (round completed or
	// new chain head). If syncing CAS stays true past xdcStuckFlagTimeout
	// without any progress, we consider it latched and reset it.
	var (
		lastProgressTime  = time.Now()
		lastProgressBlock uint64
	)
	if cur := s.handler.chain.CurrentBlock(); cur != nil {
		lastProgressBlock = cur.Number.Uint64()
	}

	// trySync launches a sync goroutine if not already running
	trySync := func(peer *eth.Peer) {
		if peer == nil {
			return
		}
		// syncing CAS is checked inside synchronise — safe to call from goroutine
		go func() {
			s.synchronise(peer)
			// Signal loop that sync finished so it can immediately retry
			select {
			case s.syncDone <- struct{}{}:
			default:
			}
		}()
	}

	for {
		select {
		case peer := <-s.newPeerCh:
			trySync(peer)

		case <-s.syncDone:
			// A sync just finished — wait for next forceSync tick before retry
			// to avoid tight-loop spinning when peer head <= local head
			// The forceSync ticker provides natural pacing
			// If progress was made, the peer backoff was reset, so it will be
			// picked as best peer on the next tick

		case <-forceSync.C:
			// Periodic fallback — pick best peer and sync
			if peer := s.bestPeer(); peer != nil {
				trySync(peer)
			}

		case <-heartbeat.C:
			cur := uint64(0)
			head := s.handler.chain.CurrentBlock()
			if head != nil {
				cur = head.Number.Uint64()
			}
			peercount := s.handler.peers.len()
			syncing := s.syncing.Load()
			if cur > lastProgressBlock {
				lastProgressBlock = cur
				lastProgressTime = time.Now()
			}
			stalled := time.Since(lastProgressTime)
			log.Info("xdcSyncer heartbeat",
				"head", cur, "peers", peercount, "syncing", syncing,
				"sinceLastAdvance", stalled.Round(time.Second))

			// Stuck-flag rescue: if syncing has been true for a long time AND
			// the chain hasn't advanced AND we have peers to try, the CAS is
			// almost certainly latched (synchronise goroutine died inside the
			// downloader without releasing). Reset it so forceSync can re-engage.
			if syncing && stalled >= xdcStuckFlagTimeout && peercount > 0 {
				if s.syncing.CompareAndSwap(true, false) {
					log.Warn("xdcSyncer: syncing flag latched past stuck-flag timeout, force-reset",
						"head", cur, "peers", peercount,
						"stalled", stalled.Round(time.Second))
					// Also cancel any in-flight downloader work the dead
					// goroutine left behind.
					s.handler.downloader.Cancel()
				}
			}

		case <-s.quitCh:
			return
		}
	}
}

// bestPeer finds the best peer to sync from: the highest-TD peer not currently
// in backoff. FIX #567: the prior "head-known" filter has been removed —
// peer.Head() returns the stale handshake hash that is never refreshed during
// bulk sync, so excluding peers whose hash is already in our chain silently hid
// peers that had actually advanced millions of blocks past their handshake
// state. We always give a non-backed-off peer a chance to sync; the downloader
// returns zero progress if the peer really has nothing new, and per-peer
// exponential backoff (5s → 2min) bounds wasted retries.
func (s *xdcSyncer) bestPeer() *eth.Peer {
	var (
		bestPeer *eth.Peer
		bestTd   *big.Int
	)

	s.mu.Lock()
	defer s.mu.Unlock()

	now := time.Now()

	for _, p := range s.handler.peers.all() {
		if p.Peer == nil {
			continue
		}
		_, td := p.Peer.Head()
		if td == nil {
			continue
		}
		pid := p.Peer.ID()
		if lastTry, ok := s.peerLastTry[pid]; ok {
			if now.Sub(lastTry) < s.peerBackoff[pid] {
				continue
			}
		}
		if bestTd == nil || td.Cmp(bestTd) > 0 {
			bestPeer = p.Peer
			bestTd = td
		}
	}

	return bestPeer
}

// synchronise runs ONE sync round against the given peer, geth-aligned to
// upstream Synchronise() one-shot semantics. Re-engagement when peers advance
// past our local head is the responsibility of the forceSync ticker + the
// handler's peer.SetHead path (NewBlockHashes / NewBlock) — we don't keep
// retrying internally, which is what previously caused the syncing CAS to
// stay latched while SynchroniseXDC hung past its work.
//
// A hard timeout (xdcSyncTimeout) guards against unbounded hangs inside the
// downloader (e.g., spawnSync's queue.Close() racing with processFullSyncContent
// after the bulk download window). On timeout we call Downloader.Cancel(),
// which closes cancelCh; the fetchers return errCanceled, spawnSync closes
// the queue, and processFullSyncContent's Results(true) unblocks. The
// goroutine drains the error and we release the syncing flag so the next
// forceSync tick (3s) can immediately re-engage.
func (s *xdcSyncer) synchronise(peer *eth.Peer) {
	if peer == nil {
		return
	}

	// Only one sync at a time
	if !s.syncing.CompareAndSwap(false, true) {
		return
	}
	defer s.syncing.Store(false)

	peerHead, peerTd := peer.Head()

	currentBlock := s.handler.chain.CurrentBlock()
	if currentBlock == nil {
		return
	}

	if peerTd == nil || peerTd.Sign() == 0 {
		peerTd = new(big.Int).SetInt64(1 << 62)
	}

	s.recordPeerAttempt(peer.ID())
	beforeBlock := currentBlock.Number.Uint64()

	log.Info("XDC sync: starting synchronisation",
		"peer", peer.ID()[:16],
		"peerHead", peerHead.Hex()[:16],
		"peerTd", peerTd,
		"ourBlock", beforeBlock,
	)

	mode := s.handler.downloader.ConfigSyncMode()

	core.XdcBulkSyncMode.Store(true)
	defer core.XdcBulkSyncMode.Store(false)

	// Run one round, supervised by two watchdogs:
	//   - xdcSyncStallWindow (30s): cancel if chain head doesn't advance.
	//     Catches the post-target hang where bulk download has completed but
	//     SynchroniseXDC's spawnSync is still waiting on processFullSyncContent.
	//   - xdcSyncTimeout (5m): absolute upper bound so we never sit forever.
	// On either trip, Cancel() closes the downloader's cancelCh; fetchers
	// return errCanceled, spawnSync closes the queue, processFullSyncContent
	// unblocks, and the deferred syncing.Store(false) frees this slot so the
	// next forceSync tick (3s) re-engages with a fresh peerHead.
	syncErrCh := make(chan error, 1)
	go func() {
		syncErrCh <- s.handler.downloader.SynchroniseXDC(peer.ID(), peerHead, peerTd, mode)
	}()
	deadline := time.NewTimer(xdcSyncTimeout)
	defer deadline.Stop()
	// Tick at 500ms so the 1s tip window is honoured with at most ~1.5s latency.
	stallTicker := time.NewTicker(500 * time.Millisecond)
	defer stallTicker.Stop()

	lastBlock := beforeBlock
	lastProgress := time.Now()
	var err error
syncWait:
	for {
		select {
		case err = <-syncErrCh:
			break syncWait
		case <-deadline.C:
			log.Warn("XDC sync: deadline exceeded, cancelling downloader",
				"peer", peer.ID()[:16], "timeout", xdcSyncTimeout)
			s.handler.downloader.Cancel()
			err = <-syncErrCh
			break syncWait
		case <-stallTicker.C:
			cur := s.handler.chain.CurrentBlock().Number.Uint64()
			if cur > lastBlock {
				lastBlock = cur
				lastProgress = time.Now()
				continue
			}
			// Adaptive window. Tip mode when the chain advanced fewer than
			// xdcNearTipThreshold blocks this round — this round is a
			// tail-follow catchup, not a bulk import, so any further idle is
			// just the post-target hang.
			//
			// We can't compare peerHead vs cur directly: XDC's peerTd is the
			// real cumulative difficulty (not block number) for peers learned
			// via the handshake. Round-size is a reliable proxy.
			window := xdcSyncStallWindowBulk
			progress := uint64(0)
			if lastBlock >= beforeBlock {
				progress = lastBlock - beforeBlock
			}
			if progress > 0 && progress < uint64(xdcNearTipThreshold) {
				window = xdcSyncStallWindowTip
			}
			if time.Since(lastProgress) >= window {
				log.Warn("XDC sync: stalled, cancelling downloader",
					"peer", peer.ID()[:16],
					"block", cur,
					"progress", progress,
					"stalled", time.Since(lastProgress).Round(100*time.Millisecond),
					"window", window)
				s.handler.downloader.Cancel()
				err = <-syncErrCh
				break syncWait
			}
		}
	}

	afterBlock := s.handler.chain.CurrentBlock().Number.Uint64()
	progress := afterBlock - beforeBlock

	if err != nil {
		log.Warn("XDC sync: synchronisation failed",
			"peer", peer.ID()[:16], "err", err, "progress", progress)
	} else {
		log.Info("XDC sync: round completed",
			"peer", peer.ID()[:16], "progress", progress, "block", afterBlock)
	}

	if progress > 0 {
		s.resetPeerBackoff(peer.ID())
		s.mu.Lock()
		s.lastSyncBlock = afterBlock
		s.mu.Unlock()
		// Fix #311: Only mark as synced when we actually made progress.
		// Upstream geth sets synced when downloader completes successfully;
		// we match that semantics rather than gating on best-peer distance.
		s.handler.synced.Store(true)
	} else {
		s.recordPeerBackoff(peer.ID())
	}
}

func (s *xdcSyncer) recordPeerAttempt(peerID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.peerLastTry[peerID] = time.Now()
	if _, ok := s.peerBackoff[peerID]; !ok {
		s.peerBackoff[peerID] = xdcMinBackoff
	}
}

func (s *xdcSyncer) recordPeerBackoff(peerID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.peerLastTry[peerID] = time.Now()
	current := s.peerBackoff[peerID]
	if current < xdcMinBackoff {
		current = xdcMinBackoff
	} else {
		current *= 2
	}
	if current > xdcMaxBackoff {
		current = xdcMaxBackoff
	}
	s.peerBackoff[peerID] = current
}

func (s *xdcSyncer) resetPeerBackoff(peerID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.peerBackoff, peerID)
	delete(s.peerLastTry, peerID)
}

// resetAllBackoffs resets all peer backoffs (called when new block hashes arrive).
func (s *xdcSyncer) resetAllBackoffs() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for k := range s.peerBackoff {
		delete(s.peerBackoff, k)
	}
	for k := range s.peerLastTry {
		delete(s.peerLastTry, k)
	}
}
