Plugins & Hooks
Indexers are extensible through hooks and plugins. Hooks are functions that are called at specific points in the indexer's lifecycle. Plugins are components that contain reusable hooks callbacks.
Hooks
The following hooks are available in all indexers.
run:before
: Called before the indexer starts running.run:after
: Called after the indexer has finished running.connect:before
: Called before the indexer connects to the DNA stream. Can be used to change the request or stream options.connect:after
: Called after the indexer has connected to the DNA stream.connect:factory
: Called before the indexer reconnects to the DNA stream with a new filter (in factory mode).message
andmessage:*
: Called for each message received from the DNA stream.handler:before
: Called before the indexer handles a block.handler:after
: Called after the indexer has handled a block successfully.handler:error
: Called if the indexer encounters an error while handling a block.transaction:commit
: Called after the indexer's sink transaction has been committed.
Using plugins
You can register plugins in the indexer's configuration, under the plugins
key.
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";
import { myAwesomePlugin } from "@/lib/my-plugin.ts";
export default defineIndexer(BeaconChainStream)({
streamUrl: "https://beaconchain.preview.apibara.org",
filter: { /* ... */ },
plugins: [myAwesomePlugin()],
async transform({ block: { header, validators } }) {
/* ... */
},
});
Building plugins
Developers can create new plugins to be shared across multiple indexers or projects. Plugins use the available hooks to extend the functionality of indexers.
The main way to define a plugin is by using the defineIndexerPlugin
function. This function takes a callback with
the indexer as parameter, the plugin should register itself with the indexer's hooks.
When the runner runs the indexer, all the relevant hooks are called.
import type { Cursor } from "@apibara/protocol";
import { defineIndexerPlugin } from "@apibara/indexer/plugins";
export function myAwesomePlugin<TFilter, TBlock, TTxnParams>() {
return defineIndexerPlugin<TFilter, TBlock, TTxnParams>((indexer) => {
let lastCursor: Cursor | undefined;
indexer.hooks.hook("transaction:commit", ({ endCursor }) => {
if (endCursor) {
lastCursor = endCursor;
}
});
indexer.hooks.hook("run:after", () => {
console.log(`Last cursor: ${lastCursor}`);
});
});
}
Inline hooks
For all cases where you want to use a hook without creating a plugin, you can use the hooks
property of the indexer.
import { BeaconChainStream } from "@apibara/beaconchain";
import { defineIndexer } from "@apibara/indexer";
export default defineIndexer(BeaconChainStream)({
streamUrl: "https://beaconchain.preview.apibara.org",
filter: { /* ... */ },
async transform({ block: { header, validators } }) {
/* ... */
},
hooks: {
async "connect:before"({ request, options }) {
// Do something before the indexer connects to the DNA stream.
},
},
});
Indexer lifecycle
The following Javascript pseudocode shows the indexer's lifecycle. This should give you a good understand of when hooks are called.
function run(indexer) {
indexer.callHook("run:before");
const sink = indexer.sink;
// Create the request based on the indexer's configuration.
const request = Request.create({
filter: indexer.filter,
startingCursor: indexer.startingCursor,
finality: indexer.finality,
});
// Stream options.
const options = {};
indexer.callHook("connect:before", { request, options });
let stream = indexer.streamData(request, options);
indexer.callHook("connect:after");
while (true) {
const { message, done } = stream.next();
if (done) {
break;
}
indexer.callHook("message", { message });
switch (message._tag) {
case "data": {
const { block, endCursor, finality } = message.data
sink.transaction(() => {
if (indexer.isFactoryMode()) {
// Handle the factory portion of the indexer data.
// Implementation detail is not important here.
const newFilter = indexer.factory();
const request = Request.create(/* ... */);
indexer.callHook("connect:factory", { request, endCursor });
stream = indexer.streamData(request, options);
}
indexer.callHook("handler:before", { block, endCursor, finality });
try {
indexer.transform({ block, endCursor, finality });
indexer.callHook("handler:after", { block, endCursor, finality });
} catch (error) {
indexer.callHook("handler:error", { error });
}
});
indexer.callHook("transaction:commit", { finality, endCursor });
break;
}
case "invalidate": {
indexer.callHook("message:invalidate", { message });
break;
}
case "finalize": {
indexer.callHook("message:finalize", { message });
break;
}
case "heartbeat": {
indexer.callHook("message:heartbeat", { message });
break;
}
case "systemMessage": {
indexer.callHook("message:systemMessage", { message });
break;
}
}
}
indexer.callHook("run:after");
}