
Streaming Import

Convex supports streaming import and provides a connector implementation for Airbyte. That connector uses the following APIs.

Streaming import support is automatically enabled for all Convex projects.

Streaming import requests require deployment admin authorization via the HTTP Authorization header. The header value is Convex <access_key>, where the access key is the "Deploy key" from the Convex dashboard. This key gives full read and write access to your Convex data.
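A minimal Python sketch of building this header, assuming the deploy key is already available as a string. The function name authorization_header is hypothetical.

```python
def authorization_header(deploy_key: str) -> dict[str, str]:
    """Return the Authorization header for streaming import requests.

    deploy_key is the "Deploy key" copied from the Convex dashboard.
    """
    key = deploy_key.strip()
    if not key:
        raise ValueError("deploy key must not be empty")
    # The value is the literal word "Convex", a single space, then the key.
    return {"Authorization": f"Convex {key}"}
```

Because the key grants full read and write access, it is better loaded from the environment or a secret store than hard-coded.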

Headers

Streaming import endpoints accept a Convex-Client: streaming-import-<version> header, where the version follows Semantic Versioning (SemVer). If this header is not specified, Convex defaults to the latest version. We recommend setting this header so that clients of this API do not break when the API changes.
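A similar sketch for the version header. The set of valid versions is not listed here, so any version you pass (for example "0.1.0") is a placeholder, and the function name convex_client_header is hypothetical. The check only accepts plain MAJOR.MINOR.PATCH strings, a deliberate simplification of full SemVer.

```python
import re

# Simplified check: full SemVer also allows pre-release and build suffixes.
_SEMVER_CORE = re.compile(r"\d+\.\d+\.\d+")


def convex_client_header(version: str) -> dict[str, str]:
    """Build the Convex-Client header that pins the streaming import API version."""
    if not _SEMVER_CORE.fullmatch(version):
        raise ValueError(f"expected MAJOR.MINOR.PATCH, got {version!r}")
    return {"Convex-Client": f"streaming-import-{version}"}
```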

GET /api/streaming_import/primary_key_indexes_ready

The primary_key_indexes_ready endpoint takes a list of table names and returns true if the primary key indexes (created by add_primary_key_indexes) on all of those tables are ready. If the tables are newly created, the indexes should be ready immediately; however, if the tables already contain documents, backfilling the primary key indexes may take some time. The response looks like:

{
  "indexesReady": true
}
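A minimal sketch of reading this response in Python, assuming the raw response body is available as a string (parse_indexes_ready is a hypothetical name). It insists on a real JSON boolean so that a malformed response is never mistaken for "ready".

```python
import json


def parse_indexes_ready(response_body: str) -> bool:
    """Parse a primary_key_indexes_ready response such as {"indexesReady": true}."""
    payload = json.loads(response_body)  # invalid JSON raises JSONDecodeError (a ValueError)
    ready = payload.get("indexesReady") if isinstance(payload, dict) else None
    if not isinstance(ready, bool):
        raise ValueError(f"unexpected response: {response_body!r}")
    return ready
```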

PUT /api/streaming_import/add_primary_key_indexes

The add_primary_key_indexes endpoint takes a JSON body containing the primary keys for tables and creates indexes on those primary keys, which are then backfilled. The indexes are not immediately ready for use: poll the primary_key_indexes_ready endpoint until it returns true before calling import_airbyte_records with records that require primary key indexes. Convex queries cannot access these indexes; they are used only by import_airbyte_records. The body is a map of table names to lists of field paths to index. Each field path is a list of field names, so a path with more than one entry refers to a nested field.

{
  "indexes": {
    "<table_name>": [["<field1>"], ["<field2>", "<nested_field>"]]
  }
}
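A sketch of serializing this body in Python. The input shape (a mapping from table name to a list of field paths) mirrors the JSON above; the function name add_primary_key_indexes_body is hypothetical, and sending the PUT request is left out.

```python
import json
from typing import Mapping, Sequence


def add_primary_key_indexes_body(indexes: Mapping[str, Sequence[Sequence[str]]]) -> str:
    """Serialize {table_name: [field_path, ...]} into an add_primary_key_indexes body.

    Each field path is a list of field names; ["address", "zip"] refers to
    the field zip nested inside address.
    """
    body: dict[str, dict[str, list[list[str]]]] = {"indexes": {}}
    for table, paths in indexes.items():
        checked: list[list[str]] = []
        for path in paths:
            # Catch the easy mistake of passing "id" instead of ["id"],
            # as well as empty paths and non-string field names.
            if isinstance(path, str) or not path or not all(isinstance(f, str) for f in path):
                raise ValueError(f"bad field path {path!r} for table {table!r}")
            checked.append(list(path))
        body["indexes"][table] = checked
    return json.dumps(body)
```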

Expected API Usage:

  1. Add indexes for primary keys by making a request to add_primary_key_indexes.
  2. Poll primary_key_indexes_ready until the response is true.
  3. Call import_airbyte_records, which uses the added indexes to match records by primary key.
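The polling step can be sketched as a small loop. check_ready is a hypothetical callable that performs the primary_key_indexes_ready request and returns its indexesReady value; the timeout and interval defaults are arbitrary choices, not documented limits. sleep and clock are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


def wait_for_primary_key_indexes(
    check_ready: Callable[[], bool],
    timeout_s: float = 300.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll check_ready() until it returns True, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while not check_ready():
        if clock() >= deadline:
            raise TimeoutError("primary key indexes were not ready in time")
        sleep(interval_s)
```

A fixed interval keeps the sketch simple; a connector backfilling large tables might prefer exponential backoff.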

PUT /api/streaming_import/clear_tables

The clear_tables endpoint deletes all documents from the specified tables. This may take multiple transactions, so if an error occurs partway through, only some of the documents may be deleted. The JSON request body contains a list of table names:

{
  "tableNames": ["<table_1>", "<table_2>"]
}
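Because an error can leave some documents deleted, a caller may want to retry. The sketch below assumes that re-sending the same clear_tables request is safe, since it targets the same end state (empty tables); that assumption may not hold if other writers touch these tables concurrently. send is a hypothetical callable that PUTs the body to /api/streaming_import/clear_tables and raises on an error response, and the attempt count is arbitrary.

```python
import json
from typing import Callable, Optional, Sequence


def clear_tables_with_retry(
    send: Callable[[str], None],
    table_names: Sequence[str],
    attempts: int = 3,
) -> int:
    """Send a clear_tables request, retrying on error. Returns the attempts used."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    body = json.dumps({"tableNames": list(table_names)})
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            send(body)
            return attempt
        except Exception as err:  # this sketch treats every error as retryable
            last_error = err
    assert last_error is not None
    raise last_error
```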

POST /api/streaming_import/replace_tables

This endpoint is no longer supported. Use /api/streaming_import/clear_tables instead.

The replace_tables endpoint renames tables with temporary names to their final names, deleting any existing tables with the final names.

The JSON request body contains a map from temporary table names to final table names:

{
  "tableNames": { "<table_1_temp>": "<table_1>", "<table_2_temp>": "<table_2>" }
}

POST /api/streaming_import/import_airbyte_records

The import_airbyte_records endpoint enables streaming ingress into a Convex deployment and is designed to be called from an Airbyte destination connector.

It takes a map of streams and a list of messages in the JSON body. Each stream has a name and a JSON schema, and corresponds to a Convex table. Streams whose records should be deduplicated also include a primary key, represented as a list of field paths, where each field path is a list of strings. Records for streams without a primary key are appended to their tables. Records for streams with a primary key replace the existing record whose primary key value matches, or are appended if there is no match. If you use primary keys, you must first call the add_primary_key_indexes endpoint and then poll primary_key_indexes_ready until the indexes have finished backfilling.
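The append and replace rules above can be illustrated with a small in-memory model. This is only a sketch of the documented behavior, not how Convex implements it: it scans the table linearly, whereas the real endpoint relies on the primary key indexes. The helper names get_path and apply_record are hypothetical, and every document is assumed to contain its primary key fields.

```python
from typing import Any, Optional, Sequence

FieldPath = Sequence[str]


def get_path(doc: dict[str, Any], path: FieldPath) -> Any:
    """Follow a field path such as ["address", "zip"] into a nested document."""
    value: Any = doc
    for field in path:
        value = value[field]
    return value


def apply_record(
    table: list[dict[str, Any]],
    record: dict[str, Any],
    primary_key: Optional[Sequence[FieldPath]] = None,
) -> None:
    """Apply one record to an in-memory table following the documented rules."""
    if not primary_key:
        table.append(record)  # no primary key: always append
        return
    key = tuple(get_path(record, p) for p in primary_key)
    for i, existing in enumerate(table):
        if tuple(get_path(existing, p) for p in primary_key) == key:
            table[i] = record  # primary key matches: replace the existing record
            return
    table.append(record)  # no match: append
```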

Each message contains a stream name and a JSON document that will be inserted into (or, for deduplicated sync, replaced in) the table with the corresponding name. Table names are the same as stream names, and Airbyte records become Convex documents.

{
  "tables": {
    "<stream_name>": {
      "primaryKey": [["<field1>"], ["<field2>", "<nested_field>"]], // only for deduplicated streams
      "jsonSchema": {} // JSON Schema for the stream; see https://json-schema.org/ for examples
    }
  },
  "messages": [{
    "tableName": "<stream_name>",
    "data": {} // JSON object conforming to the `jsonSchema` for that stream
  }]
}
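A sketch of assembling this body in Python. The function name import_airbyte_records_body and its input shapes (a mapping from stream name to its jsonSchema and optional primaryKey, plus (stream_name, data) pairs) are assumptions for illustration; sending the request is left out. It omits primaryKey for streams that do not deduplicate and rejects records for streams it was not told about.

```python
import json
from typing import Any, Iterable, Mapping


def import_airbyte_records_body(
    streams: Mapping[str, Mapping[str, Any]],
    records: Iterable[tuple[str, dict[str, Any]]],
) -> str:
    """Build an import_airbyte_records request body.

    streams maps stream name -> {"jsonSchema": ..., optionally "primaryKey": ...}.
    records yields (stream_name, data) pairs; the stream name is also the table name.
    """
    tables: dict[str, dict[str, Any]] = {}
    for name, stream in streams.items():
        entry: dict[str, Any] = {"jsonSchema": stream["jsonSchema"]}
        if stream.get("primaryKey"):
            entry["primaryKey"] = [list(path) for path in stream["primaryKey"]]
        tables[name] = entry
    messages = []
    for stream_name, data in records:
        if stream_name not in tables:
            raise KeyError(f"record for unknown stream {stream_name!r}")
        messages.append({"tableName": stream_name, "data": data})
    return json.dumps({"tables": tables, "messages": messages})
```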

As with clear_tables, import_airbyte_records can leave a partial import behind if a failure occurs after a transaction has committed.

Expected API Usage:

  1. [Optional] If using primary keys and deduplicated sync, add primary key indexes with add_primary_key_indexes and poll primary_key_indexes_ready until they are ready (see add_primary_key_indexes above).
  2. [Optional] If using overwrite sync, delete all documents in the specified tables using clear_tables.
  3. Make a request to import_airbyte_records with the new records to sync and the stream information.
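The three steps can be strung together as follows. StreamingImportClient is a hypothetical interface with one method per endpoint; how each method issues its HTTP request (including how primary_key_indexes_ready receives its table names) is not specified here. The fixed polling interval is also an assumption, and a production connector would likely bound the wait with a timeout.

```python
import time
from typing import Any, Protocol


class StreamingImportClient(Protocol):
    """Hypothetical client with one method per streaming import endpoint."""

    def add_primary_key_indexes(self, indexes: dict[str, list[list[str]]]) -> None: ...
    def primary_key_indexes_ready(self, table_names: list[str]) -> bool: ...
    def clear_tables(self, table_names: list[str]) -> None: ...
    def import_airbyte_records(
        self, tables: dict[str, dict[str, Any]], messages: list[dict[str, Any]]
    ) -> None: ...


def run_sync(
    client: StreamingImportClient,
    tables: dict[str, dict[str, Any]],
    messages: list[dict[str, Any]],
    overwrite: bool = False,
    poll_interval_s: float = 2.0,
) -> None:
    """Run one sync following the expected API usage for import_airbyte_records."""
    # Step 1 (optional): deduplicated streams need ready primary key indexes.
    indexes = {name: t["primaryKey"] for name, t in tables.items() if t.get("primaryKey")}
    if indexes:
        client.add_primary_key_indexes(indexes)
        while not client.primary_key_indexes_ready(list(indexes)):
            time.sleep(poll_interval_s)
    # Step 2 (optional): overwrite sync clears every table before importing.
    if overwrite:
        client.clear_tables(list(tables))
    # Step 3: send the records along with the stream information.
    client.import_airbyte_records(tables, messages)
```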