Batch Record Writes
Send up to 10,000 records in a single HTTP request — dramatically more efficient than individual writes when you're processing queued events, syncing periodic feeds, or flushing a local buffer. Processing is asynchronous: the API accepts the whole batch up front and returns a job ID you poll to confirm every record's outcome.
When to use this path:
- You're processing events in bulk (queue consumer, scheduled job, micro-batch)
- Row count is between 2 and 10,000 per flush
- You want per-record validation reported in the job status — not a separate error-file download step
When to use something else:
- Single events as they arrive → Real-time Record Writes
- More than 10,000 rows, or file-based loads → Bulk File Import
- Backfilling millions of historical rows → Bulk File Import
The two batch operations
Batch append
POST /records/{object_name}/append-batch
Inserts every record as a new row. No merge, no dedup. Use for event streams where each row is independently meaningful: page views, transactions, log entries.
curl -X POST https://api.experiture.ai/public/v1/records/events/append-batch \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: $(uuidgen)" \
-d '{
"records": [
{ "session_id": "s_001", "event": "page_view", "url": "/pricing", "occurred_at": "2026-04-21T15:00:00Z" },
{ "session_id": "s_001", "event": "cta_click", "url": "/pricing", "occurred_at": "2026-04-21T15:01:12Z" },
{ "session_id": "s_002", "event": "page_view", "url": "/docs", "occurred_at": "2026-04-21T15:02:44Z" }
]
}'
Batch upsert
POST /records/{object_name}/upsert-batch
Merges each record into an existing row if one matches on matchKey; inserts a new row if no match is found. Use this for profile updates, subscription states, or any entity where you want last-write-wins semantics across a feed.
curl -X POST https://api.experiture.ai/public/v1/records/profiles/upsert-batch \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: $(uuidgen)" \
-d '{
"records": [
{ "email": "alice@example.com", "tier": "gold", "last_seen_at": "2026-04-21T10:00:00Z" },
{ "email": "bob@example.com", "tier": "silver", "last_seen_at": "2026-04-21T11:30:00Z" }
],
"matchKey": "email"
}'
Response format
202 Accepted
The batch endpoints are asynchronous. The response acknowledges that the records have been accepted and queued for processing, and returns a jobId you can use to poll for status.
{
"success": true,
"data": {
"operation": "upsert-batch",
"objectName": "profiles",
"jobId": "rjob_01HXYZ",
"state": "queued",
"acceptedRecords": 2,
"statusPath": "/records/profiles/jobs/rjob_01HXYZ"
}
}
Poll GET /records/{object_name}/jobs/{job_id} to track processing. When the job reaches a terminal state, check metrics.invalidRows for any rows that failed validation during processing. Valid rows are committed even when others fail — the batch is not transactional.
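For illustration, a terminal job status might look like the sketch below; jobId, state, and metrics.invalidRows are the fields referenced on this page, and any other fields in your actual response will vary.
{
  "success": true,
  "data": {
    "jobId": "rjob_01HXYZ",
    "state": "completed",
    "metrics": { "invalidRows": 1 }
  }
}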
Idempotency
Batch endpoints accept Idempotency-Key the same way single-write endpoints do. A re-send with the same key returns the cached original response for 24 hours.
Generate a key that represents the logical batch, not the HTTP attempt:
import hashlib, json, uuid
def batch_idempotency_key(records: list[dict]) -> str:
# Stable key: hash the canonical record set
canonical = json.dumps(records, sort_keys=True, ensure_ascii=True)
    return str(uuid.UUID(hashlib.md5(canonical.encode()).hexdigest()))
For queue-based consumers, derive the key from the message IDs that compose the batch:
def queue_batch_key(message_ids: list[str]) -> str:
combined = ":".join(sorted(message_ids))
    return str(uuid.UUID(hashlib.md5(combined.encode()).hexdigest()))
This ensures that if your flush loop restarts mid-batch, re-flushing the same messages doesn't double-write.
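Because the message IDs are sorted before hashing, the key is order-independent. A quick sanity check:
# Same messages in a different arrival order produce the same key
assert queue_batch_key(["m-2", "m-1"]) == queue_batch_key(["m-1", "m-2"])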
Buffering pattern
Most production integrations don't call the batch endpoint directly — they accumulate records in a local buffer and flush on size or time, whichever comes first.
import time, threading, requests
from collections import deque
class RecordBuffer:
def __init__(self, api_key: str, object_name: str, match_key: str,
max_size: int = 500, max_age_s: float = 5.0):
self.api_key = api_key
self.object_name = object_name
self.match_key = match_key
self.max_size = max_size
self.max_age_s = max_age_s
self._buf: deque[dict] = deque()
self._lock = threading.Lock()
self._last_flush = time.monotonic()
    def add(self, record: dict):
        with self._lock:
            self._buf.append(record)
            # Size triggers an immediate flush; age is only evaluated when a
            # record arrives, so an idle buffer must be drained via flush()
            if len(self._buf) >= self.max_size:
                self._flush_locked()
            elif time.monotonic() - self._last_flush >= self.max_age_s:
                self._flush_locked()
def flush(self):
with self._lock:
self._flush_locked()
def _flush_locked(self):
if not self._buf:
return
records = list(self._buf)
self._buf.clear()
self._last_flush = time.monotonic()
requests.post(
f"https://api.experiture.ai/public/v1/records/{self.object_name}/upsert-batch",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json={"records": records, "matchKey": self.match_key},
        ).raise_for_status()
Call buffer.flush() at shutdown (or in a finally block) to drain any partial batch.
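For example, wiring the buffer into an application lifecycle might look like this (profile_updates is a hypothetical record source):
import atexit

buffer = RecordBuffer(api_key="<your_access_token>", object_name="profiles", match_key="email")
atexit.register(buffer.flush)  # drain any partial batch at interpreter shutdown

for update in profile_updates:  # hypothetical stream of profile dicts
    buffer.add(update)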
Handling per-record errors
After the batch job completes, poll GET /records/{object_name}/jobs/{job_id} and check metrics.invalidRows. Never silently discard rejected rows — route them to a dead-letter queue or log table for review:
import time, requests, logging
logger = logging.getLogger(__name__)
def wait_for_batch_job(job_id: str, object_name: str, api_key: str, timeout: int = 120) -> dict:
url = f"https://api.experiture.ai/public/v1/records/{object_name}/jobs/{job_id}"
headers = {"Authorization": f"Bearer {api_key}"}
deadline = time.time() + timeout
backoff = 2.0
while time.time() < deadline:
data = requests.get(url, headers=headers).json()["data"]
if data["state"] in ("completed", "failed"):
return data
time.sleep(backoff)
backoff = min(backoff * 1.5, 15)
raise TimeoutError(f"Batch job {job_id} did not finish within {timeout}s")
# After submitting a batch, poll and check metrics
result = wait_for_batch_job(job_id, "profiles", api_key)
metrics = result.get("metrics", {})
if metrics.get("invalidRows", 0) > 0:
logger.warning("batch had %d rejected rows — check job errors", metrics["invalidRows"])
    dlq.publish({"job_id": job_id, "invalid_rows": metrics["invalidRows"]})  # dlq: your dead-letter sink
Common reasons rows are rejected:
- CDP_ETL.VALIDATION.REQUEST_SCHEMA — field doesn't exist on the object schema
- CDP_ETL.VALIDATION.REQUEST_INVALID — field type mismatch or invalid format (e.g. bare datetime)
Rejected rows are deterministic: retrying the same row will produce the same error. Fix the data before re-sending.
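Since rejections are deterministic, it's cheap to screen rows client-side before sending. A minimal sketch (the allowed-field set is hypothetical; derive yours from the object's actual schema):
# Hypothetical field list; in practice, pull this from your object's schema
PROFILE_FIELDS = {"email", "tier", "first_name", "last_seen_at"}

def screen_records(records: list[dict]) -> tuple[list[dict], list[dict]]:
    """Partition records into (sendable, rejected) before hitting the API."""
    ok, bad = [], []
    for r in records:
        if set(r) <= PROFILE_FIELDS and r.get("email"):
            ok.append(r)
        else:
            bad.append(r)  # route to a DLQ or log table, never silently drop
    return ok, bad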
Retry logic
Retry the entire batch on transient errors (429, 5xx). The idempotency key ensures accepted records aren't doubled.
import random, time, requests
RETRYABLE_STATUS = {429, 500, 502, 503, 504}
def flush_with_retry(api_key: str, object_name: str, records: list, match_key: str, max_attempts: int = 5):
    # Derive a stable key from record identities (falls back to list position)
    idempotency_key = queue_batch_key([r.get("email", str(i)) for i, r in enumerate(records)])
url = f"https://api.experiture.ai/public/v1/records/{object_name}/upsert-batch"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Idempotency-Key": idempotency_key,
}
body = {"records": records, "matchKey": match_key}
for attempt in range(max_attempts):
resp = requests.post(url, headers=headers, json=body)
if resp.status_code not in RETRYABLE_STATUS:
resp.raise_for_status()
return resp.json()
retry_after = float(resp.headers.get("Retry-After", 0))
backoff = min(30, (2 ** attempt) + random.random())
time.sleep(max(retry_after, backoff))
    raise RuntimeError(f"Exhausted {max_attempts} attempts")
Rate limits
Rate limits for batch writes depend on your plan — see rate limits documentation for current values. Each batch request counts as one request against the rate limit regardless of how many records it contains. Batching is the most efficient way to ingest at scale.
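To make the arithmetic concrete: under a hypothetical limit of 60 requests per minute, 500-record batches move 30,000 records per minute, while single-record writes would cap out at 60.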
Batch size guidance
| Records per batch | Notes |
|---|---|
| 1–10 | Inefficient — use single-write endpoint instead |
| 50–500 | Good default for most queue consumers |
| 500–2,000 | Optimal for high-throughput pipelines |
| 2,000–10,000 | Max efficiency — request latency increases linearly but total throughput is highest |
Latency scales roughly linearly with batch size. For latency-sensitive workloads (sub-200ms P99), keep batches under 1,000 records.
Common failure modes
413 Payload Too Large — body exceeds 10 MB. Reduce the batch size or trim verbose field values (timestamps in particular tend to inflate payloads).
CDP_ETL.VALIDATION.REQUEST_INVALID (too many records) — batch has more than 10,000 records. Split and re-send (see the chunking sketch after this list).
CDP_ETL.VALIDATION.REQUEST_SCHEMA on the outer request — one or more field names in your batch don't match the object schema. Unlike per-record validation, this rejects the entire batch. Validate field names before sending.
High invalidRows in job metrics — indicates a systematic mapping issue. Check the job's error details, fix your transformation, and replay from the DLQ.
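A simple splitter for oversized batches, as referenced above (a sketch; 10,000 is the documented record cap, and the byte estimate is approximate, so leave headroom for the JSON envelope):
import json
from typing import Iterator

MAX_RECORDS = 10_000
MAX_BYTES = 9 * 1024 * 1024  # stay under the 10 MB body limit with headroom

def chunk_records(records: list[dict]) -> Iterator[list[dict]]:
    """Yield sub-batches that respect both the record cap and the size cap."""
    batch, size = [], 0
    for r in records:
        r_size = len(json.dumps(r).encode())
        if batch and (len(batch) >= MAX_RECORDS or size + r_size > MAX_BYTES):
            yield batch
            batch, size = [], 0
        batch.append(r)
        size += r_size
    if batch:
        yield batch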
Full example — SQS consumer
import os, boto3, json, logging
# Reuses flush_with_retry and wait_for_batch_job defined earlier on this page
logger = logging.getLogger(__name__)
sqs = boto3.client("sqs", region_name="us-east-1")
API_KEY = os.environ["EXPERITURE_API_KEY"]
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789/profile-updates"
def process_batch():
response = sqs.receive_message(
QueueUrl=QUEUE_URL,
MaxNumberOfMessages=10, # SQS max per poll
WaitTimeSeconds=20,
)
messages = response.get("Messages", [])
if not messages:
return
records = []
receipt_handles = []
for msg in messages:
body = json.loads(msg["Body"])
records.append({
"email": body["email"],
"first_name": body.get("first_name"),
"last_seen_at": body["timestamp"],
})
receipt_handles.append(msg["ReceiptHandle"])
result = flush_with_retry(API_KEY, "profiles", records, match_key="email")
# Only delete messages after the batch has been accepted
for handle in receipt_handles:
sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=handle)
# Poll the job for completion and check for invalid rows
job_result = wait_for_batch_job(result["data"]["jobId"], "profiles", API_KEY)
metrics = job_result.get("metrics", {})
if metrics.get("invalidRows", 0):
# Rejected rows won't succeed on retry — log and move on
logger.warning("%d rows rejected", metrics["invalidRows"])See Also
- Real-time Record Writes — when you need one write confirmed synchronously
- Bulk File Import — for file-based loads > 10k rows
- Error Handling & Retries — production retry patterns
- Records API reference