Drizzle with PostgreSQL
The Apibara Indexer SDK supports Drizzle ORM for storing data to PostgreSQL.
Installation
To use Drizzle with PostgreSQL, you need to install the following dependencies:
npm install drizzle-orm pg
We reccomend using Drizzle Kit to manage the database schema.
npm install --save-dev drizzle-kit
Schema configuration
Indexer's tables require storing data in a way that can handle chain reorgs. The
approach used by Apibara is to add an additional _cursor
column that tracks
when the data was inserted or updated. This is handled transparently by the
pgIndexerTable
function.
import { pgIndexerTable } from "@apibara/indexer/sinks/drizzle";
export const transfers = pgIndexerTable("transfers", {
amount: bigint("amount", { mode: "number" }),
transactionHash: text("transaction_hash"),
});
When generating the database migrations with Drizzle Kit, you can see the extra _cursor
column added to the table.
CREATE TABLE IF NOT EXISTS "transfers" (
"amount" bigint,
"transaction_hash" text,
"_cursor" "int8range" NOT NULL
);
Writing and reading data from within the indexer
From within the indexer, use the useSink
hook to access the current Drizzle database transaction.
Notice that all operations inside the transform
function are executed inside the same transaction
to ensure that the data is inserted or updated atomically. The db
object returned by the hook
provides the following methods:
insert
: to insert new rows into the table.update
: to update existing rows in the table.delete
: to delete rows from the table.select
: to query the table.
All four methods work analogously to the insert
, update
, delete
, and
select
methods from the drizzle-orm
package.
import pg from "pg";
import { defineIndexer, useSink } from "@apibara/indexer";
import { drizzle as drizzleSink } from "@apibara/indexer/sinks/drizzle";
import { StarknetStream } from "@apibara/starknet";
import * as schema, { transfers } from "@/lib/schema";
const connectionString = "postgres://postgres:postgres@localhost:5432/postgres";
const pool = new pg.Pool({ connectionString });
const db = drizzle(pool, { schema });
export default defineIndexer(StarknetStream)({
streamUrl: "https://starknet.preview.apibara.org",
finality: "accepted",
startingCursor,
filter,
sink: drizzleSink({ database, tables: [transfers] }),
async transform({ endCursor, block, context }) {
const { db } = useSink({ context });
const { events } = block;
const values = events.map(decodeTransferEvent);
await db.insert(transfers).values(values);
},
});
Querying data from outside the indexer
When selecting data from outside the indexer, you need to add the following
condition on _cursor
to your query. This ensure that only data that has not been
removed or updated is returned by the query.
WHERE upper_inf(_cursor)
Improving performance
You should always add an index on the _cursor
column to improve the
performance of the queries.
Cleaning up old data
Data that cannot be removed because of reorgs is automatically removed by the indexer as it receives updates to the finalized block. Thanks to this, your database contains the minimal amount of data required to safely handle reorgs.