// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"bytes"
	"fmt"
	"sync"
	"time"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// diskLayer is a low level persistent layer built on top of a key-value store.
// Reads are served, in order, from the live and frozen write buffers, the
// clean in-memory caches and finally the key-value store itself.
type diskLayer struct {
	root common.Hash // Immutable, state root this layer was made for
	id   uint64      // Immutable, corresponding state id
	db   *Database   // Path-based trie database

	// These two caches must be maintained separately, because the key
	// for the root node of the storage trie (accountHash) is identical
	// to the key for the account data. Either cache may be nil if the
	// corresponding memory allowance is configured as zero.
	nodes  *fastcache.Cache // GC friendly memory cache of clean nodes
	states *fastcache.Cache // GC friendly memory cache of clean states

	buffer *buffer // Live buffer to aggregate writes
	frozen *buffer // Frozen buffer waiting for flushing, nil if there is none

	stale bool         // Signals that the layer became stale (state progressed)
	lock  sync.RWMutex // Lock guarding the stale flag, the frozen buffer reference and generator reads

	// The generator is set if the state snapshot was not fully completed,
	// regardless of whether the background generation is running or not.
	// It should only be unset if the generation completes.
	generator *generator
}

// newDiskLayer assembles a disk layer for the given root and state id on top
// of the supplied database, write buffers and clean caches.
//
// A clean cache passed in as non-nil (typically inherited from the previous
// disk layer) is reused as is. A nil cache is allocated here, provided the
// corresponding memory allowance in the database config is not zero;
// otherwise it stays nil and caching is disabled.
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer {
	if nodes == nil {
		if size := db.config.TrieCleanSize; size != 0 {
			nodes = fastcache.New(size)
		}
	}
	if states == nil {
		if size := db.config.StateCleanSize; size != 0 {
			states = fastcache.New(size)
		}
	}
	dl := &diskLayer{root: root, id: id, db: db}
	dl.nodes, dl.states = nodes, states
	dl.buffer, dl.frozen = buffer, frozen
	return dl
}

// rootHash implements the layer interface, returning the state root hash this
// disk layer corresponds to.
func (dl *diskLayer) rootHash() common.Hash {
	return dl.root
}

// stateID implements the layer interface, returning the state id associated
// with this disk layer.
func (dl *diskLayer) stateID() uint64 {
	return dl.id
}

// parentLayer implements the layer interface. It always returns nil, as the
// disk layer is the bottom-most layer and has nothing below it.
func (dl *diskLayer) parentLayer() layer {
	return nil
}

// setGenerator links the given generator to disk layer, representing the
// associated state snapshot is not fully completed yet and the generation
// is potentially running in the background. Passing nil unlinks the
// generator.
//
// Note this method does not acquire the layer lock by itself.
func (dl *diskLayer) setGenerator(generator *generator) {
	dl.generator = generator
}

// markStale flags the disk layer as stale under the write lock. Marking an
// already stale layer is treated as a fatal programming error and panics.
func (dl *diskLayer) markStale() {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	if !dl.stale {
		dl.stale = true
		return
	}
	// We've committed into the same base from two children, boom.
	panic("triedb disk layer is stale")
}

// node implements the layer interface. It resolves the trie node identified
// by the owner and path, consulting in order the write buffers (live first,
// then frozen), the clean node cache and finally the key-value store. The
// node blob is returned together with its hash and the location it was
// served from.
//
// A missing node is not reported as an error. errSnapshotStale is returned
// if the layer has been marked as stale.
func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return nil, common.Hash{}, nil, errSnapshotStale
	}
	// Dirty nodes which are not yet written live in the buffers, check them
	// first. The buffers are accessed without extra locking since they can't
	// be mutated before the layer is tagged as stale.
	for _, buf := range []*buffer{dl.buffer, dl.frozen} {
		if buf == nil {
			continue
		}
		n, ok := buf.node(owner, path)
		if !ok {
			continue
		}
		dirtyNodeHitMeter.Mark(1)
		dirtyNodeReadMeter.Mark(int64(len(n.Blob)))
		dirtyNodeHitDepthHist.Update(int64(depth))
		return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
	}
	dirtyNodeMissMeter.Mark(1)

	// Not dirty, consult the clean memory cache if it's enabled.
	cacheKey := nodeCacheKey(owner, path)
	if dl.nodes != nil {
		cached := dl.nodes.Get(nil, cacheKey)
		if len(cached) > 0 {
			cleanNodeHitMeter.Mark(1)
			cleanNodeReadMeter.Mark(int64(len(cached)))
			return cached, crypto.Keccak256Hash(cached), &nodeLoc{loc: locCleanCache, depth: depth}, nil
		}
		cleanNodeMissMeter.Mark(1)
	}
	// Fall back to the database. A non-zero owner denotes a storage trie,
	// the zero owner denotes the account trie.
	var blob []byte
	if owner != (common.Hash{}) {
		blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
	} else {
		blob = rawdb.ReadAccountTrieNode(dl.db.diskdb, path)
	}
	// Populate the clean cache with the resolved, non-empty node. The
	// background buffer flusher may write to the clean cache at the same
	// time, but two writers never store different content for the same
	// item: an item touched by the flusher would have been found in the
	// frozen buffer above, without reaching the database.
	if dl.nodes != nil && len(blob) > 0 {
		dl.nodes.Set(cacheKey, blob)
		cleanNodeWriteMeter.Mark(int64(len(blob)))
	}
	return blob, crypto.Keccak256Hash(blob), &nodeLoc{loc: locDiskLayer, depth: depth}, nil
}

// account looks up the account data (RLP in the slim format) associated
// with the given account hash. The write buffers (live first, then frozen)
// are consulted first, followed by the clean state cache and the key-value
// store.
//
// errSnapshotStale is returned if the layer is stale, and errNotCoveredYet
// if the snapshot is still being generated and the account lies beyond the
// generation marker.
//
// Note the returned data is not copied by this method, please don't modify it.
func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return nil, errSnapshotStale
	}
	// Check the not-yet-written state in the buffers first. The buffers are
	// accessed without extra locking since they can't be mutated before the
	// layer is tagged as stale.
	for _, buf := range []*buffer{dl.buffer, dl.frozen} {
		if buf == nil {
			continue
		}
		data, ok := buf.account(hash)
		if !ok {
			continue
		}
		dirtyStateHitMeter.Mark(1)
		dirtyStateReadMeter.Mark(int64(len(data)))
		dirtyStateHitDepthHist.Update(int64(depth))

		if len(data) != 0 {
			stateAccountExistMeter.Mark(1)
		} else {
			stateAccountInexMeter.Mark(1)
		}
		return data, nil
	}
	dirtyStateMissMeter.Mark(1)

	// While the snapshot is being generated, only accounts at or before
	// the generation marker can be served.
	if marker := dl.genMarker(); marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
		return nil, errNotCoveredYet
	}
	// Consult the clean state cache if it's enabled. Empty entries are
	// valid hits, representing non-existent accounts.
	if dl.states != nil {
		cached, ok := dl.states.HasGet(nil, hash[:])
		if ok {
			cleanStateHitMeter.Mark(1)
			cleanStateReadMeter.Mark(int64(len(cached)))

			if len(cached) != 0 {
				stateAccountExistMeter.Mark(1)
			} else {
				stateAccountInexMeter.Mark(1)
			}
			return cached, nil
		}
		cleanStateMissMeter.Mark(1)
	}
	// Fall back to the database.
	data := rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)

	// Populate the clean cache with the resolved data, including empty
	// results. The background buffer flusher may write to the clean cache
	// at the same time, but two writers never store different content for
	// the same item: an item touched by the flusher would have been found
	// in the frozen buffer above, without reaching the database.
	if dl.states != nil {
		dl.states.Set(hash[:], data)
		cleanStateWriteMeter.Mark(int64(len(data)))
	}
	if len(data) != 0 {
		stateAccountExistMeter.Mark(1)
		stateAccountExistDiskMeter.Mark(1)
	} else {
		stateAccountInexMeter.Mark(1)
		stateAccountInexDiskMeter.Mark(1)
	}
	return data, nil
}

// storage looks up the storage slot data identified by the storage hash
// within the account identified by the account hash. The write buffers
// (live first, then frozen) are consulted first, followed by the clean
// state cache and the key-value store.
//
// errSnapshotStale is returned if the layer is stale, and errNotCoveredYet
// if the snapshot is still being generated and the slot lies beyond the
// generation marker.
//
// Note the returned data is not copied by this method, please don't modify it.
func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([]byte, error) {
	// The read lock is held for the entire lookup, so the layer can't be
	// turned stale in the middle of the state access.
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return nil, errSnapshotStale
	}
	// Check the not-yet-written state in the buffers first. The buffers are
	// accessed without extra locking since they can't be mutated before the
	// layer is tagged as stale.
	for _, buf := range []*buffer{dl.buffer, dl.frozen} {
		if buf == nil {
			continue
		}
		data, ok := buf.storage(accountHash, storageHash)
		if !ok {
			continue
		}
		dirtyStateHitMeter.Mark(1)
		dirtyStateReadMeter.Mark(int64(len(data)))
		dirtyStateHitDepthHist.Update(int64(depth))

		if len(data) != 0 {
			stateStorageExistMeter.Mark(1)
		} else {
			stateStorageInexMeter.Mark(1)
		}
		return data, nil
	}
	dirtyStateMissMeter.Mark(1)

	// While the snapshot is being generated, only storage slots at or
	// before the generation marker can be served.
	key := storageKeySlice(accountHash, storageHash)
	if marker := dl.genMarker(); marker != nil && bytes.Compare(key, marker) > 0 {
		return nil, errNotCoveredYet
	}
	// Consult the clean state cache if it's enabled. Empty entries are
	// valid hits, representing non-existent slots.
	if dl.states != nil {
		cached, ok := dl.states.HasGet(nil, key)
		if ok {
			cleanStateHitMeter.Mark(1)
			cleanStateReadMeter.Mark(int64(len(cached)))

			if len(cached) != 0 {
				stateStorageExistMeter.Mark(1)
			} else {
				stateStorageInexMeter.Mark(1)
			}
			return cached, nil
		}
		cleanStateMissMeter.Mark(1)
	}
	// Fall back to the database.
	data := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)

	// Populate the clean cache with the resolved data, including empty
	// results. The background buffer flusher may write to the clean cache
	// at the same time, but two writers never store different content for
	// the same item: an item touched by the flusher would have been found
	// in the frozen buffer above, without reaching the database.
	if dl.states != nil {
		dl.states.Set(key, data)
		cleanStateWriteMeter.Mark(int64(len(data)))
	}
	if len(data) != 0 {
		stateStorageExistMeter.Mark(1)
		stateStorageExistDiskMeter.Mark(1)
	} else {
		stateStorageInexMeter.Mark(1)
		stateStorageInexDiskMeter.Mark(1)
	}
	return data, nil
}

// update implements the layer interface, returning a new diff layer stacked
// on top of this disk layer. The root, state id, block value and the node and
// state sets are passed through to the new diff layer unchanged.
func (dl *diskLayer) update(root common.Hash, id uint64, block uint64, nodes *nodeSetWithOrigin, states *StateSetWithOrigin) *diffLayer {
	return newDiffLayer(dl, root, id, block, nodes, states)
}

// writeHistory stores the history derived from the given diff layer into the
// freezer selected by typ and extends the matching history indexer, if there
// is one. Once the number of retained histories exceeds the configured limit,
// the oldest ones are truncated from the freezer tail. A limit of zero
// disables the truncation.
//
// What's more, this function also returns a flag indicating whether the
// buffer flushing is required. It is set when the tail truncation had to be
// skipped because the persistent state ID is still below the first history
// ID that would remain, ensuring the persistent state ID is always greater
// than or equal to the first history ID.
//
// Nothing is written if the trienode history is disabled (negative config
// value) or if the selected freezer is nil. An unknown history type panics.
func (dl *diskLayer) writeHistory(typ historyType, diff *diffLayer) (bool, error) {
	var (
		limit     uint64
		freezer   ethdb.AncientStore
		indexer   *historyIndexer
		writeFunc func(writer ethdb.AncientWriter, dl *diffLayer) error
	)
	// Select the freezer, indexer, writer and retention limit by history type.
	switch typ {
	case typeStateHistory:
		freezer = dl.db.stateFreezer
		indexer = dl.db.stateIndexer
		writeFunc = writeStateHistory
		limit = dl.db.config.StateHistory
	case typeTrienodeHistory:
		freezer = dl.db.trienodeFreezer
		indexer = dl.db.trienodeIndexer
		writeFunc = func(writer ethdb.AncientWriter, diff *diffLayer) error {
			return writeTrienodeHistory(writer, diff, dl.db.config.FullValueCheckpoint)
		}
		// Skip the history commit if the trienode history is not permitted
		if dl.db.config.TrienodeHistory < 0 {
			return false, nil
		}
		limit = uint64(dl.db.config.TrienodeHistory)
	default:
		panic(fmt.Sprintf("unknown history type: %v", typ))
	}
	// Short circuit if the history freezer is nil
	if freezer == nil {
		return false, nil
	}
	// Bail out with an error if writing the history fails.
	// This can happen, for example, if the device is full.
	err := writeFunc(freezer, diff)
	if err != nil {
		return false, err
	}
	// Notify the history indexer for newly created history
	if indexer != nil {
		if err := indexer.extend(diff.stateID()); err != nil {
			return false, err
		}
	}
	// Determine if the persisted history object has exceeded the
	// configured limitation. A zero limit means no truncation at all.
	if limit == 0 {
		return false, nil
	}
	tail, err := freezer.Tail()
	if err != nil {
		return false, err
	} // firstID = tail+1

	// length = diff.stateID()-firstID+1 = diff.stateID()-tail
	if diff.stateID()-tail <= limit {
		return false, nil
	}
	newFirst := diff.stateID() - limit + 1 // the id of first history **after truncation**

	// In a rare case where the ID of the first history object (after tail
	// truncation) exceeds the persisted state ID, we must take corrective
	// steps:
	//
	// - Skip tail truncation temporarily, avoid the scenario that associated
	//   history of persistent state is removed
	//
	// - Force a commit of the cached dirty states into persistent state
	//
	// These measures ensure the persisted state ID always remains greater
	// than or equal to the first history ID.
	if persistentID := rawdb.ReadPersistentStateID(dl.db.diskdb); persistentID < newFirst {
		log.Debug("Skip tail truncation", "type", typ, "persistentID", persistentID, "tailID", tail+1, "headID", diff.stateID(), "limit", limit)
		return true, nil
	}
	// Drop all histories below newFirst; the new tail is newFirst-1.
	pruned, err := truncateFromTail(freezer, typ, newFirst-1)
	if err != nil {
		return false, err
	}
	log.Debug("Pruned history", "type", typ, "items", pruned, "tailid", newFirst)
	return false, nil
}

// commit merges the given bottom-most diff layer into the write buffer and
// returns a newly constructed disk layer representing the merged state.
// The current disk layer is tagged as stale once the histories have been
// written, preventing any further access to it.
//
// The state and trienode histories of the diff layer are written first. The
// combined buffer is then frozen and scheduled for flushing if it is full,
// if force is set, or if writing either history requested a flush; otherwise
// it is kept as the live buffer of the new disk layer.
func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	// Construct and store the state history first. If crash happens after storing
	// the state history but without flushing the corresponding states(journal),
	// the stored state history will be truncated from head in the next restart.
	flushA, err := dl.writeHistory(typeStateHistory, bottom)
	if err != nil {
		return nil, err
	}
	// Construct and store the trienode history first. If crash happens after
	// storing the trienode history but without flushing the corresponding
	// states(journal), the stored trienode history will be truncated from head
	// in the next restart.
	flushB, err := dl.writeHistory(typeTrienodeHistory, bottom)
	if err != nil {
		return nil, err
	}
	// Since the state history and trienode history may be configured with different
	// lengths, the buffer will be flushed once either of them meets its threshold.
	flush := flushA || flushB

	// Mark the diskLayer as stale before applying any mutations on top.
	dl.stale = true

	// Store the root->id lookup afterwards. All stored lookups are identified
	// by the **unique** state root. It's impossible that in the same chain
	// blocks are not adjacent but have the same root.
	if dl.id == 0 {
		rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
	}
	rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())

	// Merge the trie nodes and flat states of the bottom-most diff layer into the
	// buffer as the combined layer.
	combined := dl.buffer.commit(bottom.nodes.nodeSet, bottom.states.stateSet)

	// Flush the combined buffer if it's full, if the flush is enforced by
	// the caller, or if it's required by the history truncation.
	if combined.full() || force || flush {
		// Wait until the previous frozen buffer is fully flushed
		if dl.frozen != nil {
			if err := dl.frozen.waitFlush(); err != nil {
				return nil, err
			}
		}
		// Release the frozen buffer and the internally referenced maps will
		// be reclaimed by GC.
		dl.frozen = nil

		// Terminate the background state snapshot generator before flushing
		// to prevent data race.
		var (
			progress []byte
			gen      = dl.generator
		)
		if gen != nil {
			gen.stop()
			progress = gen.progressMarker()

			// If the snapshot has been fully generated, unset the generator
			if progress == nil {
				dl.setGenerator(nil)
			} else {
				log.Info("Paused snapshot generation")
			}
		}

		// Freeze the live buffer and schedule background flushing
		dl.frozen = combined
		dl.frozen.flush(bottom.root, dl.db.diskdb, []ethdb.AncientWriter{dl.db.stateFreezer, dl.db.trienodeFreezer}, progress, dl.nodes, dl.states, bottom.stateID(), func() {
			// Resume the background generation if it's not completed yet.
			// The generator is assumed to be available if the progress is
			// not nil.
			//
			// Notably, the generator will be shared and linked by all the
			// disk layer instances, regardless of the generation is terminated
			// or not.
			if progress != nil {
				gen.run(bottom.root)
			}
		})
		// Block until the frozen buffer is fully flushed out if the async flushing
		// is not allowed.
		if dl.db.config.NoAsyncFlush {
			if err := dl.frozen.waitFlush(); err != nil {
				return nil, err
			}
			dl.frozen = nil
		}
		// Start over with a fresh live buffer for the new disk layer.
		combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0)
	}
	// Link the generator if snapshot is not yet completed
	ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined, dl.frozen)
	if dl.generator != nil {
		ndl.setGenerator(dl.generator)
	}
	return ndl, nil
}

// revert applies the given state history and returns a reverted disk layer
// which represents the parent state recorded in the history, with the state
// id decreased by one. The current disk layer is tagged as stale.
//
// errUnexpectedHistory is returned if the history doesn't match the root of
// this layer, and an error wrapping errStateUnrecoverable is returned if the
// layer has the zero state id.
func (dl *diskLayer) revert(h *stateHistory) (*diskLayer, error) {
	start := time.Now()
	if h.meta.root != dl.rootHash() {
		return nil, errUnexpectedHistory
	}
	if dl.id == 0 {
		return nil, fmt.Errorf("%w: zero state id", errStateUnrecoverable)
	}
	// Apply the reverse state changes upon the current state. This must
	// be done before holding the lock in order to access state in "this"
	// layer.
	nodes, err := apply(dl.db, h.meta.parent, h.meta.root, h.meta.version != stateHistoryV0, h.accounts, h.storages)
	if err != nil {
		return nil, err
	}
	// Derive the state modification set from the history, keyed by the hash
	// of the account address and the storage key.
	accounts, storages := h.stateSet()

	// Mark the diskLayer as stale before applying any mutations on top.
	dl.lock.Lock()
	defer dl.lock.Unlock()

	dl.stale = true

	// Unindex the corresponding history
	if dl.db.stateIndexer != nil {
		if err := dl.db.stateIndexer.shorten(dl.id); err != nil {
			return nil, err
		}
	}
	if dl.db.trienodeIndexer != nil {
		if err := dl.db.trienodeIndexer.shorten(dl.id); err != nil {
			return nil, err
		}
	}
	// State change may be applied to node buffer, or the persistent
	// state, depends on if node buffer is empty or not. If the node
	// buffer is not empty, it means that the state transition that
	// needs to be reverted is not yet flushed and cached in node
	// buffer, otherwise, manipulate persistent state directly.
	if !dl.buffer.empty() {
		err := dl.buffer.revertTo(dl.db.diskdb, nodes, accounts, storages)
		if err != nil {
			return nil, err
		}
		ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen)

		// Link the generator if it exists
		if dl.generator != nil {
			ndl.setGenerator(dl.generator)
		}
		log.Debug("Reverted data in write buffer", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start)))
		return ndl, nil
	}
	// Block until the frozen buffer is fully flushed
	if dl.frozen != nil {
		if err := dl.frozen.waitFlush(); err != nil {
			return nil, err
		}
		// Unset the frozen buffer if it exists, otherwise these "reverted"
		// states will still be accessible after revert in frozen buffer.
		dl.frozen = nil
	}

	// Terminate the generator before writing any data to the database.
	// This must be done after flushing the frozen buffer, as the generator
	// may be restarted at the end of the flush process.
	var progress []byte
	if dl.generator != nil {
		dl.generator.stop()
		progress = dl.generator.progressMarker()
	}
	// Write the reverted nodes and states, along with the decreased
	// persistent state id and the parent snapshot root, in a single batch.
	batch := dl.db.diskdb.NewBatch()
	writeNodes(batch, nodes, dl.nodes)

	// Provide the original values of modified accounts and storages for revert
	writeStates(batch, progress, accounts, storages, dl.states)
	rawdb.WritePersistentStateID(batch, dl.id-1)
	rawdb.WriteSnapshotRoot(batch, h.meta.parent)
	if err := batch.Write(); err != nil {
		log.Crit("Failed to write states", "err", err)
	}
	// Link the generator and resume generation if the snapshot is not yet
	// fully completed.
	ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen)
	if dl.generator != nil && !dl.generator.completed() {
		ndl.generator = dl.generator
		ndl.generator.run(h.meta.parent)
	}
	log.Debug("Reverted data in persistent state", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start)))
	return ndl, nil
}

// size reports the approximate size of the data held in the live write
// buffer of the disk layer. Zero is reported for a stale layer.
func (dl *diskLayer) size() common.StorageSize {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	var total common.StorageSize
	if !dl.stale {
		total = common.StorageSize(dl.buffer.size())
	}
	return total
}

// resetCache resets the clean node and state caches, releasing the memory
// held by them. It's a no-op for a stale disk layer, which has lost the
// ownership of the clean caches.
func (dl *diskLayer) resetCache() {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return
	}
	// Either cache may be nil if it's disabled, skip those.
	for _, cache := range []*fastcache.Cache{dl.nodes, dl.states} {
		if cache != nil {
			cache.Reset()
		}
	}
}

// genMarker reports the progress marker of the linked state snapshot
// generator. Nil is returned if no generator is linked, which denotes the
// state snapshot has already been fully generated.
func (dl *diskLayer) genMarker() []byte {
	if gen := dl.generator; gen != nil {
		return gen.progressMarker()
	}
	return nil
}

// genComplete reports whether the state snapshot has been fully generated,
// namely whether the generation progress marker is nil.
func (dl *diskLayer) genComplete() bool {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	marker := dl.genMarker()
	return marker == nil
}

// waitFlush waits for the background flush of the frozen buffer to finish
// and returns its result. It returns nil right away if there is no frozen
// buffer.
func (dl *diskLayer) waitFlush() error {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if frozen := dl.frozen; frozen != nil {
		return frozen.waitFlush()
	}
	return nil
}

// terminate waits for the frozen buffer (if any) to be flushed and releases
// it, then stops the background state generator if one is linked. If the
// flush fails, the error is returned, the frozen buffer is kept and the
// generator is left untouched.
func (dl *diskLayer) terminate() error {
	dl.lock.Lock()
	defer dl.lock.Unlock()

	if frozen := dl.frozen; frozen != nil {
		if err := frozen.waitFlush(); err != nil {
			return err
		}
		dl.frozen = nil
	}
	if gen := dl.generator; gen != nil {
		gen.stop()
	}
	return nil
}
