Writing Custom Steps

Custom steps can be specified like so:

from cherry_etl import config as cc

cc.Step(
    kind=cc.StepKind.CUSTOM,
    config=cc.CustomStepConfig(
        runner=my_custom_step,
    ),
),

where my_custom_step is an arbitrary Python function with this signature:

def my_custom_step(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
    # Transform the input tables here and return the resulting tables.
    return data

A custom step can enrich the data with results from external sources, such as eth_call responses or content fetched from IPFS.

A custom step can also simply join the input tables, like this:

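import logging
from typing import Any, Dict

import polars

logger = logging.getLogger(__name__)


def join_data(data: Dict[str, polars.DataFrame], _: Any) -> Dict[str, polars.DataFrame]:
    blocks = data["blocks"]
    transfers = data["transfers"]

    # Log the block range covered by this batch.
    bn = blocks.get_column("number")
    logger.info(f"processing data from: {bn.min()} to: {bn.max()}")

    # Keep only the columns needed for the join, renamed with a block_ prefix.
    blocks = blocks.select(
        polars.col("number").alias("block_number"),
        polars.col("timestamp").alias("block_timestamp"),
    )
    # Attach the block timestamp to each transfer.
    out = transfers.join(blocks, on="block_number")

    return {"transfers": out}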