DNA streaming protocol
The Direct Node Access (DNA) streaming protocol is used to efficiently stream any on-chain data from nodes into applications.
The goals of the protocol are:
- linearize non-linear data into something easy to follow,
- let clients customize the data they want to receive, reducing the bandwidth required,
- provide support for data at all stages of finality (pending, accepted, finalized).
Note: understanding the low-level DNA protocol is not required to use Apibara. This page is provided for developers that want to understand more about Apibara's implementation details. Knowing how DNA works is useful if you want to contribute to Apibara.
The protocol
The client starts by calling the StreamData
method in the Stream
gRPC
service. The client must send a StreamDataRequest
message to the server to
start the stream.
The request includes:
stream_id
: unique id for the stream. All messages generated in response to this request will have the specified stream id, or 0 if not specified.starting_cursor
: specifies from where to start the stream. The cursor is stream-specific.finality
: specifies the finality required by the client. This parameter changes the behavior of the stream.filter
: specifies what type of data the client wants to receive. This is specific to each stream.
After the client requests data, the stream will start sending
StreamDataResponse
messages to the client. The messages can have the
following content:
invalidate
: invalidates data previously sent, for example in response to chain reorganizations.data
: sends a new batch of data.heartbeat
: periodically sent if no other types of messages were produced. Used to confirm that the client and server are still connected.
The following diagram shows how the client starts receiving a stream of
StreamDataResponse
messages after the initial StreamDataRequest
.
┌──────────┐ ┌──────────┐
│ Client │ │ Server │
└──────────┘ └──────────┘
│ │
│ StreamDataRequest(stream_id=0) │
│─────────────────────────────────────────▶│
┌ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─
│ [messages] │ │l
│ │ │ o
│ │ │o
│ │ StreamDataResponse(stream_id=0) │ p
│◀─────────────────────────────────────────│ │
└ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─
│ │
│ │
The client can reset the stream by sending a new StreamDataRequest
. The server
will stop sending data for the previous request and will start sending data for
the new stream. Notice that because the flow is async, the client may still
receive messages from the old stream definition. Use the stream_id
to uniquely
identify streams.
┌──────────┐ ┌──────────┐
│ Client │ │ Server │
└──────────┘ └──────────┘
│ │
│ StreamDataRequest(stream_id=1) │
│─────────────────────────────────────────▶│
┌ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─ <- ignore these messages
│ [messages from prev stream] │ │l
│ │ │ o
│ │ │o
│ │ StreamDataResponse(stream_id=0) │ p
│◀─────────────────────────────────────────│ │
└ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─
┌ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─
│ [messages] │ │l
│ │ │ o
│ │ │o
│ │ StreamDataResponse(stream_id=1) │ p
│◀─────────────────────────────────────────│ │
└ ┼ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─│─
│ │
│ │
Immutable stream
The StreamData
RPC method requires a bidirectional gRPC stream between the
client and the server. Bidirectional streams are harder to use and in general
not as well supported by gRPC libraries, for this reason we provide a
StreamDataImmutable
method that implements unidirectional streaming from
server to client.
Data finality
The DNA protocol supports streaming data with different finality. The stream behaves differently based on the finality mode specified in the request:
finalized
: the stream sendsdata
messages for finalized data only. Thedata
messages contain the data requested in the stream filter. Notice that there cannot beinvalidate
messages in this type of stream.accepted
: the stream sendsdata
messages for accepted data (including historical finalized data).pending
: the stream sendsdata
messages for pending data and for accepted data. Notice that the client may receive the same pending data multiple times.
Cursor
Cursors are used to identify a location in a stream. The end_cursor
in Data
messages refers to the location of the latest piece of data in the stream. When
clients send this cursor as a starting_cursor
in a request, the server will
resume streaming by sending the first message that follows the provided
cursor. When no starting_cursor
is specified, the stream will start from the
genesis block. If the data in or before end_cursor
was invalidated (following
a chain reorganization), the server will inform the client of it and will resume
the stream from the new stream's head.
A Cursor
is made of two components:
order_key
: this is the sequence number for messages,unique_key
: this is used to discriminate between data generated from different blocks at the same height.
Protobuf specification
This section includes the stream.proto
file containing the specification of
the Stream
gRPC service.
syntax = "proto3";
package apibara.node.v1alpha2;
service Stream {
// Stream data from the node (bi-directional).
rpc StreamData(stream StreamDataRequest) returns (stream StreamDataResponse);
// Stream data from the node.
rpc StreamDataImmutable(StreamDataRequest)
returns (stream StreamDataResponse);
// Get DNA service status.
rpc Status(StatusRequest) returns (StatusResponse);
}
// Request data to be streamed.
message StreamDataRequest {
// Used by the client to uniquely identify a stream.
// All streams use `stream_id = 0` by default.
optional uint64 stream_id = 1;
// How many items to send in a single response.
optional uint64 batch_size = 2;
// Start streaming from the provided cursor.
Cursor starting_cursor = 3;
// Return data with the specified finality.
// If not specified, defaults to `DATA_STATUS_ACCEPTED`.
optional DataFinality finality = 4;
// Return data according to the stream-specific filter.
bytes filter = 5;
// Combine multiple filters in the same stream.
repeated bytes multi_filter = 6;
}
// Contains the data requested from the client.
message StreamDataResponse {
// The stream id.
uint64 stream_id = 1;
oneof message {
Invalidate invalidate = 2;
Data data = 3;
Heartbeat heartbeat = 4;
}
}
// A cursor over the stream content.
message Cursor {
// Key used for ordering messages in the stream.
uint64 order_key = 1;
// Key used to discriminate branches in the stream.
bytes unique_key = 2;
}
// Data finality.
enum DataFinality {
DATA_STATUS_UNKNOWN = 0;
// Data was received, but is not part of the canonical chain yet.
DATA_STATUS_PENDING = 1;
// Data is now part of the canonical chain, but could still be invalidated.
DATA_STATUS_ACCEPTED = 2;
// Data is finalized and cannot be invalidated.
DATA_STATUS_FINALIZED = 3;
}
// Invalidate data after the given cursor.
message Invalidate {
// The cursor of the message before the now invalid data.
Cursor cursor = 1;
}
// A batch of data.
message Data {
// Cursor of the last item in the batch.
Cursor end_cursor = 1;
// The finality status of the data in the batch.
DataFinality finality = 2;
// The stream data.
repeated bytes data = 3;
// Cursor used to produced the batch.
Cursor cursor = 4;
}
// Sent to clients to check if stream is still connected.
message Heartbeat {}
// Request for the `Status` method.
message StatusRequest {}
// Response for the `Status` method.
message StatusResponse {
// The current head of the chain.
Cursor current_head = 1;
// The last cursor that was ingested by the node.
Cursor last_ingested = 2;
}
End of docs.