Introduction
Cherry is a Python library for building blockchain data pipelines.
It is designed to make building production-ready blockchain data pipelines easy.
Getting Started
See the Getting Started section of the docs.
Features
- Pure Python library. No need for YAML, SQL, TOML, etc.
- High-level datasets API and flexible pipeline API.
- High-performance, low-cost and uniform data access. Ability to use advanced providers without platform lock-in.
- Included functionality to decode, validate and transform blockchain data, all implemented in Rust for performance.
- Write transformations using polars, pyarrow, datafusion, pandas, duckdb or any other pyarrow-compatible library. Schema inference automatically creates output tables.
- Keep datasets fresh with continuous ingestion. Fully parallelized: the next batch of data is fetched while your pre-processing function runs and database writes execute, with no hand-optimization needed.
- Included library of transformations.
- Included functionality to implement crash-resistance.
Data providers
Provider | Ethereum (EVM) | Solana (SVM) |
---|---|---|
HyperSync | ✅ | ❌ |
SQD | ✅ | ✅ |
Yellowstone-GRPC | ❌ | ✅ |
Supported output formats
- ClickHouse
- Iceberg
- Deltalake
- DuckDB
- Arrow Datasets
- Parquet
Examples
License
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Sponsors
Getting Started
This section explains how to install and use cherry.
Installation
Cherry is published to PyPI as cherry-etl.
Core tooling that is used with cherry is published as cherry-core.
You can add it to your python project like this:
pip install cherry-etl cherry-core
Or if you are using uv:
uv add cherry-etl cherry-core
Import it in your python scripts like this:
import cherry_etl
import cherry_core
Using datasets
Easiest way to start using cherry is to use the datasets
feature. datasets
make creating
pipelines easy.
There are datasets for both svm
and evm
under the cherry_etl.datasets
module
from cherry_core import ingest
from cherry_etl import config as cc
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
import asyncio
import duckdb
# create in-memory duckdb database
db = duckdb.connect()
# example provider settings; see the Data Providers section for other options
provider_kind = ingest.ProviderKind.HYPERSYNC
provider_url = "https://eth.hypersync.xyz"

async def sync_data():
    # configure a data provider
    # See the Data Providers section
    provider = ingest.ProviderConfig(
        kind=provider_kind,
        url=provider_url,
    )

    # write data to duckdb
    writer = cc.Writer(
        kind=cc.WriterKind.DUCKDB,
        config=cc.DuckdbWriterConfig(
            connection=db.cursor(),
        ),
    )

    # Create the pipeline using the blocks dataset
    pipeline = datasets.evm.blocks(provider, writer, 18123123, 18123200)

    # Run the pipeline
    await run_pipeline(pipeline_name="blocks", pipeline=pipeline)

asyncio.run(sync_data())
data = db.sql("SELECT * FROM blocks LIMIT 20")
print(data)
Writing custom pipelines
Cherry is able to do much more than just the datasets
API.
Read the writing custom pipelines section.
Data Providers
Everything in cherry works the same regardless of which data provider you choose.
You can change the provider by just changing the provider_kind and provider_url arguments when constructing a ProviderConfig object.
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=my_provider_kind,
url=my_provider_url,
)
Check out the specific section of the provider you want to use to get more info.
Data availability
Provider | Ethereum (EVM) | Solana (SVM) |
---|---|---|
HyperSync | ✅ | ❌ |
SQD | ✅ | ✅ |
Yellowstone-GRPC | ❌ | ✅ |
Full ProviderConfig
class ProviderKind(str, Enum):
SQD = "sqd"
HYPERSYNC = "hypersync"
YELLOWSTONE_GRPC = "yellowstone_grpc"
@dataclass
class ProviderConfig:
kind: ProviderKind
url: Optional[str] = None
bearer_token: Optional[str] = None
max_num_retries: Optional[int] = None
retry_backoff_ms: Optional[int] = None
retry_base_ms: Optional[int] = None
retry_ceiling_ms: Optional[int] = None
req_timeout_millis: Optional[int] = None
stop_on_head: bool = False
head_poll_interval_millis: Optional[int] = None
buffer_size: Optional[int] = None
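As an illustration of the optional fields above, a more fully tuned provider configuration might look like this. The values are placeholders to adjust for your own provider and workload, and the comment on stop_on_head reflects an assumed interpretation of that flag.

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.HYPERSYNC,
    url="https://eth.hypersync.xyz",
    max_num_retries=5,
    retry_backoff_ms=500,
    req_timeout_millis=30_000,
    stop_on_head=False,  # assumed: False keeps the pipeline following the chain head
    head_poll_interval_millis=1_000,
    buffer_size=4,
)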
HyperSync
HyperSync is a purpose-built, high-performance data retrieval layer that gives developers unprecedented access to blockchain data. Built from the ground up in Rust, HyperSync serves as an alternative to traditional JSON-RPC endpoints, offering dramatically faster queries and more flexible data access patterns.
HyperSync currently supports 70+ EVM chains.
Read more about it in the official documentation.
Example ProviderConfig
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=ingest.ProviderKind.HYPERSYNC,
bearer_token="<Your Hypersync API Token>",
url="https://eth.hypersync.xyz",
)
You can get a URL for your desired chain from the supported networks page.
Getting API token
HyperSync is currently free and unlimited, but it is transitioning to API tokens.
It is recommended to get an API token and use it with your queries.
SQD
SQD Network is a decentralized query engine optimized for batch extraction of large volumes of data. It currently serves historical on-chain data ingested from 100+ EVM and Substrate networks, as well as Solana (in beta), Tron, Starknet and Fuel. The data is comprehensive: for example, on EVM it includes event logs, transaction receipts, traces and per-transaction state diffs.
SQD currently supports 100+ EVM chains and has beta support for Solana.
Warning: data starts from slot 317617480 when using Solana with SQD.
Read more at the official documentation.
Example ProviderConfig
from cherry_core import ingest
provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/solana-beta",
)
You can replace solana-beta with any dataset name from the supported networks page.
For example, you can use https://portal.sqd.dev/datasets/arbitrum-nova to get Arbitrum Nova data.
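For example, a provider configuration pointed at the arbitrum-nova dataset would look like this (a sketch following the config shown above):

from cherry_core import ingest

provider = ingest.ProviderConfig(
    kind=ingest.ProviderKind.SQD,
    url="https://portal.sqd.dev/datasets/arbitrum-nova",
)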
Deploying portals
Access to the SQD portal API is free but somewhat limited at the moment.
You can deploy your own portal for unlimited access. You can contact us on Telegram for help with deploying and maintaining portal instances.
Yellowstone-GRPC
Support for yellowstone-grpc is still partial. We recommend using sqd instead.
You can read more about yellowstone-grpc here and here.
Example ProviderConfig
from cherry_core import ingest
provider = ingest.ProviderConfig(
kind=ingest.ProviderKind.YELLOWSTONE_GRPC,
bearer_token="<Your X-Token>",
url="<URL to a Yellowstone GRPC endpoint>",
)
Writers
Writers control how data is written to the output after it is ingested from the provider and processed by the configured steps.
Supported output formats
- ClickHouse
- Iceberg
- Deltalake
- DuckDB
- Arrow Datasets
- Parquet
Full Writer configuration API
class WriterKind(str, Enum):
CLICKHOUSE = "clickhouse"
ICEBERG = "iceberg"
DELTA_LAKE = "delta_lake"
PYARROW_DATASET = "pyarrow_dataset"
DUCKDB = "duckdb"
@dataclass
class Writer:
kind: WriterKind
config: (
ClickHouseWriterConfig
| IcebergWriterConfig
| DeltaLakeWriterConfig
| PyArrowDatasetWriterConfig
| DuckdbWriterConfig
)
See the specific writer pages to learn more about your writer of choice.
If you don't have a preference, we highly recommend duckdb for development and testing as it is very solid and easy to use.
ClickHouse
Config
@dataclass
class ClickHouseWriterConfig:
client: ClickHouseClient
codec: Dict[str, Dict[str, str]] = field(default_factory=dict)
order_by: Dict[str, List[str]] = field(default_factory=dict)
engine: str = "MergeTree()"
skip_index: Dict[str, List[ClickHouseSkipIndex]] = field(default_factory=dict)
anchor_table: Optional[str] = None
Dict[str, _] fields are generally configured per table name. For example, codec["my_table"]["my_column"] gives the codec to use for the my_column column of the my_table table.
Example
from cherry_etl import config as cc
clickhouse_client = await clickhouse_connect.get_async_client(
host=os.environ.get("CLICKHOUSE_HOST", "localhost"),
port=int(os.environ.get("CLICKHOUSE_PORT", "8123")),
username=os.environ.get("CLICKHOUSE_USER", "default"),
password=os.environ.get("CLICKHOUSE_PASSWORD", "clickhouse"),
database=os.environ.get("CLICKHOUSE_DATABASE", "blockchain"),
)
writer = cc.Writer(
kind=cc.WriterKind.CLICKHOUSE,
config=cc.ClickHouseWriterConfig(
client=clickhouse_client,
order_by={"transfers": ["block_number"]},
codec={"transfers": {"data": "ZSTD"}},
skip_index={
"transfers": [
cc.ClickHouseSkipIndex(
name="log_addr_idx",
val="address",
type_="bloom_filter(0.01)",
granularity=1,
),
cc.ClickHouseSkipIndex(
name="from_addr_idx",
val="from",
type_="bloom_filter(0.01)",
granularity=1,
),
cc.ClickHouseSkipIndex(
name="to_addr_idx",
val="to",
type_="bloom_filter(0.01)",
granularity=1,
),
]
},
),
)
Anchor table
All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash-resistance.
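As a sketch, assuming a pipeline that produces blocks and transfers tables (hypothetical names), the anchor table can be selected like this:

writer = cc.Writer(
    kind=cc.WriterKind.CLICKHOUSE,
    config=cc.ClickHouseWriterConfig(
        client=clickhouse_client,
        # "blocks" is written last; on restart, the other tables can be pruned
        # back to the max block recorded in "blocks"
        anchor_table="blocks",
    ),
)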
DeltaLake
Config
@dataclass
class DeltaLakeWriterConfig:
data_uri: str
partition_by: Dict[str, list[str]] = field(default_factory=dict)
storage_options: Optional[Dict[str, str]] = None
writer_properties: Optional[deltalake.WriterProperties] = None
anchor_table: Optional[str] = None
Dict[str, _] fields are generally configured per table name. For example, partition_by["my_table"] gives the list of columns to partition the my_table table by.
Example
data_uri = "./data"
writer = cc.Writer(
kind=cc.WriterKind.DELTA_LAKE,
config=cc.DeltaLakeWriterConfig(
data_uri=data_uri,
),
)
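If you want partitioned output, partition_by is configured per table. A sketch assuming a transfers table partitioned by a block_number column (hypothetical names):

writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri="./data",
        partition_by={"transfers": ["block_number"]},
    ),
)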
Anchor table
All tables are written in parallel, but the anchor table is written separately so it can be used to implement crash-resistance.
DuckDB
We highly recommend duckdb for development and testing as it is very solid and easy to use.
Read their Python docs for more information.
Config
@dataclass
class DuckdbWriterConfig:
connection: duckdb.DuckDBPyConnection
Warning: Do not pass a duckdb.DuckDBPyConnection directly if you are using it while the pipeline is running, as it is not thread-safe.
Use connection.cursor() to create a cursor and pass that instead.
Example
# create an in-memory duckdb database
connection = duckdb.connect()
writer = cc.Writer(
kind=cc.WriterKind.DUCKDB,
config=cc.DuckdbWriterConfig(
connection=connection.cursor(),
),
)
Iceberg
Config
from pyiceberg.catalog import Catalog as IcebergCatalog
@dataclass
class IcebergWriterConfig:
namespace: str
catalog: IcebergCatalog
write_location: str
See pyiceberg docs.
Example
from cherry_etl import config as cc
from pyiceberg.catalog.sql import SqlCatalog
catalog = SqlCatalog(
name="cherry",
uri="postgresql+psycopg2://postgres:postgres@localhost/iceberg",
warehouse="s3://blockchain-data",
**{
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
},
)
writer = cc.Writer(
kind=cc.WriterKind.ICEBERG,
config=cc.IcebergWriterConfig(
namespace="my_namespace",
catalog=catalog,
write_location="s3://blockchain-data/",
),
)
PyArrowDataset (Parquet)
See the documentation to learn more about arrow datasets.
This is the writer you want if you just want a directory of parquet files.
Config
@dataclass
class PyArrowDatasetWriterConfig:
base_dir: str
basename_template: Optional[str] = None
partitioning: Dict[str, pa_dataset.Partitioning | list[str]] = field(
default_factory=dict
)
partitioning_flavor: Dict[str, str] = field(default_factory=dict)
filesystem: Optional[pa_fs.FileSystem] = None
file_options: Optional[pa_dataset.FileWriteOptions] = None
use_threads: bool = True
max_partitions: int = 1024
max_open_files: int = 1024
max_rows_per_file: int = 0
min_rows_per_group: int = 0
max_rows_per_group: int = 1024 * 1024
create_dir: bool = True
anchor_table: Optional[str] = None
See pyarrow docs for more explanation about the parameters.
Example
from cherry_etl import config as cc
base_dir = "./data"
writer = cc.Writer(
kind=cc.WriterKind.PYARROW_DATASET,
config=cc.PyArrowDatasetWriterConfig(
base_dir=base_dir,
),
)
Datasets
Datasets are pre-built, high-level abstractions that make it easy to extract common blockchain data. They are implemented as helper functions that construct complete Pipeline
objects with predefined schemas and transformations.
Available Datasets
Ethereum (EVM) Datasets
Dataset | Description | Use Cases |
---|---|---|
Blocks | Extract block headers and metadata | Block analysis, network statistics, gas analysis |
Address Appearances | Track all address appearances in traces | Contract interactions, address relationships, contract creation tracking |
All Contracts | Information about all contracts | Contract deployment analysis |
Solana (SVM) Datasets
Dataset | Description | Use Cases |
---|---|---|
Token Balances | Track token account balances | Token holdings, transfers, token program analysis |
Usage Pattern
All datasets follow a similar usage pattern:
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
# Create a pipeline using a dataset
pipeline = datasets.evm.blocks( # or any other dataset
provider=provider,
writer=writer,
from_block=18123123, # or from_slot for Solana
to_block=18123200 # or to_slot for Solana
)
# Run the pipeline
await run_pipeline(pipeline_name="dataset_name", pipeline=pipeline)
Common Features
All datasets share these common features:
- Predefined Schemas: Each dataset has a well-defined output schema
- Optimized Performance: Leverages Rust-based core components
- Parallel Processing: Data ingestion and processing happen in parallel
- Crash Resistance: Built-in support for crash recovery
- Continuous Ingestion: Can keep datasets fresh with continuous updates
Data Providers
Datasets work with any supported data provider:
- EVM Chains: HyperSync, SQD
- Solana: SQD (beta), Yellowstone-GRPC
Output Formats
Datasets can write to any supported output format:
- ClickHouse
- Iceberg
- Deltalake
- DuckDB
- Arrow Datasets
- Parquet
Writing Custom Datasets
While the built-in datasets cover common use cases, you can also create custom datasets by:
- Defining your schema
- Creating transformation steps
- Building a pipeline configuration
See the Writing Custom Pipelines section for more details.
Notes
- Datasets are inspired by cryo
- Each dataset is optimized for its specific use case
- Datasets handle all the complexity of data extraction and transformation
- You can combine datasets with custom pipeline steps for advanced use cases
Ethereum (EVM)
Address Appearances Dataset
The address appearances dataset tracks all occurrences of addresses in transaction traces, including their relationships to the transactions. This is useful for analyzing address interactions, contract creations, and address relationships in the blockchain.
Usage
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
# Create a pipeline for address appearances
pipeline = datasets.evm.address_appearances(
provider=provider,
writer=writer,
from_block=18123123, # Starting block number
to_block=18123200 # Ending block number
)
# Run the pipeline
await run_pipeline(pipeline_name="address_appearances", pipeline=pipeline)
Output Schema
Field | Type | Description |
---|---|---|
block_number | uint64 | Block number where the address appeared |
block_hash | binary | Hash of the block |
transaction_hash | binary | Hash of the transaction |
address | binary | The address that appeared |
relationship | string | The relationship of the address to the trace (see possible values below) |

Possible relationship values:

- call_from: Address that initiated the call
- call_to: Address that received the call
- factory: Contract factory address
- suicide: Address that was self-destructed
- suicide_refund: Address that received a refund from a self-destruct
- author: Address that authored the block
- create: Address that was created
Example Queries
Find All Contract Creations by an Address
SELECT
block_number,
transaction_hash,
address as created_contract
FROM address_appearances
WHERE relationship = 'create'
AND address = '0x...'; -- Replace with the address you're interested in
Get Address Interaction Timeline
SELECT
block_number,
transaction_hash,
relationship,
address
FROM address_appearances
WHERE address = '0x...' -- Replace with the address you're interested in
ORDER BY block_number, transaction_hash;
Count Address Relationships
SELECT
relationship,
COUNT(*) as count
FROM address_appearances
WHERE address = '0x...' -- Replace with the address you're interested in
GROUP BY relationship;
Notes
- The dataset requires trace data from the blockchain
- The dataset captures all address appearances in transaction traces, including:
- Direct calls between contracts
- Contract creations
- Self-destruct operations
- Block author addresses
- This dataset is particularly useful for:
- Tracking contract interactions
- Analyzing address relationships
- Monitoring contract creation patterns
- Understanding address behavior in the blockchain
Blocks Dataset
The blocks dataset allows you to extract block headers from any EVM chain. This is one of the most fundamental datasets for blockchain analysis.
Usage
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
# Create a pipeline for blocks
pipeline = datasets.evm.blocks(
provider=provider,
writer=writer,
from_block=18123123, # Starting block number
to_block=18123200 # Ending block number
)
# Run the pipeline
await run_pipeline(pipeline_name="blocks", pipeline=pipeline)
Output Schema
Field | Type | Description |
---|---|---|
number | uint64 | Block number |
hash | string | Block hash |
parent_hash | string | Hash of the parent block |
nonce | string | Block nonce |
logs_bloom | string | Bloom filter for logs |
transactions_root | string | Merkle root of transactions |
state_root | string | Merkle root of state |
receipts_root | string | Merkle root of receipts |
miner | string | Address of the block miner |
difficulty | Decimal128(38, 0) | Block difficulty |
total_difficulty | Decimal128(38, 0) | Cumulative difficulty |
extra_data | string | Extra data field |
size | Decimal128(38, 0) | Block size in bytes |
gas_limit | Decimal128(38, 0) | Gas limit for the block |
gas_used | Decimal128(38, 0) | Gas used in the block |
timestamp | Decimal128(38, 0) | Block timestamp |
uncles | list(binary) | List of uncle block hashes |
base_fee_per_gas | Decimal128(38, 0) | Base fee per gas (EIP-1559) |
withdrawals_root | string | Root hash of withdrawals (Shanghai upgrade) |
Example Queries
Get Latest Block
SELECT * FROM blocks ORDER BY number DESC LIMIT 1;
Get Blocks in a Time Range
SELECT * FROM blocks
WHERE timestamp BETWEEN 1704067200 AND 1704153600; -- 2024-01-01 00:00 to 2024-01-02 00:00 UTC
Get Block Statistics
SELECT
MIN(number) as first_block,
MAX(number) as last_block,
AVG(gas_used) as avg_gas_used,
AVG(size) as avg_block_size
FROM blocks;
Notes
- The dataset includes all fields from the EVM block header
- Timestamps are in Unix timestamp format
- The base_fee_per_gas field is only available for blocks after the London hard fork (EIP-1559)
- The withdrawals_root field is only available for blocks after the Shanghai upgrade
All Contracts Dataset
The all_contracts dataset tracks all contracts ever created on any EVM chain, including their creation details. This dataset is essential for analyzing contract deployments.
Usage
from cherry_etl import datasets
from cherry_etl.pipeline import run_pipeline
# Create a pipeline for all contracts
pipeline = datasets.evm.all_contracts(
provider=provider,
writer=writer,
from_block=18123123, # Starting block number
to_block=18123200 # Ending block number
)
# Run the pipeline
await run_pipeline(pipeline_name="all_contracts", pipeline=pipeline)
Output Schema
Field | Type | Description |
---|---|---|
block_number | uint64 | Block number where the contract was created |
transaction_hash | binary | Hash of the transaction that created the contract |
address | binary | The contract's address |
Example Queries
List Recently Created Contracts
SELECT
block_number,
transaction_hash,
address as contract_address
FROM all_contracts
ORDER BY block_number DESC
LIMIT 10;
Get Contract Creation Statistics
SELECT
    COUNT(*) as contracts_created
FROM all_contracts;
Notes
- This dataset captures all contract creations on any EVM chain
- The dataset includes both contract creation transactions and contract creation through internal transactions
- This dataset is particularly useful for:
- Tracking contract deployments
- Analyzing contract creation patterns
- Identifying contract factories
- Monitoring contract ecosystem growth
Solana (SVM)
Token Balances Dataset
The token balances dataset tracks token account balances on the Solana blockchain. This dataset is essential for analyzing token holdings, transfers, and token-related activities on Solana.
Starting from where you left off
Since the pipeline is created purely in Python at runtime, it is possible to set the from_block parameter programmatically.
For example, when using duckdb, you can create a function like this to get the last written block:
def get_start_block(con: duckdb.DuckDBPyConnection) -> int:
try:
res = con.sql("SELECT MAX(block_number) from my_table").fetchone()
if res is not None:
return int(res[0])
else:
return 0
except Exception:
print(f"failed to get start block from db: {traceback.format_exc()}")
return 0
Then you can use it like this when creating the pipeline object:
pipeline = datasets.evm.blocks(
    provider,
    writer,
    from_block=get_start_block(duckdb_connection),
    to_block=to_block,
)
Crash resistance
Some writers, like duckdb, are naturally crash-resistant because writes run inside a transaction.
Other writers are naturally crash-resistant as long as there is only one output table. If there are multiple output tables, you can use the anchor_table parameter to implement crash-resistance.
Output tables are written in parallel for performance, but the anchor_table is always written last. So at startup you can prune the non-anchor tables in the output database back to the max block of the anchor table and start from there (see the Starting from where you left off section).
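A minimal sketch of this pattern, using the Deltalake writer and hypothetical table names:

from cherry_etl import config as cc

# 1. Pick an anchor table; it is always written last.
writer = cc.Writer(
    kind=cc.WriterKind.DELTA_LAKE,
    config=cc.DeltaLakeWriterConfig(
        data_uri="./data",
        anchor_table="blocks",
    ),
)

# 2. On startup, before running the pipeline:
#    - read the max block number from the anchor table,
#    - delete any rows above that block from the non-anchor tables,
#    - pass that block number as from_block (see the previous section).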
Logging
Python code uses the standard Python logging module, so it can be configured as described in the Python docs.
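For example, a minimal configuration that enables debug-level logs from cherry's Python modules could look like this:

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)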
Set the RUST_LOG environment variable according to the env_logger docs in order to see logs from the Rust modules.
To run an example with trace level logging for rust modules:
RUST_LOG=trace uv run examples/path/to/my/example
Writing Custom Pipelines
There are three components of a pipeline.
- First we get the data from the Provider.
- The data we get from the provider depends on the query we use.
- Then we run the steps one by one on the incoming data to transform it.
- The transformed data is written to the output using the writer in the end.
We already covered Providers and Writers on the previous pages, so this section covers writing queries and steps.
Example
Here is a simple example pipeline that ingests and decodes ERC-20 transfers into duckdb: source code.
Query
The query is used to specify the data we get from the provider.
It consists of:
- from_block, the block to start from
- to_block (optional), the block to stop at (inclusive).
- request fields
- field selection
Request Fields
The request fields are named after tables, e.g. logs
import cherry_core
from cherry_core import ingest
query = ingest.Query(
kind=ingest.QueryKind.EVM,
params=ingest.evm.Query(
from_block=from_block,
to_block=to_block,
logs=[
ingest.evm.LogRequest(
topic0=[cherry_core.evm_signature_to_topic0(dex.event_signature)],
include_blocks=True,
include_transactions=True,
)
],
fields=request_fields,
),
)
This example creates an EVM query looking for logs matching a single LogRequest.
include_ fields determine if related data should be included. For example, include_blocks=True means the response will contain the blocks table with the blocks containing the matched logs.
Log Filtering
The core filtering behaviors are:
- Between multiple LogRequest objects: when multiple LogRequest objects are provided, logs matching ANY of them will be returned (OR logic).
- Between different fields in the same LogRequest: when multiple fields are specified (e.g., address AND topic0), logs matching ALL criteria will be returned (AND logic).
- Within a single field: when multiple values are provided for one field (e.g., multiple addresses in the address field), logs matching ANY of those values will be returned (OR logic).
OR Mode: Separate Log Requests
Use separate log requests to match events that satisfy ANY of the conditions:
logs=[
# Will match USDC Transfer OR WETH Approval
LogRequest(address=[USDC], topic0=[TRANSFER_TOPIC]),
LogRequest(address=[WETH], topic0=[APPROVAL_TOPIC])
]
Results:
┌─────────────┬──────────┐
│ address ┆ topic0 │
╞═════════════╪══════════╡
│ USDC ┆ Transfer │ # Only USDC Transfers
│ WETH ┆ Approval │ # Only WETH Approvals
└─────────────┴──────────┘
CROSS JOIN: Multiple Values in Single Request
Put multiple values in a single request to match ALL combinations:
logs=[
LogRequest(
address=[USDC, WETH],
topic0=[TRANSFER_TOPIC, APPROVAL_TOPIC]
)
]
Results:
┌─────────────┬──────────┐
│ address ┆ topic0 │
╞═════════════╪══════════╡
│ WETH ┆ Approval │ # All possible combinations
│ WETH ┆ Transfer │ # of addresses and topics
│ USDC ┆ Approval │ # will be matched
│ USDC ┆ Transfer │
└─────────────┴──────────┘
Common Use Cases
- Track specific events from multiple contracts:
  logs=[LogRequest(address=[CONTRACT_A, CONTRACT_B, CONTRACT_C], topic0=[SPECIFIC_EVENT_TOPIC])]
- Track multiple events from a specific contract:
  logs=[LogRequest(address=[SPECIFIC_CONTRACT], topic0=[EVENT_1_TOPIC, EVENT_2_TOPIC, EVENT_3_TOPIC])]
- Track specific event combinations:
  logs=[LogRequest(address=[CONTRACT_A], topic0=[EVENT_X_TOPIC]), LogRequest(address=[CONTRACT_B], topic0=[EVENT_Y_TOPIC])]
Filtering by Indexed Parameters (topic1, topic2, etc.)
For EVM events, indexed parameters are stored in the topics array (topic1, topic2, etc.). You can filter by these parameters to get more specific results:
# Filter Transfer events where "from" address is specific
from_address = "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" # Padded to 32 bytes
transfer_topic0 = cherry_core.evm_signature_to_topic0("Transfer(address,address,uint256)")
query = ingest.Query(
kind=ingest.QueryKind.EVM,
params=ingest.evm.Query(
from_block=20123123,
to_block=20123223,
logs=[
ingest.evm.LogRequest(
topic0=[transfer_topic0],
topic1=[from_address], # Filter by 'from' address (first indexed parameter)
include_blocks=True,
)
],
fields=ingest.evm.Fields(
# Fields definition...
),
),
)
Important notes about indexed parameters:
- Topic values should be padded to 32 bytes (for addresses, prepend with zeros)
- For addresses, the correct format is: "0x000000000000000000000000" + address[2:]
- Parameter ordering follows the indexed parameters in the event signature
- Non-indexed parameters are encoded in the data field and can't be filtered directly
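As a small sketch of the padding rule above (a hypothetical helper, not part of cherry):

def address_to_topic(address: str) -> str:
    # Left-pad a 20-byte, 0x-prefixed address to the 32-byte topic format.
    return "0x" + "0" * 24 + address[2:]

# address_to_topic("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48")
# == "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"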
Field Selection
Query.fields
specifies which columns of each table we want to select.
request_fields = ingest.evm.Fields(
block=ingest.evm.BlockFields(number=True, timestamp=True),
transaction=ingest.evm.TransactionFields(
block_number=True, transaction_index=True, hash=True, from_=True, to=True
),
log=ingest.evm.LogFields(
block_number=True,
transaction_index=True,
log_index=True,
address=True,
data=True,
topic0=True,
topic1=True,
topic2=True,
topic3=True,
),
)
This means we want to get the blocks.number, blocks.timestamp, transactions.block_number, ... fields in the response.
Minimizing the fields we request can make a massive difference in performance and response body size, both of which can drastically affect the cost of running infrastructure.
Modern providers like SQD and HyperSync fully utilize this selection, so it makes a big difference.
Data Schema
Each chain format has its own Arrow schema. All providers return data according to this schema.
Ethereum (EVM)
All providers return data according to this schema when queried using ingest.evm.Query
.
blocks
Field Name | Data Type | Nullable |
---|---|---|
number | UInt64 | Yes |
hash | Binary | Yes |
parent_hash | Binary | Yes |
nonce | Binary | Yes |
sha3_uncles | Binary | Yes |
logs_bloom | Binary | Yes |
transactions_root | Binary | Yes |
state_root | Binary | Yes |
receipts_root | Binary | Yes |
miner | Binary | Yes |
difficulty | Decimal256(76, 0) | Yes |
total_difficulty | Decimal256(76, 0) | Yes |
extra_data | Binary | Yes |
size | Decimal256(76, 0) | Yes |
gas_limit | Decimal256(76, 0) | Yes |
gas_used | Decimal256(76, 0) | Yes |
timestamp | Decimal256(76, 0) | Yes |
uncles | List(Binary) | Yes |
base_fee_per_gas | Decimal256(76, 0) | Yes |
blob_gas_used | Decimal256(76, 0) | Yes |
excess_blob_gas | Decimal256(76, 0) | Yes |
parent_beacon_block_root | Binary | Yes |
withdrawals_root | Binary | Yes |
withdrawals | List(Struct(index: UInt64, validator_index: UInt64, address: Binary, amount: Decimal256(76, 0))) | Yes |
l1_block_number | UInt64 | Yes |
send_count | Decimal256(76, 0) | Yes |
send_root | Binary | Yes |
mix_hash | Binary | Yes |
transactions
Field Name | Data Type | Nullable |
---|---|---|
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
from | Binary | Yes |
gas | Decimal256(76, 0) | Yes |
gas_price | Decimal256(76, 0) | Yes |
hash | Binary | Yes |
input | Binary | Yes |
nonce | Decimal256(76, 0) | Yes |
to | Binary | Yes |
transaction_index | UInt64 | Yes |
value | Decimal256(76, 0) | Yes |
v | UInt8 | Yes |
r | Binary | Yes |
s | Binary | Yes |
max_priority_fee_per_gas | Decimal256(76, 0) | Yes |
max_fee_per_gas | Decimal256(76, 0) | Yes |
chain_id | Decimal256(76, 0) | Yes |
cumulative_gas_used | Decimal256(76, 0) | Yes |
effective_gas_price | Decimal256(76, 0) | Yes |
gas_used | Decimal256(76, 0) | Yes |
contract_address | Binary | Yes |
logs_bloom | Binary | Yes |
type | UInt8 | Yes |
root | Binary | Yes |
status | UInt8 | Yes |
sighash | Binary | Yes |
y_parity | Boolean | Yes |
access_list | List(Struct(address: Binary, storage_keys: List(Binary))) | Yes |
l1_fee | Decimal256(76, 0) | Yes |
l1_gas_price | Decimal256(76, 0) | Yes |
l1_gas_used | Decimal256(76, 0) | Yes |
l1_fee_scalar | Decimal256(76, 0) | Yes |
gas_used_for_l1 | Decimal256(76, 0) | Yes |
max_fee_per_blob_gas | Decimal256(76, 0) | Yes |
blob_versioned_hashes | List(Binary) | Yes |
deposit_nonce | Decimal256(76, 0) | Yes |
blob_gas_price | Decimal256(76, 0) | Yes |
deposit_receipt_version | Decimal256(76, 0) | Yes |
blob_gas_used | Decimal256(76, 0) | Yes |
l1_base_fee_scalar | Decimal256(76, 0) | Yes |
l1_blob_base_fee | Decimal256(76, 0) | Yes |
l1_blob_base_fee_scalar | Decimal256(76, 0) | Yes |
l1_block_number | UInt64 | Yes |
mint | Decimal256(76, 0) | Yes |
source_hash | Binary | Yes |
logs
Field Name | Data Type | Nullable |
---|---|---|
removed | Boolean | Yes |
log_index | UInt64 | Yes |
transaction_index | UInt64 | Yes |
transaction_hash | Binary | Yes |
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
address | Binary | Yes |
data | Binary | Yes |
topic0 | Binary | Yes |
topic1 | Binary | Yes |
topic2 | Binary | Yes |
topic3 | Binary | Yes |
traces
Field Name | Data Type | Nullable |
---|---|---|
from | Binary | Yes |
to | Binary | Yes |
call_type | Utf8 | Yes |
gas | Decimal256(76, 0) | Yes |
input | Binary | Yes |
init | Binary | Yes |
value | Decimal256(76, 0) | Yes |
author | Binary | Yes |
reward_type | Utf8 | Yes |
block_hash | Binary | Yes |
block_number | UInt64 | Yes |
address | Binary | Yes |
code | Binary | Yes |
gas_used | Decimal256(76, 0) | Yes |
output | Binary | Yes |
subtraces | UInt64 | Yes |
trace_address | List(UInt64) | Yes |
transaction_hash | Binary | Yes |
transaction_position | UInt64 | Yes |
type | Utf8 | Yes |
error | Utf8 | Yes |
sighash | Binary | Yes |
action_address | Binary | Yes |
balance | Decimal256(76, 0) | Yes |
refund_address | Binary | Yes |
Solana (SVM)
All providers return data according to this schema when queried using ingest.svm.Query
.
blocks
Field Name | Data Type | Nullable |
---|---|---|
slot | UInt64 | Yes |
hash | Binary | Yes |
parent_slot | UInt64 | Yes |
parent_hash | Binary | Yes |
height | UInt64 | Yes |
timestamp | Int64 | Yes |
rewards
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
pubkey | Binary | Yes |
lamports | Int64 | Yes |
post_balance | UInt64 | Yes |
reward_type | Utf8 | Yes |
commission | UInt8 | Yes |
token_balances
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
account | Binary | Yes |
pre_mint | Binary | Yes |
post_mint | Binary | Yes |
pre_decimals | UInt16 | Yes |
post_decimals | UInt16 | Yes |
pre_program_id | Binary | Yes |
post_program_id | Binary | Yes |
pre_owner | Binary | Yes |
post_owner | Binary | Yes |
pre_amount | UInt64 | Yes |
post_amount | UInt64 | Yes |
balances
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
account | Binary | Yes |
pre | UInt64 | Yes |
post | UInt64 | Yes |
logs
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
log_index | UInt32 | Yes |
instruction_address | List(UInt32) | Yes |
program_id | Binary | Yes |
kind | Utf8 | Yes |
message | Utf8 | Yes |
transactions
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
signature | Binary | Yes |
version | Int8 | Yes |
account_keys | List(Binary) | Yes |
address_table_lookups | List(Struct(account_key: Binary, writable_indexes: List(UInt64), readonly_indexes: List(UInt64))) | Yes |
num_readonly_signed_accounts | UInt32 | Yes |
num_readonly_unsigned_accounts | UInt32 | Yes |
num_required_signatures | UInt32 | Yes |
recent_blockhash | Binary | Yes |
signatures | List(Binary) | Yes |
err | Utf8 | Yes |
fee | UInt64 | Yes |
compute_units_consumed | UInt64 | Yes |
loaded_readonly_addresses | List(Binary) | Yes |
loaded_writable_addresses | List(Binary) | Yes |
fee_payer | Binary | Yes |
has_dropped_log_messages | Boolean | Yes |
instructions
Field Name | Data Type | Nullable |
---|---|---|
block_slot | UInt64 | Yes |
block_hash | Binary | Yes |
transaction_index | UInt32 | Yes |
instruction_address | List(UInt32) | Yes |
program_id | Binary | Yes |
a0 | Binary | Yes |
a1 | Binary | Yes |
a2 | Binary | Yes |
a3 | Binary | Yes |
a4 | Binary | Yes |
a5 | Binary | Yes |
a6 | Binary | Yes |
a7 | Binary | Yes |
a8 | Binary | Yes |
a9 | Binary | Yes |
rest_of_accounts | List(Binary) | Yes |
data | Binary | Yes |
d1 | Binary | Yes |
d2 | Binary | Yes |
d4 | Binary | Yes |
d8 | Binary | Yes |
error | Utf8 | Yes |
compute_units_consumed | UInt64 | Yes |
is_committed | Boolean | Yes |
has_dropped_log_messages | Boolean | Yes |
Steps
Steps are run between the provider and writer to modify the data or do other arbitrary actions.
There are built-in steps that can be configured and there is a mechanism to write custom steps which enables running arbitrary python code on the data as a part of the pipeline.
Base58Encode
Encode all binary fields as Solana (Bitcoin) style base58.
@dataclass
class Base58EncodeConfig:
tables: Optional[list[str]] = None
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.
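A usage sketch, assuming the step kind is exposed as StepKind.BASE58_ENCODE analogously to the other steps on this page:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.BASE58_ENCODE,  # assumed enum name
    config=cc.Base58EncodeConfig(
        tables=["transactions"],  # hypothetical; leave as None to encode all tables
    ),
),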
Cast
Cast columns to specified arrow types.
@dataclass
class CastConfig:
table_name: str
mappings: Dict[str, pa.DataType]
allow_decode_fail: bool = False
When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.
Example
Cast transfers.block_timestamp
to int64
type.
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.CAST,
config=cc.CastConfig(
table_name="transfers",
mappings={"block_timestamp": pa.int64()},
),
),
CastByType
Cast all columns with the from_type to the to_type.
@dataclass
class CastByTypeConfig:
from_type: pa.DataType
to_type: pa.DataType
allow_decode_fail: bool = False
When allow_decode_fail is set to True, this step will write null if it fails to cast a value instead of erroring out.
EvmDecodeEvents
Decode evm events using the given signature
@dataclass
class EvmDecodeEventsConfig:
event_signature: str
allow_decode_fail: bool = False
input_table: str = "logs"
output_table: str = "decoded_logs"
hstack: bool = True
allow_decode_fail controls whether the decoder should fail when a row fails to decode, or write null for that row instead.
hstack means the raw logs and decoded logs tables are horizontally stacked together, so decoded values have the raw log values (block_number, address, topic0, etc.) next to them.
Example
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.EVM_DECODE_EVENTS,
config=cc.EvmDecodeEventsConfig(
event_signature="Transfer(address indexed from, address indexed to, uint256 amount)",
output_table="transfers",
),
),
HexEncode
Encode all binary fields as hex strings
@dataclass
class HexEncodeConfig:
tables: Optional[list[str]] = None
prefixed: bool = True
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.
The prefixed parameter chooses between prefixed hex ('0x..', ETH style) and regular hex encoding.
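A usage sketch, assuming the step kind is exposed as StepKind.HEX_ENCODE:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.HEX_ENCODE,  # assumed enum name
    config=cc.HexEncodeConfig(
        prefixed=True,  # '0x...'-style hex
    ),
),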
U256ToBinary
Convert all Decimal256 columns to trimmed, big-endian encoded binary values. This binary representation is the same one used by ETH RPC and other Ethereum tooling.
@dataclass
class U256ToBinaryConfig:
tables: Optional[list[str]] = None
The tables parameter can be used to specify which tables should be encoded. All tables are encoded if it is left as None.
Writing Custom Steps
Custom steps can be specified like so:
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.CUSTOM,
config=cc.CustomStepConfig(
runner=my_custom_step,
),
),
where my_custom_step is an arbitrary Python function with this signature:
def my_custom_step(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
pass
This mechanism can be used to enrich the data with external sources like eth_call or ipfs.
It can also be used to just join the input tables like this:
def join_data(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
blocks = data["blocks"]
transfers = data["transfers"]
bn = blocks.get_column("number")
logger.info(f"processing data from: {bn.min()} to: {bn.max()}")
blocks = blocks.select(
polars.col("number").alias("block_number"),
polars.col("timestamp").alias("block_timestamp"),
)
out = transfers.join(blocks, on="block_number")
return {"transfers": out}
Appendix
Polars and Decimal types (UInt256)
Note: This section is irrelevant if you are using datasets, since datasets handle this internally.
Polars doesn't properly support the Decimal256 type yet, so it is not possible to work with Ethereum uint256 values natively. We prefer to cast these values to Decimal128 when possible, and cast to binary or string when the value doesn't fit.
You can use the built-in CastByType step to cast all Decimal256 columns to Decimal128 like this:
from cherry_etl import config as cc
cc.Step(
kind=cc.StepKind.CAST_BY_TYPE,
config=cc.CastByTypeConfig(
from_type=pa.decimal256(76, 0),
# can also use `pa.string()` as `to_type` here to get string values like '123213'
to_type=pa.decimal128(38, 0),
),
),
Or you can use the U256ToBinary step to convert all uint256 columns to big-endian (ETH-compatible) encoded binary values:
from cherry_etl import config as cc
cc.Step(
    kind=cc.StepKind.U256_TO_BINARY,
    config=cc.U256ToBinaryConfig(),
)
Some output formats also don't support Decimal256 but all of them support Decimal128 so this can be useful if you are having trouble writing Decimal256 to your output.