Documentation
¶
Index ¶
- func NewDARetriever(client da.Client, cache cache.CacheManager, genesis genesis.Genesis, ...) *daRetriever
- type BlockSyncer
- type DAFollower
- type DAFollowerConfig
- type DARetriever
- type MockDARetriever
- func (_m *MockDARetriever) EXPECT() *MockDARetriever_Expecter
- func (_mock *MockDARetriever) PopPriorityHeight() uint64
- func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
- func (_mock *MockDARetriever) QueuePriorityHeight(daHeight uint64)
- func (_mock *MockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
- type MockDARetriever_Expecter
- func (_e *MockDARetriever_Expecter) PopPriorityHeight() *MockDARetriever_PopPriorityHeight_Call
- func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call
- func (_e *MockDARetriever_Expecter) QueuePriorityHeight(daHeight interface{}) *MockDARetriever_QueuePriorityHeight_Call
- func (_e *MockDARetriever_Expecter) RetrieveFromDA(ctx interface{}, daHeight interface{}) *MockDARetriever_RetrieveFromDA_Call
- type MockDARetriever_PopPriorityHeight_Call
- func (_c *MockDARetriever_PopPriorityHeight_Call) Return(v uint64) *MockDARetriever_PopPriorityHeight_Call
- func (_c *MockDARetriever_PopPriorityHeight_Call) Run(run func()) *MockDARetriever_PopPriorityHeight_Call
- func (_c *MockDARetriever_PopPriorityHeight_Call) RunAndReturn(run func() uint64) *MockDARetriever_PopPriorityHeight_Call
- type MockDARetriever_ProcessBlobs_Call
- func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call
- func (_c *MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call
- func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(...) *MockDARetriever_ProcessBlobs_Call
- type MockDARetriever_QueuePriorityHeight_Call
- func (_c *MockDARetriever_QueuePriorityHeight_Call) Return() *MockDARetriever_QueuePriorityHeight_Call
- func (_c *MockDARetriever_QueuePriorityHeight_Call) Run(run func(daHeight uint64)) *MockDARetriever_QueuePriorityHeight_Call
- func (_c *MockDARetriever_QueuePriorityHeight_Call) RunAndReturn(run func(daHeight uint64)) *MockDARetriever_QueuePriorityHeight_Call
- type MockDARetriever_RetrieveFromDA_Call
- func (_c *MockDARetriever_RetrieveFromDA_Call) Return(dAHeightEvents []common.DAHeightEvent, err error) *MockDARetriever_RetrieveFromDA_Call
- func (_c *MockDARetriever_RetrieveFromDA_Call) Run(run func(ctx context.Context, daHeight uint64)) *MockDARetriever_RetrieveFromDA_Call
- func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)) *MockDARetriever_RetrieveFromDA_Call
- type P2PHandler
- type Syncer
- func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, ...) (types.State, error)
- func (s *Syncer) HasReachedDAHead() bool
- func (s *Syncer) IsSyncedWithRaft(raftState *raft.RaftBlockState) (int, error)
- func (s *Syncer) PendingCount() int
- func (s *Syncer) RecoverFromRaft(ctx context.Context, raftState *raft.RaftBlockState) error
- func (s *Syncer) SetBlockSyncer(bs BlockSyncer)
- func (s *Syncer) SetLastState(state types.State)
- func (s *Syncer) Start(ctx context.Context) (err error)
- func (s *Syncer) Stop(ctx context.Context) error
- func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error
- func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, ...) error
- func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDARetriever ¶
func NewDARetriever( client da.Client, cache cache.CacheManager, genesis genesis.Genesis, logger zerolog.Logger, ) *daRetriever
NewDARetriever creates a new DA retriever
Types ¶
type BlockSyncer ¶
type BlockSyncer interface {
// TrySyncNextBlock attempts to sync the next available block from DA or P2P.
TrySyncNextBlock(ctx context.Context, event *common.DAHeightEvent) error
// ApplyBlock executes block transactions and returns the new state.
ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
// ValidateBlock validates block structure and state transitions.
ValidateBlock(ctx context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
// VerifyForcedInclusionTxs verifies that forced inclusion transactions are properly handled.
VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error
}
BlockSyncer defines operations for block synchronization that can be traced. The Syncer implements this interface, and a tracing decorator can wrap it to add OpenTelemetry spans to each operation.
func WithTracingBlockSyncer ¶
func WithTracingBlockSyncer(inner BlockSyncer) BlockSyncer
WithTracingBlockSyncer decorates the provided BlockSyncer with tracing spans.
type DAFollower ¶
type DAFollower interface {
// Start begins the follow and catchup loops.
Start(ctx context.Context) error
// Stop cancels the context and waits for goroutines.
Stop()
// HasReachedHead returns true once the catchup loop has processed the
// DA head at least once. Once true, it stays true.
HasReachedHead() bool
}
DAFollower subscribes to DA blob events and drives sequential catchup.
Architecture:
- followLoop listens on the subscription channel. When caught up, it processes subscription blobs inline (fast path, no DA re-fetch). Otherwise, it updates highestSeenDAHeight and signals the catchup loop.
- catchupLoop sequentially retrieves from localNextDAHeight → highestSeenDAHeight, piping events to the Syncer's heightInCh.
The two goroutines share only atomic state; no mutexes needed.
func NewDAFollower ¶
func NewDAFollower(cfg DAFollowerConfig) DAFollower
NewDAFollower creates a new daFollower.
type DAFollowerConfig ¶
type DAFollowerConfig struct {
Client da.Client
Retriever DARetriever
Logger zerolog.Logger
PipeEvent func(ctx context.Context, event common.DAHeightEvent) error
Namespace []byte
DataNamespace []byte // may be nil or equal to Namespace
StartDAHeight uint64
DABlockTime time.Duration
}
DAFollowerConfig holds configuration for creating a DAFollower.
type DARetriever ¶
type DARetriever interface {
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
// ProcessBlobs parses raw blob bytes at a given DA height into height events.
// Used by the DAFollower to process subscription blobs inline without re-fetching.
ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
// QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints).
// These heights take precedence over sequential fetching.
QueuePriorityHeight(daHeight uint64)
// PopPriorityHeight returns the next priority height to fetch, or 0 if none.
PopPriorityHeight() uint64
}
DARetriever defines the interface for retrieving events from the DA layer
func WithTracingDARetriever ¶
func WithTracingDARetriever(inner DARetriever) DARetriever
WithTracingDARetriever wraps a DARetriever with OpenTelemetry tracing.
type MockDARetriever ¶
MockDARetriever is an autogenerated mock type for the DARetriever type
func NewMockDARetriever ¶
func NewMockDARetriever(t interface {
mock.TestingT
Cleanup(func())
}) *MockDARetriever
NewMockDARetriever creates a new instance of MockDARetriever. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations. The first argument is typically a *testing.T value.
func (*MockDARetriever) EXPECT ¶
func (_m *MockDARetriever) EXPECT() *MockDARetriever_Expecter
func (*MockDARetriever) PopPriorityHeight ¶
func (_mock *MockDARetriever) PopPriorityHeight() uint64
PopPriorityHeight provides a mock function for the type MockDARetriever
func (*MockDARetriever) ProcessBlobs ¶
func (_mock *MockDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
ProcessBlobs provides a mock function for the type MockDARetriever
func (*MockDARetriever) QueuePriorityHeight ¶
func (_mock *MockDARetriever) QueuePriorityHeight(daHeight uint64)
QueuePriorityHeight provides a mock function for the type MockDARetriever
func (*MockDARetriever) RetrieveFromDA ¶
func (_mock *MockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
RetrieveFromDA provides a mock function for the type MockDARetriever
type MockDARetriever_Expecter ¶
type MockDARetriever_Expecter struct {
// contains filtered or unexported fields
}
func (*MockDARetriever_Expecter) PopPriorityHeight ¶
func (_e *MockDARetriever_Expecter) PopPriorityHeight() *MockDARetriever_PopPriorityHeight_Call
PopPriorityHeight is a helper method to define mock.On call
func (*MockDARetriever_Expecter) ProcessBlobs ¶
func (_e *MockDARetriever_Expecter) ProcessBlobs(ctx interface{}, blobs interface{}, daHeight interface{}) *MockDARetriever_ProcessBlobs_Call
ProcessBlobs is a helper method to define mock.On call
- ctx context.Context
- blobs [][]byte
- daHeight uint64
func (*MockDARetriever_Expecter) QueuePriorityHeight ¶
func (_e *MockDARetriever_Expecter) QueuePriorityHeight(daHeight interface{}) *MockDARetriever_QueuePriorityHeight_Call
QueuePriorityHeight is a helper method to define mock.On call
- daHeight uint64
func (*MockDARetriever_Expecter) RetrieveFromDA ¶
func (_e *MockDARetriever_Expecter) RetrieveFromDA(ctx interface{}, daHeight interface{}) *MockDARetriever_RetrieveFromDA_Call
RetrieveFromDA is a helper method to define mock.On call
- ctx context.Context
- daHeight uint64
type MockDARetriever_PopPriorityHeight_Call ¶
MockDARetriever_PopPriorityHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PopPriorityHeight'
func (*MockDARetriever_PopPriorityHeight_Call) Return ¶
func (_c *MockDARetriever_PopPriorityHeight_Call) Return(v uint64) *MockDARetriever_PopPriorityHeight_Call
func (*MockDARetriever_PopPriorityHeight_Call) Run ¶
func (_c *MockDARetriever_PopPriorityHeight_Call) Run(run func()) *MockDARetriever_PopPriorityHeight_Call
func (*MockDARetriever_PopPriorityHeight_Call) RunAndReturn ¶
func (_c *MockDARetriever_PopPriorityHeight_Call) RunAndReturn(run func() uint64) *MockDARetriever_PopPriorityHeight_Call
type MockDARetriever_ProcessBlobs_Call ¶
MockDARetriever_ProcessBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ProcessBlobs'
func (*MockDARetriever_ProcessBlobs_Call) Return ¶
func (_c *MockDARetriever_ProcessBlobs_Call) Return(dAHeightEvents []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call
func (*MockDARetriever_ProcessBlobs_Call) Run ¶
func (_c *MockDARetriever_ProcessBlobs_Call) Run(run func(ctx context.Context, blobs [][]byte, daHeight uint64)) *MockDARetriever_ProcessBlobs_Call
func (*MockDARetriever_ProcessBlobs_Call) RunAndReturn ¶
func (_c *MockDARetriever_ProcessBlobs_Call) RunAndReturn(run func(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent) *MockDARetriever_ProcessBlobs_Call
type MockDARetriever_QueuePriorityHeight_Call ¶
MockDARetriever_QueuePriorityHeight_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueuePriorityHeight'
func (*MockDARetriever_QueuePriorityHeight_Call) Return ¶
func (_c *MockDARetriever_QueuePriorityHeight_Call) Return() *MockDARetriever_QueuePriorityHeight_Call
func (*MockDARetriever_QueuePriorityHeight_Call) Run ¶
func (_c *MockDARetriever_QueuePriorityHeight_Call) Run(run func(daHeight uint64)) *MockDARetriever_QueuePriorityHeight_Call
func (*MockDARetriever_QueuePriorityHeight_Call) RunAndReturn ¶
func (_c *MockDARetriever_QueuePriorityHeight_Call) RunAndReturn(run func(daHeight uint64)) *MockDARetriever_QueuePriorityHeight_Call
type MockDARetriever_RetrieveFromDA_Call ¶
MockDARetriever_RetrieveFromDA_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveFromDA'
func (*MockDARetriever_RetrieveFromDA_Call) Return ¶
func (_c *MockDARetriever_RetrieveFromDA_Call) Return(dAHeightEvents []common.DAHeightEvent, err error) *MockDARetriever_RetrieveFromDA_Call
func (*MockDARetriever_RetrieveFromDA_Call) Run ¶
func (_c *MockDARetriever_RetrieveFromDA_Call) Run(run func(ctx context.Context, daHeight uint64)) *MockDARetriever_RetrieveFromDA_Call
func (*MockDARetriever_RetrieveFromDA_Call) RunAndReturn ¶
func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)) *MockDARetriever_RetrieveFromDA_Call
type P2PHandler ¶
type P2PHandler struct {
// contains filtered or unexported fields
}
P2PHandler coordinates block retrieval from P2P stores for the syncer. It waits for both header and data to be available at a given height, validates their consistency, and emits events to the syncer for processing.
The handler maintains a processedHeight to track the highest block that has been successfully validated and sent to the syncer, preventing duplicate processing.
func NewP2PHandler ¶
func NewP2PHandler( headerStore header.Store[*types.P2PSignedHeader], dataStore header.Store[*types.P2PData], cache cache.CacheManager, genesis genesis.Genesis, logger zerolog.Logger, ) *P2PHandler
NewP2PHandler creates a new P2P handler.
func (*P2PHandler) ProcessHeight ¶
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error
ProcessHeight retrieves and validates both header and data for the given height from P2P stores. It blocks until both are available, validates consistency (proposer address and data hash match), then emits the event to heightInCh or stores it as pending. Updates processedHeight on success.
func (*P2PHandler) SetProcessedHeight ¶
func (h *P2PHandler) SetProcessedHeight(height uint64)
SetProcessedHeight updates the highest processed block height.
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
Syncer handles block synchronization from DA and P2P sources.
func NewSyncer ¶
func NewSyncer( store store.Store, exec coreexecutor.Executor, daClient da.Client, cache cache.Manager, metrics *common.Metrics, config config.Config, genesis genesis.Genesis, headerStore header.Store[*types.P2PSignedHeader], dataStore header.Store[*types.P2PData], logger zerolog.Logger, options common.BlockOptions, errorCh chan<- error, raftNode common.RaftNode, ) *Syncer
NewSyncer creates a new block syncer
func (*Syncer) ApplyBlock ¶
func (s *Syncer) ApplyBlock(ctx context.Context, header types.Header, data *types.Data, currentState types.State) (types.State, error)
ApplyBlock applies a block to get the new state
func (*Syncer) HasReachedDAHead ¶
HasReachedDAHead returns true once the DA follower has caught up to the DA head. Once set, it stays true.
func (*Syncer) IsSyncedWithRaft ¶
func (s *Syncer) IsSyncedWithRaft(raftState *raft.RaftBlockState) (int, error)
IsSyncedWithRaft checks if the local state is synced with the given raft state, including hash check.
func (*Syncer) PendingCount ¶
PendingCount returns the number of unprocessed height events in the pipeline.
func (*Syncer) RecoverFromRaft ¶
RecoverFromRaft attempts to recover the state from a raft block state
func (*Syncer) SetBlockSyncer ¶
func (s *Syncer) SetBlockSyncer(bs BlockSyncer)
SetBlockSyncer sets the block syncer interface, allowing injection of a tracing wrapper or other decorator.
func (*Syncer) SetLastState ¶
SetLastState updates the current state
func (*Syncer) TrySyncNextBlock ¶
TrySyncNextBlock attempts to sync the next available block. The event is always the next block in sequence, as processHeightEvent ensures it.
func (*Syncer) ValidateBlock ¶
func (s *Syncer) ValidateBlock(_ context.Context, currState types.State, data *types.Data, header *types.SignedHeader) error
ValidateBlock validates a synced block. NOTE: if the header was gibberish and somehow passed all prior validation but the data was correct, or if the data was gibberish and somehow passed all prior validation but the header was correct, we still lose both in the pending event. This should never happen.
func (*Syncer) VerifyForcedInclusionTxs ¶
func (s *Syncer) VerifyForcedInclusionTxs(ctx context.Context, daHeight uint64, data *types.Data) error
VerifyForcedInclusionTxs checks that every forced-inclusion tx submitted during epochs whose grace window has elapsed appears in seenBlockTxs. Txs may be spread across multiple blocks; what matters is that each one landed somewhere before its epoch's grace deadline.