Bulk File Import
Load 100k to 200M rows from a file into the CDP using the Import Jobs API. This is the right path for one-off backfills, nightly syncs from your data warehouse, and any workload where your data already lives in a file rather than a stream.
When to use this path:
- Your data is already in a file (CSV, JSONL, Parquet) in S3, GCS, Azure Blob, or on disk
- Row count exceeds what inline batching handles well (> 10,000 rows)
- You want schema validation and per-row error reporting before data lands in the CDP
When to use something else:
- Real-time events → Real-time Record Writes
- Small batches (< 10k rows) that you're already generating in code → Batch Record Writes
- Ongoing queue-based sync → Batch Record Writes with a buffer
The four-step flow
Every import follows the same lifecycle: initialize → map → start → monitor.
POST /import-jobs → jobId, uploadUrl, expiresAt
PUT {uploadUrl} → upload file directly
POST /import-jobs/{id}/mapping → validates + stores field mapping
POST /import-jobs/{id}/start → state: running
GET /import-jobs/{id} → poll until completed | failed
GET /import-jobs/{id}/errors → download rows that failed validation
Step 1 — Initialize the job
Provide the file metadata and tell the API what object to write into. The API returns a pre-signed uploadUrl — PUT your file directly to that URL to deliver the data.
curl -X POST https://api.experiture.ai/public/v1/import-jobs \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-d '{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041
}'
The response includes a pre-signed uploadUrl. PUT your file there before calling the mapping or start endpoints:
curl -X PUT "$UPLOAD_URL" \
-H "Content-Type: text/csv" \
--data-binary @spring-2026.csv
Response
{
  "success": true,
  "data": {
    "jobId": "imp_01HXYZ",
    "uploadUrl": "https://upload.experiture.ai/jobs/imp_01HXYZ/file?X-Amz-Signature=...",
    "expiresAt": "2026-04-21T16:30:00Z",
    "landingPath": "jobs/imp_01HXYZ/file",
    "method": "PUT",
    "headers": {},
    "requestedBy": "usr_01HXYZ"
  }
}
uploadUrl expires at expiresAt — upload your file before then. You can optionally pass createList, listName, targetListId, createAudience, audienceName, or autoProfileAfterImport in the init request (see the list creation section below).
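The upload URL window is short, so a defensive client checks expiresAt before uploading and re-initializes when it has lapsed. A minimal sketch, assuming you wrap the POST /import-jobs call above in a reinit callable of your own:
from datetime import datetime, timezone
import requests

def upload_with_expiry_check(job: dict, file_path: str, reinit) -> dict:
    """Upload to the pre-signed URL; re-initialize first if it has expired.

    job is the `data` object from POST /import-jobs; reinit is a caller-
    supplied function that repeats that call and returns fresh job data.
    """
    expires = datetime.fromisoformat(job["expiresAt"].replace("Z", "+00:00"))
    if datetime.now(timezone.utc) >= expires:
        job = reinit()  # fresh jobId + uploadUrl
    with open(file_path, "rb") as f:
        resp = requests.put(job["uploadUrl"], data=f,
                            headers={"Content-Type": "text/csv"})
    resp.raise_for_status()
    return job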
Supported formats
| Format | Extension | Notes |
|---|---|---|
| CSV | .csv | UTF-8; first row must be a header |
| JSONL | .jsonl, .ndjson | One JSON object per line |
| Parquet | .parquet | Columnar; fastest for > 1M rows |
| Gzip-compressed | .csv.gz, .jsonl.gz | Decompressed before processing |
Max file size: 50 GB. Files over 10 GB are accepted, but read the operational tips below before uploading one.
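If your source starts life as a large CSV, converting it to Parquet before upload is often worth the extra step (see the operational tips below). A sketch using pyarrow, which is our assumption here rather than a platform requirement; column types are inferred from the data, so spot-check datetime fields:
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Read the CSV and rewrite it as a single snappy-compressed Parquet file.
# Override inferred types with pv.ConvertOptions(column_types=...) when a
# column such as a postal code must stay a string.
table = pv.read_csv("spring-2026.csv")
pq.write_table(table, "spring-2026.parquet", compression="snappy")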
Step 2 — Set the mapping
Declare which source columns map to which CDP fields. Columns you omit are silently ignored — a clean way to drop PII you don't want to land in the CDP.
curl -X POST https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/mapping \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-d '{
  "sourceFields": {
    "email_address": "string",
    "FirstName": "string",
    "LastName": "string",
    "signup_ts": "datetime"
  },
  "fieldMap": {
    "email_address": "email",
    "FirstName": "first_name",
    "LastName": "last_name",
    "signup_ts": "signed_up_at"
  }
}'
sourceFields maps each source column name to its data type ("string", "datetime", "integer", etc.). Columns omitted from fieldMap are ignored.
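When source columns follow a predictable naming pattern, you can derive fieldMap from the CSV header instead of writing it by hand. A hedged sketch; the RENAMES table for irregular columns is something you would maintain yourself:
import csv
import re

# Explicit renames for columns whose CDP field isn't a mechanical transform.
RENAMES = {"email_address": "email", "signup_ts": "signed_up_at"}

def snake_case(name: str) -> str:
    """FirstName -> first_name; already-snake_case names pass through."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

def build_field_map(csv_path: str) -> dict:
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f))
    return {col: RENAMES.get(col, snake_case(col)) for col in header}

# build_field_map("spring-2026.csv") ->
# {"email_address": "email", "FirstName": "first_name",
#  "LastName": "last_name", "signup_ts": "signed_up_at"}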
Checking the target schema
If you're not sure what field names exist on the target object:
curl https://api.experiture.ai/public/v1/metadata/objects/profiles \
-H "Authorization: Bearer <your_access_token>" \
| jq '[.data.fields[] | {name: .name, type: .type, required: .required}]'
Unknown destination fields cause CDP_ETL.VALIDATION.REQUEST_SCHEMA at the mapping stage — not mid-job.
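You can run the same check in code before calling the mapping endpoint, so a bad destination field fails in your script rather than at the API:
import requests

def check_field_map(field_map: dict, object_name: str, token: str) -> None:
    """Raise if any fieldMap destination isn't a field on the target object."""
    resp = requests.get(
        f"https://api.experiture.ai/public/v1/metadata/objects/{object_name}",
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    known = {f["name"] for f in resp.json()["data"]["fields"]}
    unknown = set(field_map.values()) - known
    if unknown:
        raise ValueError(f"Unknown destination fields on {object_name}: {sorted(unknown)}")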
Step 3 — Start the job
curl -X POST https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/start \
-H "Authorization: Bearer <your_access_token>" \
-H "Content-Type: application/json" \
-d '{}'
State transitions to running. The pipeline begins reading rows, applying the mapping, validating types, and merging records into the target object.
Step 4 — Monitor to completion
Poll with exponential backoff. Start at 5 seconds and cap at 60 seconds.
Typical durations
| Row count | Typical duration |
|---|---|
| < 100k | 30 s – 2 min |
| 100k – 1M | 1 min – 8 min |
| 1M – 10M | 5 min – 40 min |
| > 10M | 30 min – 4 hrs |
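When you know the row count up front, you can size the poller's timeout from this table rather than hard-coding one. A small sketch that allows twice the upper bound of each band, matching the stuck-job threshold under common failure modes below:
def poll_timeout_for(rows: int) -> int:
    """Poll timeout in seconds: 2x the upper bound of the typical-duration
    band for this row count (see the table above)."""
    if rows < 100_000:
        return 2 * 2 * 60        # band tops out at 2 min
    if rows < 1_000_000:
        return 2 * 8 * 60        # 8 min
    if rows < 10_000_000:
        return 2 * 40 * 60       # 40 min
    return 2 * 4 * 60 * 60       # 4 hrs

# e.g. wait_for_job(job_id, token, timeout=poll_timeout_for(2_500_000))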
Python poller
import time, requests

def wait_for_job(job_id: str, token: str, timeout: int = 3600) -> dict:
    url = f"https://api.experiture.ai/public/v1/import-jobs/{job_id}"
    headers = {"Authorization": f"Bearer {token}"}
    deadline = time.time() + timeout
    backoff = 5.0
    while time.time() < deadline:
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
        data = resp.json()["data"]
        state = data["state"]
        print(
            f"[{state}] read={data.get('readRows', 0)}"
            f" valid={data.get('validRows', 0)}"
            f" invalid={data.get('invalidRows', 0)}"
        )
        if state in ("completed", "failed"):
            return data
        time.sleep(backoff)
        backoff = min(backoff * 1.5, 60)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")
Terminal states and what they mean
| State | Meaning |
|---|---|
| completed | All rows processed; check invalidRows for partial failures |
| failed | Job-level failure (file unreadable, schema mismatch, or mapping error); no rows written |
Handling validation errors
When a job completes, check invalidRows. If non-zero, fetch the error file location:
curl https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/errors \
-H "Authorization: Bearer <your_access_token>"{
"success": true,
"data": {
"hasErrors": true,
"errorFileUrl": "https://...",
"expiresAt": "2026-04-22T15:38:41Z",
"errorCount": 33
}
}Then download the JSONL file at errorFileUrl. Each line is the original source row plus the rejection reason:
{"rowNumber": 42, "sourceRow": {"email_address": "not-an-email", "signup_ts": "2026-04-21"}, "error": "email_address: invalid email format"}
{"rowNumber": 891, "sourceRow": {"email_address": "jane@example.com", "signup_ts": "2026-04-21"}, "error": "signed_up_at: naive datetime — include timezone offset"}Recovery pattern: fix the source rows, re-upload, and start a new import job. Jobs are not re-runnable after reaching a terminal state. Point the new job at a fixed file — you don't need to re-process good rows.
List and audience creation during import
You can tell the import to create a static list from the imported records, or add them into an existing list. These options are passed at initialization time alongside objectName, fileName, and fileSize.
Create a new static list:
{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "createList": true,
  "listName": "Spring 2026 Backfill"
}
Add successfully imported profiles to an existing list:
{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "targetListId": "lst_01HXYZ"
}
Create a dynamic audience from the import (separate from list creation):
{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "createAudience": true,
  "audienceName": "Spring 2026 Prospects"
}
The list or audience is built after all rows are validated and merged. Records that fail validation are not added.
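In code, these options ride along on the init request. A sketch of the createList variant, using the same values as above:
import requests

init = requests.post(
    "https://api.experiture.ai/public/v1/import-jobs",
    headers={"Authorization": "Bearer <your_access_token>",
             "Content-Type": "application/json"},
    json={
        "objectName": "profiles",
        "fileName": "spring-2026.csv",
        "fileSize": 4823041,
        "createList": True,           # build a static list from this import
        "listName": "Spring 2026 Backfill",
    },
)
init.raise_for_status()
job = init.json()["data"]             # jobId, uploadUrl, etc. as above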
Full Python example
import os, time, requests

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE = "https://api.experiture.ai/public/v1"
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
file_path = "spring-2026.csv"
file_size = os.path.getsize(file_path)

# 1. Initialize
init_resp = requests.post(f"{BASE}/import-jobs", headers=headers, json={
    "objectName": "profiles",
    "fileName": "spring-2026.csv",
    "fileSize": file_size,
}).json()
job = init_resp["data"]
job_id = job["jobId"]
upload_url = job["uploadUrl"]
print(f"Job created: {job_id}")

# 2. Upload the file
with open(file_path, "rb") as f:
    requests.put(upload_url, data=f, headers={"Content-Type": "text/csv"}).raise_for_status()

# 3. Set mapping
requests.post(f"{BASE}/import-jobs/{job_id}/mapping", headers=headers, json={
    "sourceFields": {
        "email_address": "string",
        "FirstName": "string",
        "LastName": "string",
        "signup_ts": "datetime",
    },
    "fieldMap": {
        "email_address": "email",
        "FirstName": "first_name",
        "LastName": "last_name",
        "signup_ts": "signed_up_at",
    },
}).raise_for_status()

# 4. Start
requests.post(f"{BASE}/import-jobs/{job_id}/start", headers=headers, json={}).raise_for_status()

# 5. Monitor
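# wait_for_job is the poller from "Step 4 — Monitor to completion" above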
result = wait_for_job(job_id, API_KEY, timeout=7200)
print(f"Done: {result.get('mergedInserts',0)} inserts, {result.get('mergedUpdates',0)} updates, {result.get('invalidRows',0)} invalid")
if result.get("invalidRows", 0) > 0:
    errors_meta = requests.get(f"{BASE}/import-jobs/{job_id}/errors", headers=headers).json()["data"]
    if errors_meta.get("hasErrors"):
        import urllib.request
        lines = urllib.request.urlopen(errors_meta["errorFileUrl"]).read().decode().strip().split("\n")
        for line in lines[:10]:
            print(f"  {line}")
Operational tips
Use Parquet for > 1M rows. Columnar format means the pipeline only reads the fields you're mapping — 3–5× faster than CSV for wide tables with many unused columns.
Pre-sort by matchKey. Merge is significantly faster when all updates for the same key appear contiguously in the file. For CSVs, sort the body while keeping the header row first: (head -1 spring-2026.csv && tail -n +2 spring-2026.csv | sort -t, -k1,1) > spring-2026-sorted.csv.
Split files > 10 GB into 5–8 GB chunks. Maximum file size is 50 GB, but smaller jobs restart more quickly after infrastructure events and give you better progress visibility.
Avoid overlapping jobs on the same object. Two concurrent jobs merging into the same object can race on shared keys. Run them sequentially unless your keys are non-overlapping.
Tag records with a batch identifier. Map a static value into a field like import_source or import_batch via fieldMap from a column you prepend to your file. This makes it easy to query "all records from this import" later and diagnose downstream issues.
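A sketch of prepending that column with the standard library; the tagged file is then imported with import_batch declared in sourceFields and fieldMap like any other column:
import csv

def prepend_batch_column(src: str, dst: str, batch_id: str) -> None:
    """Copy src to dst with a constant import_batch column added first."""
    with open(src, newline="") as fin, open(dst, "w", newline="") as fout:
        reader, writer = csv.reader(fin), csv.writer(fout)
        writer.writerow(["import_batch"] + next(reader))  # extend the header
        for row in reader:
            writer.writerow([batch_id] + row)

# prepend_batch_column("spring-2026.csv", "spring-2026-tagged.csv",
#                      "spring-2026-backfill")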
Budget for 2× the row count in merge compute. The pipeline reads each row once for validation and once for the actual upsert — plan storage and compute quotas accordingly.
Common failure modes
failed job with CDP_ETL.IMPORT.* source error — the uploaded file could not be read (upload incomplete, file corrupted). Re-initialize to get a fresh uploadUrl and re-upload.
failed job with CDP_ETL.VALIDATION.REQUEST_SCHEMA — a mapped destination field doesn't exist. Check with GET /metadata/objects/{name} and fix the mapping.
completed but all rows in invalidRows — your source data has a systematic type error (e.g. dates without timezone). Download errors, look at the first 10, fix the source transformation.
Job stuck in running for > 2× typical duration — contact support with the jobId. This usually indicates an infrastructure event; jobs resume automatically in most cases but support can check.
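If imports run unattended, it helps to branch on these cases automatically. A hedged sketch: the errorCode field name is our assumption about where a failed job surfaces its code (it isn't documented above), but the states and codes are the ones this section describes:
def triage(result: dict) -> str:
    """Map a terminal job result to a next action (see failure modes above).

    Assumes a failed job carries its error code in result["errorCode"];
    adjust to wherever your job payload actually surfaces it."""
    if result["state"] == "failed":
        code = result.get("errorCode", "")
        if code.startswith("CDP_ETL.IMPORT."):
            return "re-initialize for a fresh uploadUrl and re-upload"
        if code == "CDP_ETL.VALIDATION.REQUEST_SCHEMA":
            return "check the mapping against GET /metadata/objects/{name}"
        return "inspect the job payload / contact support with the jobId"
    if result.get("invalidRows", 0) and result["invalidRows"] == result.get("readRows"):
        return "systematic source error: download the error file"
    return "done"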
See Also
- Import Jobs API reference
- Batch Record Writes — for < 10k rows you're generating in code
- Metadata API — inspect the target schema before mapping
- List Management — create a list from imported records