// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more detaiapi.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package api

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"sync"
	"time"

	"github.com/donovanhide/eventsource"
	"github.com/ethereum/go-ethereum/beacon/merkle"
	"github.com/ethereum/go-ethereum/beacon/params"
	"github.com/ethereum/go-ethereum/beacon/types"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/log"
)

var (
	ErrNotFound = errors.New("404 Not Found")
	ErrInternal = errors.New("500 Internal Server Error")
)

type CommitteeUpdate struct {
	Update            types.LightClientUpdate
	NextSyncCommittee types.SerializedSyncCommittee
}

// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientupdate
type committeeUpdateJson struct {
	Version string              `json:"version"`
	Data    committeeUpdateData `json:"data"`
}

type committeeUpdateData struct {
	Header                  jsonBeaconHeader              `json:"attested_header"`
	NextSyncCommittee       types.SerializedSyncCommittee `json:"next_sync_committee"`
	NextSyncCommitteeBranch merkle.Values                 `json:"next_sync_committee_branch"`
	FinalizedHeader         *jsonBeaconHeader             `json:"finalized_header,omitempty"`
	FinalityBranch          merkle.Values                 `json:"finality_branch,omitempty"`
	SyncAggregate           types.SyncAggregate           `json:"sync_aggregate"`
	SignatureSlot           common.Decimal                `json:"signature_slot"`
}

type jsonBeaconHeader struct {
	Beacon types.Header `json:"beacon"`
}

type jsonHeaderWithExecProof struct {
	Beacon          types.Header    `json:"beacon"`
	Execution       json.RawMessage `json:"execution"`
	ExecutionBranch merkle.Values   `json:"execution_branch"`
}

// UnmarshalJSON unmarshals from JSON.
func (u *CommitteeUpdate) UnmarshalJSON(input []byte) error {
	var dec committeeUpdateJson
	if err := json.Unmarshal(input, &dec); err != nil {
		return err
	}
	u.NextSyncCommittee = dec.Data.NextSyncCommittee
	u.Update = types.LightClientUpdate{
		Version: dec.Version,
		AttestedHeader: types.SignedHeader{
			Header:        dec.Data.Header.Beacon,
			Signature:     dec.Data.SyncAggregate,
			SignatureSlot: uint64(dec.Data.SignatureSlot),
		},
		NextSyncCommitteeRoot:   u.NextSyncCommittee.Root(),
		NextSyncCommitteeBranch: dec.Data.NextSyncCommitteeBranch,
		FinalityBranch:          dec.Data.FinalityBranch,
	}
	if dec.Data.FinalizedHeader != nil {
		u.Update.FinalizedHeader = &dec.Data.FinalizedHeader.Beacon
	}
	return nil
}

// fetcher is an interface useful for debug-harnessing the http api.
type fetcher interface {
	Do(req *http.Request) (*http.Response, error)
}

// BeaconLightApi requests light client information from a beacon node REST API.
// Note: all required API endpoints are currently only implemented by Lodestar.
type BeaconLightApi struct {
	url           string
	client        fetcher
	customHeaders map[string]string
}

func NewBeaconLightApi(url string, customHeaders map[string]string) *BeaconLightApi {
	return &BeaconLightApi{
		url: url,
		client: &http.Client{
			Timeout: time.Second * 10,
		},
		customHeaders: customHeaders,
	}
}

func (api *BeaconLightApi) httpGet(path string, params url.Values) ([]byte, error) {
	uri, err := api.buildURL(path, params)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		return nil, err
	}
	for k, v := range api.customHeaders {
		req.Header.Set(k, v)
	}
	resp, err := api.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	switch resp.StatusCode {
	case 200:
		return io.ReadAll(resp.Body)
	case 404:
		return nil, ErrNotFound
	case 500:
		return nil, ErrInternal
	default:
		return nil, fmt.Errorf("unexpected error from API endpoint \"%s\": status code %d", path, resp.StatusCode)
	}
}

// GetBestUpdatesAndCommittees fetches and validates LightClientUpdate for given
// period and full serialized committee for the next period (committee root hash
// equals update.NextSyncCommitteeRoot).
// Note that the results are validated but the update signature should be verified
// by the caller as its validity depends on the update chain.
func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64) ([]*types.LightClientUpdate, []*types.SerializedSyncCommittee, error) {
	resp, err := api.httpGet("/eth/v1/beacon/light_client/updates", map[string][]string{
		"start_period": {strconv.FormatUint(firstPeriod, 10)},
		"count":        {strconv.FormatUint(count, 10)},
	})
	if err != nil {
		return nil, nil, err
	}

	var data []CommitteeUpdate
	if err := json.Unmarshal(resp, &data); err != nil {
		return nil, nil, err
	}
	if len(data) != int(count) {
		return nil, nil, errors.New("invalid number of committee updates")
	}
	updates := make([]*types.LightClientUpdate, int(count))
	committees := make([]*types.SerializedSyncCommittee, int(count))
	for i, d := range data {
		if d.Update.AttestedHeader.Header.SyncPeriod() != firstPeriod+uint64(i) {
			return nil, nil, errors.New("wrong committee update header period")
		}
		if err := d.Update.Validate(); err != nil {
			return nil, nil, err
		}
		if d.NextSyncCommittee.Root() != d.Update.NextSyncCommitteeRoot {
			return nil, nil, errors.New("wrong sync committee root")
		}
		updates[i], committees[i] = new(types.LightClientUpdate), new(types.SerializedSyncCommittee)
		*updates[i], *committees[i] = d.Update, d.NextSyncCommittee
	}
	return updates, committees, nil
}

// GetOptimisticUpdate fetches the latest available optimistic update.
// Note that the signature should be verified by the caller as its validity
// depends on the update chain.
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate
func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
	resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update", nil)
	if err != nil {
		return types.OptimisticUpdate{}, err
	}
	return decodeOptimisticUpdate(resp)
}

func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) {
	var data struct {
		Version string `json:"version"`
		Data    struct {
			Attested      jsonHeaderWithExecProof `json:"attested_header"`
			Aggregate     types.SyncAggregate     `json:"sync_aggregate"`
			SignatureSlot common.Decimal          `json:"signature_slot"`
		} `json:"data"`
	}
	if err := json.Unmarshal(enc, &data); err != nil {
		return types.OptimisticUpdate{}, err
	}
	// Decode the execution payload headers.
	attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
	if err != nil {
		return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err)
	}
	if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) {
		// workaround for different event encoding format in Lodestar
		if err := json.Unmarshal(enc, &data.Data); err != nil {
			return types.OptimisticUpdate{}, err
		}
	}

	if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
		return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length")
	}
	if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
		return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length")
	}
	return types.OptimisticUpdate{
		Attested: types.HeaderWithExecProof{
			Header:        data.Data.Attested.Beacon,
			PayloadHeader: attestedExecHeader,
			PayloadBranch: data.Data.Attested.ExecutionBranch,
		},
		Signature:     data.Data.Aggregate,
		SignatureSlot: uint64(data.Data.SignatureSlot),
	}, nil
}

// GetFinalityUpdate fetches the latest available finality update.
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientfinalityupdate
func (api *BeaconLightApi) GetFinalityUpdate() (types.FinalityUpdate, error) {
	resp, err := api.httpGet("/eth/v1/beacon/light_client/finality_update", nil)
	if err != nil {
		return types.FinalityUpdate{}, err
	}
	return decodeFinalityUpdate(resp)
}

func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) {
	var data struct {
		Version string `json:"version"`
		Data    struct {
			Attested       jsonHeaderWithExecProof `json:"attested_header"`
			Finalized      jsonHeaderWithExecProof `json:"finalized_header"`
			FinalityBranch merkle.Values           `json:"finality_branch"`
			Aggregate      types.SyncAggregate     `json:"sync_aggregate"`
			SignatureSlot  common.Decimal          `json:"signature_slot"`
		}
	}
	if err := json.Unmarshal(enc, &data); err != nil {
		return types.FinalityUpdate{}, err
	}
	// Decode the execution payload headers.
	attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
	if err != nil {
		return types.FinalityUpdate{}, fmt.Errorf("invalid attested header: %v", err)
	}
	finalizedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Finalized.Execution)
	if err != nil {
		return types.FinalityUpdate{}, fmt.Errorf("invalid finalized header: %v", err)
	}
	// Perform sanity checks.
	if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
		return types.FinalityUpdate{}, errors.New("invalid sync_committee_bits length")
	}
	if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
		return types.FinalityUpdate{}, errors.New("invalid sync_committee_signature length")
	}

	return types.FinalityUpdate{
		Version: data.Version,
		Attested: types.HeaderWithExecProof{
			Header:        data.Data.Attested.Beacon,
			PayloadHeader: attestedExecHeader,
			PayloadBranch: data.Data.Attested.ExecutionBranch,
		},
		Finalized: types.HeaderWithExecProof{
			Header:        data.Data.Finalized.Beacon,
			PayloadHeader: finalizedExecHeader,
			PayloadBranch: data.Data.Finalized.ExecutionBranch,
		},
		FinalityBranch: data.Data.FinalityBranch,
		Signature:      data.Data.Aggregate,
		SignatureSlot:  uint64(data.Data.SignatureSlot),
	}, nil
}

// GetHeader fetches and validates the beacon header with the given blockRoot.
// If blockRoot is null hash then the latest head header is fetched.
// The values of the canonical and finalized flags are also returned. Note that
// these flags are not validated.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
	var blockId string
	if blockRoot == (common.Hash{}) {
		blockId = "head"
	} else {
		blockId = blockRoot.Hex()
	}
	resp, err := api.httpGet(fmt.Sprintf("/eth/v1/beacon/headers/%s", blockId), nil)
	if err != nil {
		return types.Header{}, false, false, err
	}

	var data struct {
		Finalized bool `json:"finalized"`
		Data      struct {
			Root      common.Hash `json:"root"`
			Canonical bool        `json:"canonical"`
			Header    struct {
				Message   types.Header  `json:"message"`
				Signature hexutil.Bytes `json:"signature"`
			} `json:"header"`
		} `json:"data"`
	}
	if err := json.Unmarshal(resp, &data); err != nil {
		return types.Header{}, false, false, err
	}
	header := data.Data.Header.Message
	if blockRoot == (common.Hash{}) {
		blockRoot = data.Data.Root
	}
	if header.Hash() != blockRoot {
		return types.Header{}, false, false, errors.New("retrieved beacon header root does not match")
	}
	return header, data.Data.Canonical, data.Finalized, nil
}

// GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint.
func (api *BeaconLightApi) GetCheckpointData(checkpointHash common.Hash) (*types.BootstrapData, error) {
	resp, err := api.httpGet(fmt.Sprintf("/eth/v1/beacon/light_client/bootstrap/0x%x", checkpointHash[:]), nil)
	if err != nil {
		return nil, err
	}

	// See data structure definition here:
	// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientbootstrap
	type bootstrapData struct {
		Version string `json:"version"`
		Data    struct {
			Header          jsonBeaconHeader               `json:"header"`
			Committee       *types.SerializedSyncCommittee `json:"current_sync_committee"`
			CommitteeBranch merkle.Values                  `json:"current_sync_committee_branch"`
		} `json:"data"`
	}

	var data bootstrapData
	if err := json.Unmarshal(resp, &data); err != nil {
		return nil, err
	}
	if data.Data.Committee == nil {
		return nil, errors.New("sync committee is missing")
	}
	header := data.Data.Header.Beacon
	if header.Hash() != checkpointHash {
		return nil, fmt.Errorf("invalid checkpoint block header, have %v want %v", header.Hash(), checkpointHash)
	}
	checkpoint := &types.BootstrapData{
		Version:         data.Version,
		Header:          header,
		CommitteeBranch: data.Data.CommitteeBranch,
		CommitteeRoot:   data.Data.Committee.Root(),
		Committee:       data.Data.Committee,
	}
	if err := checkpoint.Validate(); err != nil {
		return nil, fmt.Errorf("invalid checkpoint: %w", err)
	}
	if checkpoint.Header.Hash() != checkpointHash {
		return nil, errors.New("wrong checkpoint hash")
	}
	return checkpoint, nil
}

func (api *BeaconLightApi) GetBeaconBlock(blockRoot common.Hash) (*types.BeaconBlock, error) {
	resp, err := api.httpGet(fmt.Sprintf("/eth/v2/beacon/blocks/0x%x", blockRoot), nil)
	if err != nil {
		return nil, err
	}

	var beaconBlockMessage struct {
		Version string `json:"version"`
		Data    struct {
			Message json.RawMessage `json:"message"`
		}
	}
	if err := json.Unmarshal(resp, &beaconBlockMessage); err != nil {
		return nil, fmt.Errorf("invalid block json data: %v", err)
	}
	block, err := types.BlockFromJSON(beaconBlockMessage.Version, beaconBlockMessage.Data.Message)
	if err != nil {
		return nil, err
	}
	computedRoot := block.Root()
	if computedRoot != blockRoot {
		return nil, fmt.Errorf("Beacon block root hash mismatch (expected: %x, got: %x)", blockRoot, computedRoot)
	}
	return block, nil
}

func decodeHeadEvent(enc []byte) (uint64, common.Hash, error) {
	var data struct {
		Slot  common.Decimal `json:"slot"`
		Block common.Hash    `json:"block"`
	}
	if err := json.Unmarshal(enc, &data); err != nil {
		return 0, common.Hash{}, err
	}
	return uint64(data.Slot), data.Block, nil
}

type HeadEventListener struct {
	OnNewHead    func(slot uint64, blockRoot common.Hash)
	OnOptimistic func(head types.OptimisticUpdate)
	OnFinality   func(head types.FinalityUpdate)
	OnError      func(err error)
}

// StartHeadListener creates an event subscription for heads and signed (optimistic)
// head updates and calls the specified callback functions when they are received.
// The callbacks are also called for the current head and optimistic head at startup.
// They are never called concurrently.
func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() {
	var (
		ctx, closeCtx = context.WithCancel(context.Background())
		streamCh      = make(chan *eventsource.Stream, 1)
		wg            sync.WaitGroup
	)

	// When connected to a Lodestar node the subscription blocks until the first actual
	// event arrives; therefore we create the subscription in a separate goroutine while
	// letting the main goroutine sync up to the current head.
	wg.Add(1)
	go func() {
		defer wg.Done()
		stream := api.startEventStream(ctx, &listener)
		if stream == nil {
			// This case happens when the context was closed.
			return
		}
		// Stream was opened, wait for close signal.
		streamCh <- stream
		<-ctx.Done()
		stream.Close()
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()

		// Request initial data.
		log.Trace("Requesting initial head header")
		if head, _, _, err := api.GetHeader(common.Hash{}); err == nil {
			log.Trace("Retrieved initial head header", "slot", head.Slot, "hash", head.Hash())
			listener.OnNewHead(head.Slot, head.Hash())
		} else {
			log.Debug("Failed to retrieve initial head header", "error", err)
		}
		log.Trace("Requesting initial optimistic update")
		if optimisticUpdate, err := api.GetOptimisticUpdate(); err == nil {
			log.Trace("Retrieved initial optimistic update", "slot", optimisticUpdate.Attested.Slot, "hash", optimisticUpdate.Attested.Hash())
			listener.OnOptimistic(optimisticUpdate)
		} else {
			log.Debug("Failed to retrieve initial optimistic update", "error", err)
		}
		log.Trace("Requesting initial finality update")
		if finalityUpdate, err := api.GetFinalityUpdate(); err == nil {
			log.Trace("Retrieved initial finality update", "slot", finalityUpdate.Finalized.Slot, "hash", finalityUpdate.Finalized.Hash())
			listener.OnFinality(finalityUpdate)
		} else {
			log.Debug("Failed to retrieve initial finality update", "error", err)
		}

		log.Trace("Starting event stream processing loop")
		// Receive the stream.
		var stream *eventsource.Stream
		select {
		case stream = <-streamCh:
		case <-ctx.Done():
			log.Trace("Stopping event stream processing loop")
			return
		}

		for {
			select {
			case event, ok := <-stream.Events:
				if !ok {
					log.Trace("Event stream closed")
					return
				}
				log.Trace("New event received from event stream", "type", event.Event())
				switch event.Event() {
				case "head":
					slot, blockRoot, err := decodeHeadEvent([]byte(event.Data()))
					if err == nil {
						listener.OnNewHead(slot, blockRoot)
					} else {
						listener.OnError(fmt.Errorf("error decoding head event: %v", err))
					}
				case "light_client_optimistic_update":
					optimisticUpdate, err := decodeOptimisticUpdate([]byte(event.Data()))
					if err == nil {
						listener.OnOptimistic(optimisticUpdate)
					} else {
						listener.OnError(fmt.Errorf("error decoding optimistic update event: %v", err))
					}
				case "light_client_finality_update":
					finalityUpdate, err := decodeFinalityUpdate([]byte(event.Data()))
					if err == nil {
						listener.OnFinality(finalityUpdate)
					} else {
						listener.OnError(fmt.Errorf("error decoding finality update event: %v", err))
					}
				default:
					listener.OnError(fmt.Errorf("unexpected event: %s", event.Event()))
				}

			case err, ok := <-stream.Errors:
				if !ok {
					return
				}
				listener.OnError(err)
			}
		}
	}()

	return func() {
		closeCtx()
		wg.Wait()
	}
}

// startEventStream establishes an event stream. This will keep retrying until the stream has been
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
	for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
		log.Trace("Sending event subscription request")
		uri, err := api.buildURL("/eth/v1/events", map[string][]string{"topics": {"head", "light_client_finality_update", "light_client_optimistic_update"}})
		if err != nil {
			listener.OnError(fmt.Errorf("error creating event subscription URL: %v", err))
			continue
		}
		req, err := http.NewRequestWithContext(ctx, "GET", uri, nil)
		if err != nil {
			listener.OnError(fmt.Errorf("error creating event subscription request: %v", err))
			continue
		}
		for k, v := range api.customHeaders {
			req.Header.Set(k, v)
		}
		stream, err := eventsource.SubscribeWithRequest("", req)
		if err != nil {
			listener.OnError(fmt.Errorf("error creating event subscription: %v", err))
			continue
		}
		log.Trace("Successfully created event stream")
		return stream
	}
	return nil
}

func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

func (api *BeaconLightApi) buildURL(path string, params url.Values) (string, error) {
	uri, err := url.Parse(api.url)
	if err != nil {
		return "", err
	}
	uri = uri.JoinPath(path)
	if params != nil {
		uri.RawQuery = params.Encode()
	}
	return uri.String(), nil
}
