Streaming Import
Convex supports streaming import and provides a connector implementation for Airbyte. The connector uses the following APIs.
Streaming import support is automatically enabled for all Convex projects.
Streaming import requests require deployment admin authorization via the HTTP
header Authorization. The value is Convex <access_key>, where the access key
comes from "Deploy key" on the Convex dashboard and gives full read and write
access to your Convex data.
Headers
Streaming import endpoints accept a Convex-Client: streaming-import-<version>
header, where the version follows Semver guidelines. If
this header is not specified, Convex will default to the latest version. We
recommend using the header to ensure the consumer of this API does not break as
the API changes.
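As a minimal sketch, the two headers described above could be assembled like this. The function name, the example version string, and the Content-Type header are assumptions made for illustration; only the Authorization and Convex-Client formats come from the text above.

```python
from typing import Dict, Optional


def streaming_import_headers(
    deploy_key: str, client_version: Optional[str] = None
) -> Dict[str, str]:
    """Build HTTP headers for a streaming import request (illustrative helper)."""
    headers = {
        # Documented format: "Convex <access_key>", using the dashboard deploy key.
        "Authorization": f"Convex {deploy_key}",
        # Assumption: JSON bodies are sent with a JSON content type.
        "Content-Type": "application/json",
    }
    if client_version is not None:
        # Pinning the version protects the caller from future API changes.
        headers["Convex-Client"] = f"streaming-import-{client_version}"
    return headers


# "0.2.0" is a placeholder version, not a documented one.
headers = streaming_import_headers("<access_key>", client_version="0.2.0")
```

Omitting client_version leaves out the Convex-Client header, in which case Convex defaults to the latest version as described above.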
GET /api/streaming_import/primary_key_indexes_ready
The primary_key_indexes_ready endpoint takes a list of table names and returns
true if the primary key indexes (created by add_primary_key_indexes) on all
those tables are ready. If the tables are newly created, the indexes should be
ready immediately; however, if there are existing documents in the tables, it
may take some time to backfill the primary key indexes. The response looks like:
{
  "indexesReady": true
}
PUT /api/streaming_import/add_primary_key_indexes
The add_primary_key_indexes endpoint takes a JSON body containing the primary
keys for tables and creates indexes on the primary keys to be backfilled. Note
that the indexes are not immediately ready to query: the
primary_key_indexes_ready endpoint needs to be polled until it returns true
before calling import_airbyte_records with records that require primary key
indexes. Also note that Convex queries will not have access to these added
indexes. They are solely for use in import_airbyte_records. The body takes the
form of a map of table names to lists of field paths to index. Each field path
is represented by a list of fields, so that it can describe a nested field.
{
  "indexes": {
    "<table_name>": [["<field1>"], ["<field2>", "<nested_field>"]]
  }
}
Expected API Usage:
- Add indexes for primary keys by making a request to add_primary_key_indexes.
- Poll primary_key_indexes_ready until indexesReady is true in the response.
- Call import_airbyte_records with records that rely on the added indexes.
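The add-then-poll sequence above might be sketched as follows. This is an illustration under stated assumptions rather than the connector's implementation: the function names, the polling interval, and the attempt limit are invented here, and because the text does not specify how table names are passed to primary_key_indexes_ready, the HTTP call itself is left to a caller-supplied check_ready function that returns the parsed JSON response.

```python
import time
from typing import Callable, Dict, List

FieldPath = List[str]  # e.g. ["field2", "nested_field"]


def add_primary_key_indexes_body(primary_keys: Dict[str, List[FieldPath]]) -> dict:
    """Wrap a table-name -> field-paths map in the documented "indexes" envelope."""
    return {"indexes": primary_keys}


def wait_for_indexes(
    check_ready: Callable[[], dict],
    interval_s: float = 1.0,   # assumption: not a documented value
    max_attempts: int = 60,    # assumption: not a documented value
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until the response reports {"indexesReady": true} or attempts run out."""
    for attempt in range(max_attempts):
        if check_ready().get("indexesReady") is True:
            return True
        if attempt < max_attempts - 1:
            sleep(interval_s)
    return False
```

Passing sleep as a parameter keeps the loop easy to exercise without real delays. A caller would only proceed to import_airbyte_records once wait_for_indexes returns True.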
PUT /api/streaming_import/clear_tables
The clear_tables endpoint deletes all documents from the specified tables.
Note that this may require multiple transactions. If there is an intermediate
error, only some documents may be deleted. The JSON body for this request
contains a list of table names:
{
  "tableNames": ["<table_1>", "<table_2>"]
}
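For illustration, a clear_tables request could be constructed with Python's standard library as below. The function name and the deployment URL are hypothetical, and the Content-Type header is an assumption; the method, path, body shape, and Authorization format follow the text above. The sketch only builds the request and does not send it.

```python
import json
import urllib.request
from typing import List


def build_clear_tables_request(
    deployment_url: str, deploy_key: str, table_names: List[str]
) -> urllib.request.Request:
    """Build (but do not send) a PUT request for the clear_tables endpoint."""
    body = json.dumps({"tableNames": table_names}).encode("utf-8")
    return urllib.request.Request(
        url=f"{deployment_url.rstrip('/')}/api/streaming_import/clear_tables",
        data=body,
        method="PUT",
        headers={
            "Authorization": f"Convex {deploy_key}",
            "Content-Type": "application/json",  # assumption
        },
    )


# Hypothetical deployment URL and placeholder key.
request = build_clear_tables_request(
    "https://example.invalid", "<access_key>", ["<table_1>", "<table_2>"]
)
```

The request can then be sent with urllib.request.urlopen(request). Because clearing may span multiple transactions, a caller that sees an error should assume some documents may already have been deleted.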
POST /api/streaming_import/replace_tables
This endpoint is no longer supported. Use /api/streaming_import/clear_tables
instead.
The replace_tables endpoint renames tables with temporary names to their final
names, deleting any existing tables with the final names.
The JSON body for this request contains a map of temporary table names to final
table names:
{
  "tableNames": { "<table_1_temp>": "<table_1>", "<table_2_temp>": "<table_2>" }
}
POST /api/streaming_import/import_airbyte_records
The import_airbyte_records endpoint enables streaming ingress into a Convex
deployment and is designed to be called from an Airbyte destination connector.
It takes a map of streams and a list of messages in the JSON body. Each stream
has a name and a JSON schema that correspond to a Convex table. Streams where
records should be deduplicated also include a primary key, which is represented
as a list of field paths, each of which is a list of strings. Records for
streams without a primary key are appended to tables; records for streams with a
primary key replace an existing record where the primary key value matches, or
are appended if there is no match. If you are using primary keys, you must call
the add_primary_key_indexes endpoint first and wait for the indexes to backfill
by polling primary_key_indexes_ready.
Each message contains a stream name and a JSON document that will be inserted
(or replaced, in the case of deduplicated sync) into the table with the
corresponding stream name. Table names are the same as the stream names. Airbyte
records become Convex documents.
{
  "tables": {
    "<stream_name>": {
      "primaryKey": [["<field1>"], ["<field2>", "<nested_field>"]],
      "jsonSchema": {} // JSON schema for the stream; see https://json-schema.org/ for examples
    }
  },
  "messages": [{
    "tableName": "<table_name>",
    "data": {} // JSON object conforming to the `jsonSchema` for that stream
  }]
}
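A body of this shape could be assembled with a small helper like the one below. This is a sketch, not part of any Convex or Airbyte library: the function names are hypothetical, omitting primaryKey for append-only streams is an assumption drawn from the description above, and the check that every message refers to a declared stream is a convenience of this sketch rather than a documented requirement.

```python
from typing import Any, Dict, List, Optional, Tuple

FieldPath = List[str]


def stream_definition(
    json_schema: Dict[str, Any], primary_key: Optional[List[FieldPath]] = None
) -> Dict[str, Any]:
    """Describe one stream; primaryKey is included only for deduplicated streams."""
    definition: Dict[str, Any] = {"jsonSchema": json_schema}
    if primary_key is not None:
        definition["primaryKey"] = primary_key
    return definition


def import_records_body(
    streams: Dict[str, Dict[str, Any]],
    records: List[Tuple[str, Dict[str, Any]]],
) -> Dict[str, Any]:
    """Build the JSON body from stream definitions and (stream_name, document) pairs."""
    unknown = {name for name, _ in records} - set(streams)
    if unknown:
        raise ValueError(f"messages reference undeclared streams: {sorted(unknown)}")
    return {
        "tables": streams,
        "messages": [{"tableName": name, "data": doc} for name, doc in records],
    }


# Hypothetical stream and record, for illustration only.
body = import_records_body(
    {"users": stream_definition({"type": "object"}, primary_key=[["id"]])},
    [("users", {"id": 1, "name": "Ada"})],
)
```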
As with clear_tables, a call to import_airbyte_records can result in a partial
import if a failure occurs after a transaction has committed.
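The append-versus-replace behavior described for import_airbyte_records can be modeled in memory as follows. This is only an illustrative model of the documented semantics, using a linear scan over a Python list; it says nothing about how Convex implements the import, which relies on the primary key indexes. The function names are hypothetical, and the model assumes every record contains all primary key fields.

```python
from typing import Any, Dict, List, Optional

FieldPath = List[str]


def get_path(document: Dict[str, Any], path: FieldPath) -> Any:
    """Follow a field path such as ["field2", "nested_field"] into a document."""
    value: Any = document
    for field in path:
        value = value[field]
    return value


def apply_record(
    table: List[Dict[str, Any]],
    record: Dict[str, Any],
    primary_key: Optional[List[FieldPath]] = None,
) -> None:
    """Append the record, or replace the existing one with the same primary key."""
    if primary_key is None:
        table.append(record)  # no primary key: always append
        return
    key = tuple(get_path(record, path) for path in primary_key)
    for i, existing in enumerate(table):
        if tuple(get_path(existing, path) for path in primary_key) == key:
            table[i] = record  # matching primary key: replace in place
            return
    table.append(record)  # no match: append
```

With a primary key of [["id"], ["meta", "region"]], two records replace each other only when both the id and the nested meta.region values match.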
Expected API Usage:
- [Optional] Add any indexes if using primary keys and deduplicated sync (see
  add_primary_key_indexes above).
- [Optional] Delete all documents in specified tables using clear_tables if
  using overwrite sync.
- Make a request to import_airbyte_records with new records to sync and stream
  information.
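The usage steps above can be sketched as a single routine. The client object here is hypothetical: it is assumed to expose one method per endpoint (add_primary_key_indexes, primary_key_indexes_ready returning a boolean, clear_tables, import_airbyte_records), and the polling interval and limit are arbitrary choices rather than documented values. The routine waits for the indexes to be ready before importing, as the endpoint description requires.

```python
import time
from typing import Any, Callable, Dict, List


def run_sync(
    client: Any,                         # hypothetical wrapper around the endpoints
    streams: Dict[str, Dict[str, Any]],  # stream name -> {"jsonSchema", "primaryKey"?}
    messages: List[Dict[str, Any]],      # [{"tableName": ..., "data": ...}, ...]
    overwrite: bool = False,
    poll_interval_s: float = 1.0,        # assumption: not a documented value
    max_polls: int = 60,                 # assumption: not a documented value
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Run one sync: optional indexes, optional clear, then import."""
    primary_keys = {
        name: definition["primaryKey"]
        for name, definition in streams.items()
        if definition.get("primaryKey")
    }
    if primary_keys:
        # Step 1 (optional): deduplicated sync needs primary key indexes.
        client.add_primary_key_indexes(primary_keys)
        for _ in range(max_polls):
            if client.primary_key_indexes_ready(list(primary_keys)):
                break
            sleep(poll_interval_s)
        else:
            raise TimeoutError("primary key indexes were not ready in time")
    if overwrite:
        # Step 2 (optional): overwrite sync clears the target tables first.
        client.clear_tables(list(streams))
    # Step 3: send the stream definitions and the new records.
    client.import_airbyte_records(streams, messages)
```

Since both clear_tables and import_airbyte_records can partially complete on failure, a caller that retries run_sync should expect that some documents from the previous attempt may already have been deleted or written.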