Bulk File Import

Load 100k to 200M rows from a file into the CDP using the Import Jobs API. This is the right path for one-off backfills, nightly syncs from your data warehouse, and any workload where your data already lives in a file rather than a stream.

When to use this path:

  • Your data is already in a file (CSV, JSONL, Parquet) in S3, GCS, Azure Blob, or on disk
  • Row count exceeds what inline batching handles well (> 10,000 rows)
  • You want schema validation and per-row error reporting before data lands in the CDP

When to use something else:

  • Fewer than ~10,000 rows: inline batching is simpler and skips the file upload
  • Data arrives as a continuous stream rather than a file: use a streaming ingestion path
The four-step flow

Every import follows the same lifecycle: initialize → map → start → monitor.

POST /import-jobs               → jobId, uploadUrl, expiresAt
PUT  {uploadUrl}                → upload file directly
POST /import-jobs/{id}/mapping  → validates + stores field mapping
POST /import-jobs/{id}/start    → state: running
GET  /import-jobs/{id}          → poll until completed | failed
GET  /import-jobs/{id}/errors   → download rows that failed validation

Step 1 — Initialize the job

Provide the file metadata and tell the API which object to write into.

curl -X POST https://api.experiture.ai/public/v1/import-jobs \
  -H "Authorization: Bearer <your_access_token>" \
  -H "Content-Type: application/json" \
  -d '{
    "objectName": "profiles",
    "fileName": "spring-2026.csv",
    "fileSize": 4823041
  }'

The response includes a pre-signed uploadUrl. PUT your file there before calling the mapping or start endpoints:

curl -X PUT "$UPLOAD_URL" \
  -H "Content-Type: text/csv" \
  --data-binary @spring-2026.csv

Initialization response

{
  "success": true,
  "data": {
    "jobId": "imp_01HXYZ",
    "uploadUrl": "https://upload.experiture.ai/jobs/imp_01HXYZ/file?X-Amz-Signature=...",
    "expiresAt": "2026-04-21T16:30:00Z",
    "landingPath": "jobs/imp_01HXYZ/file",
    "method": "PUT",
    "headers": {},
    "requestedBy": "usr_01HXYZ"
  }
}

uploadUrl expires at expiresAt — upload your file before then. You can optionally pass createList, listName, targetListId, createAudience, audienceName, or autoProfileAfterImport in the init request (see the list creation section below).

Supported formats

Format             Extension             Notes
CSV                .csv                  UTF-8; first row must be a header
JSONL              .jsonl, .ndjson       One JSON object per line
Parquet            .parquet              Columnar; fastest for > 1M rows
Gzip-compressed    .csv.gz, .jsonl.gz    Decompressed before processing

Max file size: 50 GB. Files > 10 GB are accepted but see the operational tips below.
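Since gzip-compressed files are accepted, compressing a large CSV before upload cuts transfer time. A minimal sketch (the helper name is mine); remember to report the compressed size as fileSize and the .csv.gz name in the init request:

```python
import gzip, os, shutil

def gzip_for_upload(path):
    """Compress a file with gzip; return (gz_path, gz_size) for the init call."""
    gz_path = path + ".gz"
    with open(path, "rb") as src, gzip.open(gz_path, "wb") as dst:
        shutil.copyfileobj(src, dst)  # streams in chunks, never loads the whole file
    return gz_path, os.path.getsize(gz_path)
```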


Step 2 — Set the mapping

Declare which source columns map to which CDP fields. Columns you omit are silently ignored — a clean way to drop PII you don't want to land in the CDP.

curl -X POST https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/mapping \
  -H "Authorization: Bearer <your_access_token>" \
  -H "Content-Type: application/json" \
  -d '{
    "sourceFields": {
      "email_address": "string",
      "FirstName":     "string",
      "LastName":      "string",
      "signup_ts":     "datetime"
    },
    "fieldMap": {
      "email_address": "email",
      "FirstName":     "first_name",
      "LastName":      "last_name",
      "signup_ts":     "signed_up_at"
    }
  }'

sourceFields maps each source column name to its data type ("string", "datetime", "integer", etc.). Columns omitted from fieldMap are ignored.
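Rather than hand-writing the payload, you can derive it from the CSV header. A sketch, with an illustrative helper name; the rename dict and type overrides are yours to supply:

```python
import csv

def build_mapping(csv_path, renames, type_overrides=None):
    """Build the sourceFields/fieldMap payload from a CSV header row.

    Columns absent from `renames` stay out of fieldMap and are therefore
    ignored by the import (see above).
    """
    type_overrides = type_overrides or {}
    with open(csv_path, newline="") as f:
        header = next(csv.reader(f))
    source_fields = {col: type_overrides.get(col, "string") for col in header}
    field_map = {col: renames[col] for col in header if col in renames}
    return {"sourceFields": source_fields, "fieldMap": field_map}
```

For the example file above: build_mapping("spring-2026.csv", {"email_address": "email", "signup_ts": "signed_up_at"}, {"signup_ts": "datetime"}) yields a body ready to POST to the mapping endpoint.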

Checking the target schema

If you're not sure what field names exist on the target object:

curl https://api.experiture.ai/public/v1/metadata/objects/profiles \
  -H "Authorization: Bearer <your_access_token>" \
  | jq '[.data.fields[] | {name: .name, type: .type, required: .required}]'

Unknown destination fields cause CDP_ETL.VALIDATION.REQUEST_SCHEMA at the mapping stage — not mid-job.
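Because unknown destinations fail the whole mapping call, it can be worth checking the fieldMap locally first. A sketch (helper name is mine); `fields` is the data.fields array from the metadata response above:

```python
def unknown_destinations(field_map, fields):
    """Return fieldMap destinations missing from the target object's schema.

    `fields` is the `data.fields` list from GET /metadata/objects/{name}.
    """
    known = {f["name"] for f in fields}
    return sorted(set(field_map.values()) - known)
```

If the returned list is non-empty, fix those names before POSTing the mapping.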


Step 3 — Start the job

curl -X POST https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/start \
  -H "Authorization: Bearer <your_access_token>" \
  -H "Content-Type: application/json" \
  -d '{}'

State transitions to running. The pipeline begins reading rows, applying the mapping, validating types, and merging records into the target object.


Step 4 — Monitor to completion

Poll with exponential backoff. Start at 5 seconds and cap at 60 seconds.

Typical durations

Row count      Typical duration
< 100k         30 s – 2 min
100k – 1M      1 min – 8 min
1M – 10M       5 min – 40 min
> 10M          30 min – 4 hrs
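If you want the timeout you pass to the poller to scale with the job, the table can be turned into a rough helper (thresholds are the table's upper bounds doubled for slack; the function name is illustrative):

```python
def poll_timeout(rows):
    """Rough polling timeout in seconds: ~2x the table's typical upper bound."""
    if rows < 100_000:
        return 240       # 2 min x 2
    if rows < 1_000_000:
        return 960       # 8 min x 2
    if rows < 10_000_000:
        return 4800      # 40 min x 2
    return 28800         # 4 hrs x 2
```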

Python poller

import time, requests
 
def wait_for_job(job_id: str, token: str, timeout: int = 3600) -> dict:
    url = f"https://api.experiture.ai/public/v1/import-jobs/{job_id}"
    headers = {"Authorization": f"Bearer {token}"}
    deadline = time.time() + timeout
    backoff = 5.0
 
    while time.time() < deadline:
        resp = requests.get(url, headers=headers)
        resp.raise_for_status()
        data = resp.json()["data"]
 
        state = data["state"]
        print(
            f"[{state}] read={data.get('readRows', 0)}"
            f" valid={data.get('validRows', 0)}"
            f" invalid={data.get('invalidRows', 0)}"
        )
 
        if state in ("completed", "failed"):
            return data
 
        time.sleep(backoff)
        backoff = min(backoff * 1.5, 60)
 
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")

Terminal states and what they mean

State        Meaning
completed    All rows processed; check invalidRows for partial failures
failed       Job-level failure (file unreadable, schema mismatch, or mapping error); no rows written

Handling validation errors

When a job completes, check invalidRows. If non-zero, fetch the error file location:

curl https://api.experiture.ai/public/v1/import-jobs/imp_01HXYZ/errors \
  -H "Authorization: Bearer <your_access_token>"

Response

{
  "success": true,
  "data": {
    "hasErrors": true,
    "errorFileUrl": "https://...",
    "expiresAt": "2026-04-22T15:38:41Z",
    "errorCount": 33
  }
}

Then download the JSONL file at errorFileUrl. Each line is the original source row plus the rejection reason:

{"rowNumber": 42, "sourceRow": {"email_address": "not-an-email", "signup_ts": "2026-04-21"}, "error": "email_address: invalid email format"}
{"rowNumber": 891, "sourceRow": {"email_address": "jane@example.com", "signup_ts": "2026-04-21"}, "error": "signed_up_at: naive datetime — include timezone offset"}

Recovery pattern: fix the source rows, re-upload, and start a new import job. Jobs are not re-runnable after reaching a terminal state. Point the new job at a fixed file — you don't need to re-process good rows.
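A sketch of that recovery step: pull the sourceRow of each error line back out into a fresh CSV so the new job re-processes only the rows that need fixing. The helper name and column handling are illustrative; applying the actual fixes is up to you.

```python
import csv, json

def failed_rows_to_csv(error_lines, out_path, columns):
    """Write the sourceRow of each error-file line to a CSV for repair and re-import."""
    with open(out_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        for line in error_lines:
            rec = json.loads(line)
            # Keep column order stable; missing keys become empty cells
            writer.writerow({c: rec["sourceRow"].get(c, "") for c in columns})
```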


List and audience creation during import

You can tell the import to create a static list from the imported records, or add them into an existing list. These options are passed at initialization time alongside objectName, fileName, and fileSize.

Create a new static list:

{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "createList": true,
  "listName": "Spring 2026 Backfill"
}

Add successfully imported profiles to an existing list:

{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "targetListId": "lst_01HXYZ"
}

Create a dynamic audience from the import (separate from list creation):

{
  "objectName": "profiles",
  "fileName": "spring-2026.csv",
  "fileSize": 4823041,
  "createAudience": true,
  "audienceName": "Spring 2026 Prospects"
}

The list or audience is built after all rows are validated and merged. Records that fail validation are not added.


Full Python example

import os, time, requests
 
API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE = "https://api.experiture.ai/public/v1"
headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
 
file_path = "spring-2026.csv"
file_size = os.path.getsize(file_path)
 
# 1. Initialize
init_resp = requests.post(f"{BASE}/import-jobs", headers=headers, json={
    "objectName": "profiles",
    "fileName": "spring-2026.csv",
    "fileSize": file_size,
}).json()
job = init_resp["data"]
job_id = job["jobId"]
upload_url = job["uploadUrl"]
print(f"Job created: {job_id}")
 
# 2. Upload the file
with open(file_path, "rb") as f:
    requests.put(upload_url, data=f, headers={"Content-Type": "text/csv"}).raise_for_status()
 
# 3. Set mapping
requests.post(f"{BASE}/import-jobs/{job_id}/mapping", headers=headers, json={
    "sourceFields": {
        "email_address": "string",
        "FirstName":     "string",
        "LastName":      "string",
        "signup_ts":     "datetime",
    },
    "fieldMap": {
        "email_address": "email",
        "FirstName":     "first_name",
        "LastName":      "last_name",
        "signup_ts":     "signed_up_at",
    },
}).raise_for_status()
 
# 4. Start
requests.post(f"{BASE}/import-jobs/{job_id}/start", headers=headers, json={}).raise_for_status()
 
# 5. Monitor (wait_for_job is the poller defined earlier in this guide)
result = wait_for_job(job_id, API_KEY, timeout=7200)
print(f"Done: {result.get('mergedInserts',0)} inserts, {result.get('mergedUpdates',0)} updates, {result.get('invalidRows',0)} invalid")
 
if result.get("invalidRows", 0) > 0:
    errors_meta = requests.get(f"{BASE}/import-jobs/{job_id}/errors", headers=headers).json()["data"]
    if errors_meta.get("hasErrors"):
        import urllib.request
        lines = urllib.request.urlopen(errors_meta["errorFileUrl"]).read().decode().strip().split("\n")
        for line in lines[:10]:
            print(f"  {line}")

Operational tips

Use Parquet for > 1M rows. Columnar format means the pipeline only reads the fields you're mapping — 3–5× faster than CSV for wide tables with many unused columns.

Pre-sort by matchKey. Merge is significantly faster when all updates for the same key appear contiguously in the file. For CSVs, sort the body while keeping the header row first: (head -1 spring-2026.csv; tail -n +2 spring-2026.csv | sort -t, -k1,1) > spring-2026-sorted.csv.

Split files > 10 GB into 5–8 GB chunks. Maximum file size is 50 GB, but smaller jobs restart more quickly after infrastructure events and give you better progress visibility.
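A minimal sketch of such a splitter (helper name is mine), chunking by approximate byte size while repeating the header in every part; a plain byte-based split would cut rows in half and drop the header from all but the first chunk:

```python
def split_csv(path, chunk_bytes):
    """Split a CSV into parts of roughly chunk_bytes, copying the header into each."""
    parts = []
    with open(path, "r", newline="") as src:
        header = src.readline()
        part, out, written = 0, None, 0
        for line in src:
            if out is None or written >= chunk_bytes:
                if out:
                    out.close()
                part += 1
                name = f"{path}.part{part:03d}"
                out = open(name, "w", newline="")
                out.write(header)  # every part gets the header row
                written = 0
                parts.append(name)
            out.write(line)
            written += len(line)
        if out:
            out.close()
    return parts
```

Each part can then be uploaded as its own sequential import job.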

Avoid overlapping jobs on the same object. Two concurrent jobs merging into the same object can race on shared keys. Run them sequentially unless your keys are non-overlapping.

Tag records with a batch identifier. Add a constant-valued column to your file and map it via fieldMap into a field like import_source or import_batch. This makes it easy to query "all records from this import" later and to diagnose downstream issues.
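Adding that column with the csv module keeps quoted fields intact (a naive comma-based awk one-liner would not); the helper name and column name below are illustrative:

```python
import csv

def add_batch_column(src_path, dst_path, column, value):
    """Copy a CSV, appending a constant batch-identifier column to every row."""
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader, writer = csv.reader(src), csv.writer(dst)
        writer.writerow(next(reader) + [column])  # extend the header
        for row in reader:
            writer.writerow(row + [value])
```

For example, add_batch_column("spring-2026.csv", "spring-2026-tagged.csv", "import_batch", "spring-2026"), then map import_batch in fieldMap.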

Budget for 2× the row count in merge compute. The pipeline reads each row once for validation and once for the actual upsert — plan storage and compute quotas accordingly.


Common failure modes

failed job with CDP_ETL.IMPORT.* source error — the uploaded file could not be read (upload incomplete, file corrupted). Re-initialize to get a fresh uploadUrl and re-upload.

failed job with CDP_ETL.VALIDATION.REQUEST_SCHEMA — a mapped destination field doesn't exist. Check with GET /metadata/objects/{name} and fix the mapping.

completed but all rows in invalidRows — your source data has a systematic type error (e.g. dates without timezone). Download errors, look at the first 10, fix the source transformation.

Job stuck in running for > 2× typical duration — contact support with the jobId. This usually indicates an infrastructure event; jobs resume automatically in most cases but support can check.


See Also