Batch Record Writes

Send up to 10,000 records in a single HTTP request, dramatically more efficient than individual writes when you're processing queued events, syncing periodic feeds, or flushing a local buffer. The batch endpoints are asynchronous: the response acknowledges the batch and returns a job ID you can poll for every record's outcome.

When to use this path:

  • You're processing events in bulk (queue consumer, scheduled job, micro-batch)
  • Row count is between 2 and 10,000 per flush
  • You want per-record validation results reported through job metrics, not a separate error download step

When to use something else:

  • You're writing records one at a time, or fewer than ~10 per flush: the single-write endpoint is simpler and just as efficient at that scale
  • You have more than 10,000 records per flush: split into multiple batches before sending

The two batch operations

Batch append

POST /records/{object_name}/append-batch

Inserts every record as a new row. No merge, no dedup. Use for event streams where each row is independently meaningful: page views, transactions, log entries.

curl -X POST https://api.experiture.ai/public/v1/records/events/append-batch \
  -H "Authorization: Bearer <your_access_token>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: $(uuidgen)" \
  -d '{
    "records": [
      { "session_id": "s_001", "event": "page_view", "url": "/pricing", "occurred_at": "2026-04-21T15:00:00Z" },
      { "session_id": "s_001", "event": "cta_click",  "url": "/pricing", "occurred_at": "2026-04-21T15:01:12Z" },
      { "session_id": "s_002", "event": "page_view", "url": "/docs",    "occurred_at": "2026-04-21T15:02:44Z" }
    ]
  }'

Batch upsert

POST /records/{object_name}/upsert-batch

Merges each record into an existing row if one matches matchKey; inserts if no match is found. Use for profile updates, subscription states, or any entity where you want last-write-wins across a feed.

curl -X POST https://api.experiture.ai/public/v1/records/profiles/upsert-batch \
  -H "Authorization: Bearer <your_access_token>" \
  -H "Content-Type: application/json" \
  -H "Idempotency-Key: $(uuidgen)" \
  -d '{
    "records": [
      { "email": "alice@example.com", "tier": "gold",   "last_seen_at": "2026-04-21T10:00:00Z" },
      { "email": "bob@example.com",   "tier": "silver", "last_seen_at": "2026-04-21T11:30:00Z" }
    ],
    "matchKey": "email"
  }'

Response format

202 Accepted

The batch endpoints are asynchronous. The response acknowledges that the records have been accepted and queued for processing, and returns a jobId you can use to poll for status.

{
  "success": true,
  "data": {
    "operation": "upsert-batch",
    "objectName": "profiles",
    "jobId": "rjob_01HXYZ",
    "state": "queued",
    "acceptedRecords": 2,
    "statusPath": "/records/profiles/jobs/rjob_01HXYZ"
  }
}

Poll GET /records/{object_name}/jobs/{job_id} to track processing. When the job reaches a terminal state, check metrics.invalidRows for any rows that failed validation during processing. Valid rows are committed even when others fail — the batch is not transactional.
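
The job-status payload follows the same envelope as the submission response. An illustrative shape is below; only state and metrics.invalidRows are documented in this guide, and the remaining field names are extrapolated from the submission response, so treat them as assumptions:

```json
{
  "success": true,
  "data": {
    "jobId": "rjob_01HXYZ",
    "objectName": "profiles",
    "state": "completed",
    "metrics": {
      "invalidRows": 1
    }
  }
}
```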


Idempotency

Batch endpoints accept Idempotency-Key the same way single-write endpoints do. A re-send with the same key returns the cached original response for 24 hours.

Generate a key that represents the logical batch, not the HTTP attempt:

import hashlib, json, uuid
 
def batch_idempotency_key(records: list[dict]) -> str:
    # Stable key: hash the canonical record set
    canonical = json.dumps(records, sort_keys=True, ensure_ascii=True)
    return str(uuid.UUID(hashlib.md5(canonical.encode()).hexdigest()))

For queue-based consumers, derive the key from the message IDs that compose the batch:

def queue_batch_key(message_ids: list[str]) -> str:
    combined = ":".join(sorted(message_ids))
    return str(uuid.UUID(hashlib.md5(combined.encode()).hexdigest()))

This ensures that if your flush loop restarts mid-batch, re-flushing the same messages doesn't double-write.
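
Because the IDs are sorted before hashing, the key depends only on which messages are in the batch, not the order the queue delivered them in. A self-contained check (the helper repeats the definition above):

```python
import hashlib
import uuid

def queue_batch_key(message_ids: list[str]) -> str:
    # Sort so the key is independent of delivery order
    combined = ":".join(sorted(message_ids))
    return str(uuid.UUID(hashlib.md5(combined.encode()).hexdigest()))

# Same messages, different delivery order: identical key
k1 = queue_batch_key(["msg-2", "msg-1", "msg-3"])
k2 = queue_batch_key(["msg-1", "msg-3", "msg-2"])
```

A re-flush after a crash therefore hits the 24-hour idempotency cache instead of double-writing.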


Buffering pattern

Most production integrations don't call the batch endpoint directly — they accumulate records in a local buffer and flush on size or time, whichever comes first.

import time, threading, requests
from collections import deque
 
class RecordBuffer:
    def __init__(self, api_key: str, object_name: str, match_key: str,
                 max_size: int = 500, max_age_s: float = 5.0):
        self.api_key = api_key
        self.object_name = object_name
        self.match_key = match_key
        self.max_size = max_size
        self.max_age_s = max_age_s
        self._buf: deque[dict] = deque()
        self._lock = threading.Lock()
        self._last_flush = time.monotonic()
 
    def add(self, record: dict):
        with self._lock:
            self._buf.append(record)
            if len(self._buf) >= self.max_size:
                self._flush_locked()
            elif time.monotonic() - self._last_flush >= self.max_age_s:
                # Note: the time-based flush only fires when add() is called;
                # call flush() from a timer if records can stop arriving
                self._flush_locked()
 
    def flush(self):
        with self._lock:
            self._flush_locked()
 
    def _flush_locked(self):
        if not self._buf:
            return
        records = list(self._buf)
        self._buf.clear()
        self._last_flush = time.monotonic()
        try:
            resp = requests.post(
                f"https://api.experiture.ai/public/v1/records/{self.object_name}/upsert-batch",
                headers={
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json",
                },
                json={"records": records, "matchKey": self.match_key},
                timeout=30,
            )
            resp.raise_for_status()
        except requests.RequestException:
            # Restore the records so a later flush can retry them
            self._buf.extendleft(reversed(records))
            raise

Call buffer.flush() at shutdown (or in a finally block) to drain any partial batch.
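
The drain guarantee can be sketched generically. In this sketch, run_consumer, poll_once, add, and flush are placeholder names standing in for your poll loop and the buffer's add/flush methods, not part of the class above:

```python
def run_consumer(poll_once, add, flush):
    # Drain the buffer even if the poll loop raises or is interrupted
    try:
        while True:
            batch = poll_once()
            if batch is None:  # source exhausted
                break
            for record in batch:
                add(record)
    finally:
        flush()  # drain any partial batch before exit
```

The finally block runs on normal exit, on exception, and on KeyboardInterrupt, so a partial batch is never stranded in memory.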


Handling per-record errors

After the batch job completes, poll GET /records/{object_name}/jobs/{job_id} and check metrics.invalidRows. Never silently discard rejected rows — route them to a dead-letter queue or log table for review:

import time, requests
 
def wait_for_batch_job(job_id: str, object_name: str, api_key: str, timeout: int = 120) -> dict:
    url = f"https://api.experiture.ai/public/v1/records/{object_name}/jobs/{job_id}"
    headers = {"Authorization": f"Bearer {api_key}"}
    deadline = time.time() + timeout
    backoff = 2.0
    while time.time() < deadline:
        data = requests.get(url, headers=headers, timeout=10).json()["data"]
        if data["state"] in ("completed", "failed"):
            return data
        time.sleep(backoff)
        backoff = min(backoff * 1.5, 15)
    raise TimeoutError(f"Batch job {job_id} did not finish within {timeout}s")
 
# After submitting a batch, poll and check metrics
# (logger and dlq are placeholders for your logging and dead-letter handles)
result = wait_for_batch_job(job_id, "profiles", api_key)
metrics = result.get("metrics", {})
if metrics.get("invalidRows", 0) > 0:
    logger.warning("batch had %d rejected rows — check job errors", metrics["invalidRows"])
    dlq.publish({"job_id": job_id, "invalid_rows": metrics["invalidRows"]})

Common reasons rows are rejected:

  • CDP_ETL.VALIDATION.REQUEST_SCHEMA — field doesn't exist on the object schema
  • CDP_ETL.VALIDATION.REQUEST_INVALID — field type mismatch or invalid format (e.g. bare datetime)

Rejected rows are deterministic: retrying the same row will produce the same error. Fix the data before re-sending.


Retry logic

Retry the entire batch on transient errors (429, 5xx). The idempotency key ensures accepted records aren't doubled.

import random, time, requests
 
RETRYABLE_STATUS = {429, 500, 502, 503, 504}
 
def flush_with_retry(api_key: str, object_name: str, records: list, match_key: str, max_attempts: int = 5):
    idempotency_key = batch_idempotency_key(records)  # content hash of the batch (defined above)
    url = f"https://api.experiture.ai/public/v1/records/{object_name}/upsert-batch"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Idempotency-Key": idempotency_key,
    }
    body = {"records": records, "matchKey": match_key}
    for attempt in range(max_attempts):
        resp = requests.post(url, headers=headers, json=body, timeout=30)
        if resp.status_code not in RETRYABLE_STATUS:
            resp.raise_for_status()
            return resp.json()
        retry_after = float(resp.headers.get("Retry-After", 0))
        backoff = min(30, (2 ** attempt) + random.random())
        time.sleep(max(retry_after, backoff))
    raise RuntimeError(f"Exhausted {max_attempts} attempts")

Rate limits

Rate limits for batch writes depend on your plan — see rate limits documentation for current values. Each batch request counts as one request against the rate limit regardless of how many records it contains. Batching is the most efficient way to ingest at scale.


Batch size guidance

Records per batch:

  • 1–10: inefficient; use the single-write endpoint instead
  • 50–500: good default for most queue consumers
  • 500–2,000: optimal for high-throughput pipelines
  • 2,000–10,000: maximum efficiency; request latency increases linearly but total throughput is highest

Latency scales roughly linearly with batch size. For latency-sensitive workloads (sub-200ms P99), keep batches under 1,000 records.


Common failure modes

413 Payload Too Large — body exceeds 10 MB. Either reduce batch size or compress field values (timestamps especially tend to inflate payloads).
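
When record sizes vary widely, splitting by serialized size is safer than splitting by count. A rough sketch; the 10 MB default and the per-record comma overhead are approximations of the final body size, not an exact accounting:

```python
import json

def split_by_bytes(records: list[dict], max_bytes: int = 10_000_000) -> list[list[dict]]:
    # Greedily pack records until the next one would push the serialized
    # body past the limit
    batches, current, current_size = [], [], 2  # 2 bytes for the enclosing []
    for r in records:
        size = len(json.dumps(r).encode("utf-8")) + 1  # +1 for the comma
        if current and current_size + size > max_bytes:
            batches.append(current)
            current, current_size = [], 2
        current.append(r)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

Each returned sub-batch can then be sent with its own idempotency key.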

CDP_ETL.VALIDATION.REQUEST_INVALID (too many records) — batch has more than 10,000 records. Split and re-send.
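
Splitting by count is a one-liner; the 10,000 ceiling comes from the error above:

```python
from typing import Iterator

def chunked(records: list[dict], size: int = 10_000) -> Iterator[list[dict]]:
    # Yield consecutive slices of at most `size` records
    for i in range(0, len(records), size):
        yield records[i:i + size]
```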

CDP_ETL.VALIDATION.REQUEST_SCHEMA on the outer request — one or more field names in your batch don't match the object schema. Unlike per-record validation, this rejects the entire batch. Validate field names before sending.

High invalidRows in job metrics — indicates a systematic mapping issue. Check the job's error details, fix your transformation, and replay from the DLQ.


Full example — SQS consumer

import os, boto3, json, logging
 
logger = logging.getLogger(__name__)
 
sqs = boto3.client("sqs", region_name="us-east-1")
API_KEY = os.environ["EXPERITURE_API_KEY"]
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/profile-updates"
 
def process_batch():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,  # SQS max per poll
        WaitTimeSeconds=20,
    )
    messages = response.get("Messages", [])
    if not messages:
        return
 
    records = []
    receipt_handles = []
    for msg in messages:
        body = json.loads(msg["Body"])
        records.append({
            "email": body["email"],
            "first_name": body.get("first_name"),
            "last_seen_at": body["timestamp"],
        })
        receipt_handles.append(msg["ReceiptHandle"])
 
    result = flush_with_retry(API_KEY, "profiles", records, match_key="email")
 
    # Only delete messages after the batch has been accepted
    for handle in receipt_handles:
        sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=handle)
 
    # Poll the job for completion and check for invalid rows
    job_result = wait_for_batch_job(result["data"]["jobId"], "profiles", API_KEY)
    metrics = job_result.get("metrics", {})
    if metrics.get("invalidRows", 0):
        # Rejected rows won't succeed on retry — log and move on
        logger.warning("%d rows rejected", metrics["invalidRows"])

See Also