sync

package
v0.7.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: MIT Imports: 12 Imported by: 0

README

Blockchain Data Sync

Many services require to synchronize blockchain data, e.g. blocks, transactions, receipts and traces. This package provides a data synchronization framework to resolve some problems in common:

  • Pipeline: poll blockchain data, transform data and store data in separate goroutines.
  • Concurrency: poll blockchain data in parallel during catch up phase.
  • Batch: allow to store data into databases in batch, stead of block by block during catch up pahse.
  • Chain reorg: framework detects chain reorg and defines a common interface for clients to handle chain reorg (e.g. pop data from database).
  • Memory bounded: uses memory bounded channel to cache polled blockchain data to void OOM.

Using this framework, users only need to focus on how to handle the data.

Adapter

This package provides a Adapter interface to adapt any data source to poll blockchain data.

There are 2 pre-defined adapters:

  1. EVM adapter: poll data from eSpace RPC.
  2. Core adapter: poll data from core space RPC.

Poller

There are 3 kinds of pollers available:

  1. CatchUpPoller: optimized to poll data in catch up phase with high performance.
  2. FinalizedPoller: poll finalized data block by block.
  3. LatestPoller: poll latest data block by block, and handle the chain reorg.

Database Processor

This package defines a common interface to transform blockchain data into a database operation, so that the framework will operate database in a transaction. Besides, some common used operations are already defined.

type Operation interface {
    Exec(tx *gorm.DB) error
}

// ComposeOperation composes multiple database operations in batch.
func ComposeOperation(ops ...Operation) Operation

// CreateOperation returns a Create database operation.
func CreateOperation(models ...any) Operation

// DeleteOperation returns a Delete database operation.
func DeleteOperation(modelPtr any, conds ...any) Operation

User could implement below interface to transform the blockchain data into a database operation:

// Processor is implemented by types that process data to update database.
type Processor[T any] interface {
    Process(data T) Operation
}

If user want to synchronize the latest data, and handle the chain reorg correctly. Then, please implements the revertable interface as below:

// RevertableProcessor is implemented by types that process revertable data.
type RevertableProcessor[T any] interface {
    Processor[T]

    // Revert deletes data from database of given data block number.
    Revert(data T) Operation
}

During catch up phase, to achieve batch database operations, user could implement the batchable interface:

// BatchProcessor is implemented by types that process data in batch.
//
// Note, thread-safe is not required in the implementations, since batch
// related methods are executed in a single thread.
type BatchProcessor[T any] interface {
    Processor[T]

    // BatchProcess processes the given data and returns the number of SQLs
    // to be executed in batch.
    BatchProcess(data T) int

    // BatchExec executes SQLs in batch.
    BatchExec(tx *gorm.DB, createBatchSize int) error

    // BatchReset reset data for the next batch.
    BatchReset()
}

Sync Utilities

There are 3 helper methods available in the framework to poll blockchain data and store in database. Users need to provide custom database processors to handle polled blockchain data.

  1. CatchUpDB: catch up blockchain data to the finalized block using BatchProcessor.
  2. StartFinalizedDB: start to synchronize data block by block against the finalized block using normal Processor.
  3. StartLatestDB: start to synchronize data block by block against the latest block and handle chain reorg using RevertableProcessor.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CatchUpDB

func CatchUpDB[T channel.Sizable](ctx context.Context, params CatchupParamsDB[T], processors ...db.BatchProcessor[T]) uint64

func StartFinalizedDB

func StartFinalizedDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.Processor[T])

func StartLatestDB

func StartLatestDB[T any](ctx context.Context, wg *sync.WaitGroup, params ParamsDB[T], processors ...db.RevertableProcessor[T])

Types

type CatchupParamsDB

type CatchupParamsDB[T any] struct {
	Adapter         poll.Adapter[T]
	Poller          poll.CatchUpOption
	Processor       db.BatchOption
	DB              *gorm.DB
	NextBlockNumber uint64
}

type ParamsDB

type ParamsDB[T any] struct {
	Adapter         poll.Adapter[T]
	Poller          poll.Option
	Processor       db.Option
	DB              *gorm.DB
	NextBlockNumber uint64

	// only used to sync latest data, and usually loads from database
	Reorg poll.ReorgWindowParams
}

Directories

Path Synopsis
db

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL