Blockchain Data Sync
Many services need to synchronize blockchain data such as blocks, transactions, receipts and traces. This package provides a data synchronization framework that solves several common problems:
- Pipeline: polls, transforms and stores blockchain data in separate goroutines.
- Concurrency: polls blockchain data in parallel during the catch-up phase.
- Batch: stores data into databases in batches, instead of block by block, during the catch-up phase.
- Chain reorg: detects chain reorgs and defines a common interface for clients to handle them (e.g. popping data from the database).
- Memory bounded: uses a memory-bounded channel to cache polled blockchain data and avoid OOM.
Using this framework, users only need to focus on how to handle the data.
Adapter
This package provides an Adapter interface to adapt any data source for polling blockchain data.
There are 2 pre-defined adapters:
- EVM adapter: polls data from eSpace RPC.
- Core adapter: polls data from core space RPC.
Poller
There are 3 kinds of pollers available:
- CatchUpPoller: optimized to poll data with high performance during the catch-up phase.
- FinalizedPoller: polls finalized data block by block.
- LatestPoller: polls the latest data block by block and handles chain reorgs.
Database Processor
This package defines a common interface to transform blockchain data into a database operation, so that the framework can execute it within a database transaction. Besides, some commonly used operations are already defined:
type Operation interface {
Exec(tx *gorm.DB) error
}
// ComposeOperation composes multiple database operations in batch.
func ComposeOperation(ops ...Operation) Operation
// CreateOperation returns a Create database operation.
func CreateOperation(models ...any) Operation
// DeleteOperation returns a Delete database operation.
func DeleteOperation(modelPtr any, conds ...any) Operation
Users can implement the interface below to transform blockchain data into a database operation:
// Processor is implemented by types that process data to update database.
type Processor[T any] interface {
Process(data T) Operation
}
To synchronize the latest data and handle chain reorgs correctly, implement the revertable interface below:
// RevertableProcessor is implemented by types that process revertable data.
type RevertableProcessor[T any] interface {
Processor[T]
// Revert deletes data from the database for the given data's block number.
Revert(data T) Operation
}
During the catch-up phase, to achieve batched database operations, users can implement the batchable interface:
// BatchProcessor is implemented by types that process data in batch.
//
// Note, thread safety is not required in implementations, since the
// batch-related methods are executed in a single thread.
type BatchProcessor[T any] interface {
Processor[T]
// BatchProcess processes the given data and returns the number of SQLs
// to be executed in batch.
BatchProcess(data T) int
// BatchExec executes SQLs in batch.
BatchExec(tx *gorm.DB, createBatchSize int) error
// BatchReset resets data for the next batch.
BatchReset()
}
Sync Utilities
There are 3 helper methods in the framework to poll blockchain data and store it in a database. Users need to provide custom database processors to handle the polled blockchain data.
- CatchUpDB: catches up blockchain data to the finalized block using a BatchProcessor.
- StartFinalizedDB: synchronizes data block by block against the finalized block using a normal Processor.
- StartLatestDB: synchronizes data block by block against the latest block, handling chain reorgs using a RevertableProcessor.