CRM & System Sync

Keep your CDP profiles in sync with HubSpot, Salesforce, or any CRM or ERP system. This guide covers two sync patterns — push-based (your CRM calls your webhook on change) and pull-based (your code polls the CRM API and writes to the CDP) — and explains how to handle updates without corrupting data that other systems own.


Choosing a sync pattern

| Pattern | When to use | Trade-off |
| --- | --- | --- |
| Push (webhook) | The CRM supports webhooks and you need near-real-time sync | Simpler and lower latency; requires a publicly reachable endpoint |
| Pull (scheduled) | The CRM has no webhooks, its webhooks are unreliable, or you need a full-history sync | You control the schedule; updates lag by up to one poll interval |
| Hybrid | High volume plus real-time requirements | Push for fast updates, plus a daily pull to catch missed events |

Push pattern: HubSpot contact webhooks

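HubSpot can POST to your endpoint whenever a contact property changes. In your HubSpot developer app, open Webhooks, set the target URL, and create a contact.propertyChange subscription for each property you want to track. HubSpot delivers events in batches as a JSON array and signs each request, so the handler verifies the X-HubSpot-Signature-v3 header before it writes anything.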

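import base64, hashlib, hmac, json, os, time
import requests
from fastapi import FastAPI, Request, HTTPException, Header

app = FastAPI()
API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"

# Your HubSpot app's client secret; keep it out of source control
HUBSPOT_CLIENT_SECRET = os.environ["HUBSPOT_CLIENT_SECRET"]
MAX_SIGNATURE_AGE_MS = 5 * 60 * 1000  # reject requests older than 5 minutes

# HubSpot property name -> CDP field. Only properties the CRM owns are listed.
FIELD_MAP = {
    "email":          "email",
    "firstname":      "first_name",
    "lastname":       "last_name",
    "phone":          "phone",
    "company":        "company_name",
    "jobtitle":       "job_title",
    "hs_lead_status": "crm_lead_status",
    "lifecyclestage": "crm_lifecycle_stage",
}

def verify_hubspot_signature(body: bytes, signature_v3: str | None,
                             timestamp: str | None, request_uri: str) -> bool:
    """v3 signature: base64(HMAC-SHA256(client_secret, method + uri + body + timestamp))."""
    if not signature_v3 or not timestamp or not timestamp.isdigit():
        return False
    # Stale timestamps are rejected to limit replay attacks
    if abs(time.time() * 1000 - int(timestamp)) > MAX_SIGNATURE_AGE_MS:
        return False
    source = f"POST{request_uri}{body.decode('utf-8')}{timestamp}"
    digest = hmac.new(HUBSPOT_CLIENT_SECRET.encode("utf-8"),
                      source.encode("utf-8"), hashlib.sha256).digest()
    expected = base64.b64encode(digest).decode("ascii")
    return hmac.compare_digest(expected, signature_v3)

@app.post("/webhooks/hubspot")
async def handle_hubspot(
    request: Request,
    x_hubspot_signature_v3: str | None = Header(None),
    x_hubspot_request_timestamp: str | None = Header(None),
):
    body = await request.body()
    # HubSpot signs the full URL it called (scheme, host, path, query).
    # Behind a proxy, rebuild it from your public hostname instead.
    uri = str(request.url)

    if not verify_hubspot_signature(body, x_hubspot_signature_v3, x_hubspot_request_timestamp, uri):
        raise HTTPException(status_code=401, detail="Invalid signature")

    events = json.loads(body)  # a JSON array of events

    for event in events:
        prop_name = event.get("propertyName")
        if prop_name not in FIELD_MAP:
            continue  # ignore properties this integration doesn't own

        contact_id = str(event["objectId"])
        record = {
            "external_id":        contact_id,
            "hubspot_contact_id": contact_id,
            FIELD_MAP[prop_name]: event.get("propertyValue"),
        }

        # requests is blocking; fine at low volume, use an async client under load
        resp = requests.post(
            f"{BASE_URL}/records/profiles/upsert",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                # HubSpot's eventId makes redelivered events safe to replay
                "Idempotency-Key": f"hs:{event['eventId']}",
            },
            json={"record": record, "matchKey": "hubspot_contact_id"},
            timeout=10,
        )
        if resp.status_code in (429, 500, 502, 503, 504):
            # Transient failure: return 503 so HubSpot retries the batch
            raise HTTPException(status_code=503)
        resp.raise_for_status()

    return {"ok": True}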
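A single HubSpot batch can contain several property changes for the same contact, and the handler above makes one API call per event. The following is a minimal sketch of an alternative, assuming you want one upsert per contact per batch: group events by objectId, apply them in occurredAt order so the latest value of each property wins, and derive one idempotency key from the event IDs involved. `coalesce_events` is a hypothetical helper, and its field map is a subset of the handler's.

```python
import hashlib
from collections import defaultdict

# Subset of the HubSpot property -> CDP field map used by the handler
FIELD_MAP = {
    "email": "email",
    "firstname": "first_name",
    "lastname": "last_name",
    "lifecyclestage": "crm_lifecycle_stage",
}

def coalesce_events(events: list[dict]) -> list[tuple[dict, str]]:
    """Collapse a batch of property-change events into one record per contact.

    Returns (record, idempotency_key) pairs. Events are applied in occurredAt
    order, so the most recent value of each property wins.
    """
    by_contact: dict[str, list[dict]] = defaultdict(list)
    for event in events:
        if event.get("propertyName") in FIELD_MAP:
            by_contact[str(event["objectId"])].append(event)

    results = []
    for contact_id, contact_events in by_contact.items():
        record = {"external_id": contact_id, "hubspot_contact_id": contact_id}
        for event in sorted(contact_events, key=lambda e: e["occurredAt"]):
            record[FIELD_MAP[event["propertyName"]]] = event.get("propertyValue")
        # The same set of event IDs always yields the same key,
        # so a redelivered batch is replayed safely
        ids = ",".join(sorted(str(e["eventId"]) for e in contact_events))
        key = "hs:" + hashlib.sha256(ids.encode("utf-8")).hexdigest()[:32]
        results.append((record, key))
    return results
```

Each returned pair maps directly onto one call to the upsert endpoint, with the key sent as Idempotency-Key. The trade-off is coarser retries: if one contact's write fails, the whole batch is redelivered, which the deterministic keys make harmless.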

Pull pattern: Salesforce delta sync

For Salesforce, or any CRM you sync by polling, query for records modified since the last sync time. Run this as a scheduled job; a 15-minute interval is a common balance between freshness and API quota use.

import datetime
import os
import requests

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"
# instance_url and access_token come from your Salesforce OAuth flow
SALESFORCE_INSTANCE = os.environ.get("SALESFORCE_INSTANCE_URL", "https://yourorg.my.salesforce.com")
SALESFORCE_TOKEN    = os.environ["SALESFORCE_ACCESS_TOKEN"]
BATCH_SIZE = 500

def get_modified_contacts(since: datetime.datetime) -> list[dict]:
    # SOQL datetime literals are unquoted ISO 8601; pass a timezone-aware datetime
    since_str = since.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # >= rather than >, so contacts sharing the boundary timestamp aren't skipped;
    # re-reading them is harmless because the upsert is keyed on salesforce_id.
    # LIMIT caps one run; the next run continues from the returned watermark.
    query = f"""
        SELECT Id, Email, FirstName, LastName, Phone,
               Title, Account.Name, LeadSource, LastModifiedDate
        FROM Contact
        WHERE LastModifiedDate >= {since_str}
        ORDER BY LastModifiedDate ASC
        LIMIT 2000
    """
    resp = requests.get(
        f"{SALESFORCE_INSTANCE}/services/data/v59.0/query",
        params={"q": query},
        headers={"Authorization": f"Bearer {SALESFORCE_TOKEN}"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["records"]

def parse_sf_datetime(value: str) -> datetime.datetime:
    # Salesforce returns e.g. "2024-01-15T10:30:00.000+0000"
    return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f%z")

def sync_salesforce_contacts(last_sync_at: datetime.datetime) -> datetime.datetime:
    contacts = get_modified_contacts(since=last_sync_at)
    if not contacts:
        return last_sync_at

    records = []
    for c in contacts:
        if not c.get("Email"):
            continue  # skip contacts without email

        record = {
            "email":          c["Email"].strip().lower(),
            "external_id":    c["Id"],
            "salesforce_id":  c["Id"],
            "first_name":     c.get("FirstName"),
            "last_name":      c.get("LastName"),
            # normalize_phone() is your own E.164 normalizer (not shown)
            "phone":          normalize_phone(c["Phone"]) if c.get("Phone") else None,
            "job_title":      c.get("Title"),
            "company_name":   (c.get("Account") or {}).get("Name"),
            "lead_source":    c.get("LeadSource"),
            "crm_last_modified_at": c["LastModifiedDate"],
        }
        # Drop None values: don't clear fields that the CRM left blank
        record = {k: v for k, v in record.items() if v is not None}
        records.append(record)

    # Flush in batches of 500
    for i in range(0, len(records), BATCH_SIZE):
        chunk = records[i:i + BATCH_SIZE]
        requests.post(
            f"{BASE_URL}/records/profiles/upsert-batch",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
            },
            json={"records": chunk, "matchKey": "salesforce_id"},
            timeout=60,
        ).raise_for_status()

    # Advance the watermark only after every batch succeeded
    return max(parse_sf_datetime(c["LastModifiedDate"]) for c in contacts)

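Persist last_sync_at in a durable store (a database table, an S3 object, or similar) between runs, and update it only after a run completes. If a job fails mid-run, the next run re-processes some contacts; because every write is an upsert keyed on salesforce_id, re-processing updates the same profiles instead of creating duplicates.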
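A minimal sketch of the job around the sync function, assuming a local JSON file as the durable store (swap in your database or S3 in production). `load_watermark`, `save_watermark`, and `run_catch_up` are hypothetical helpers, and `sync_fn` stands in for `sync_salesforce_contacts`. Because each query is capped at LIMIT 2000, the loop keeps calling the sync until the watermark stops advancing, saving it after each successful call so a crash replays at most one call's worth of contacts.

```python
import datetime
import json
import os
import tempfile
from typing import Callable

def load_watermark(path: str, default: datetime.datetime) -> datetime.datetime:
    """Read last_sync_at from disk, or return `default` on the first run."""
    try:
        with open(path, encoding="utf-8") as f:
            return datetime.datetime.fromisoformat(json.load(f)["last_sync_at"])
    except FileNotFoundError:
        return default

def save_watermark(path: str, value: datetime.datetime) -> None:
    """Write to a temp file, then rename, so a crash never leaves a torn file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump({"last_sync_at": value.isoformat()}, f)
    os.replace(tmp, path)  # atomic on POSIX and Windows

def run_catch_up(path: str,
                 sync_fn: Callable[[datetime.datetime], datetime.datetime],
                 default: datetime.datetime,
                 max_rounds: int = 50) -> datetime.datetime:
    """Call sync_fn until the watermark stops advancing (backlog drained)."""
    watermark = load_watermark(path, default)
    for _ in range(max_rounds):
        new_watermark = sync_fn(watermark)
        if new_watermark <= watermark:
            break  # nothing newer than the last run
        save_watermark(path, new_watermark)
        watermark = new_watermark
    return watermark
```

The `max_rounds` cap is a safety valve: it bounds a single run even if the CRM keeps producing changes faster than the loop drains them.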


Field ownership: who writes what

When multiple systems write to the same profile, each should write only the fields it owns. This prevents a CRM sync from overwriting data set by a Shopify webhook, or vice versa.

Define a clear ownership matrix for your workspace:

| Field | Owner | Notes |
| --- | --- | --- |
| email | All systems | Use as matchKey; normalize (trim, lowercase) before writing |
| first_name, last_name | CRM | Authoritative source for identity data |
| phone | CRM or SMS platform | Normalize to E.164 |
| job_title, company_name | CRM | CRM has the most accurate firmographic data |
| subscription_status | Billing (Stripe) | Never write this from the CRM |
| order_count, ltv | Commerce (Shopify) | Never write this from the CRM |
| email_opted_out | ESP (Mailchimp, etc.) | Never write this from the CRM unless the CRM is the system of record for consent |

Each integration writes only the fields it owns, and omits the rest rather than sending them as null. If the CRM handler sends null for subscription_status because the CRM doesn't have that data, the upsert clears the value that billing wrote.

# WRONG — CRM sync writing fields it doesn't own
record = {
    "email": contact["email"],
    "first_name": contact["first_name"],
    "subscription_status": None,  # CRM doesn't know this — will clear it!
}
 
# RIGHT — only write what CRM owns
record = {
    "email": contact["email"],
    "first_name": contact["first_name"],
    "company_name": contact["company"],
    "crm_lead_status": contact["lead_status"],
}
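One way to make the ownership matrix enforceable rather than a convention is to filter every outgoing record through a per-source allowlist before it reaches the API. A minimal sketch, assuming the field names from the table above; `IDENTITY_FIELDS`, `OWNED_FIELDS`, and `owned_fields_only` are hypothetical names, and identity fields are always allowed so the matchKey survives the filter.

```python
# Hypothetical allowlists derived from the ownership matrix
IDENTITY_FIELDS = {"email", "external_id", "salesforce_id", "hubspot_contact_id"}

OWNED_FIELDS = {
    "crm": {"first_name", "last_name", "phone", "job_title", "company_name",
            "crm_lead_status", "crm_lifecycle_stage", "lead_source"},
    "sms": {"phone"},
    "billing": {"subscription_status"},
    "commerce": {"order_count", "ltv"},
    "esp": {"email_opted_out"},
}

def owned_fields_only(source: str, record: dict) -> dict:
    """Keep identity fields plus the fields `source` owns; drop everything else.

    None values are dropped as well, so a missing value never clears data.
    """
    allowed = IDENTITY_FIELDS | OWNED_FIELDS[source]
    return {k: v for k, v in record.items() if k in allowed and v is not None}
```

An unknown source raises KeyError, which is deliberate in this sketch: a new integration has to declare its fields before it can write.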

Handling CRM identity keys

CRMs have their own record IDs. These are excellent matchKey candidates because:

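  1. They're stable: unlike an email address, a record's Salesforce ID never changes (a merge deletes the losing records, so handle merges and deletes separately)
  2. They're unique: the CRM guarantees it
  3. They're available on every write from that system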

Add the CRM ID as an identity field in your workspace schema (e.g. salesforce_id, hubspot_contact_id), then use it as matchKey for all writes from that CRM.

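import os
import requests

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"

def upsert_profile(record: dict, match_key: str, idempotency_key: str | None = None) -> None:
    """Upsert one profile, matching on the identity field named by match_key."""
    hdrs = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    if idempotency_key:
        hdrs["Idempotency-Key"] = idempotency_key
    requests.post(
        f"{BASE_URL}/records/profiles/upsert",
        headers=hdrs,
        json={"record": record, "matchKey": match_key},
        timeout=10,
    ).raise_for_status()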
 
# First write from CRM — creates profile keyed on salesforce_id
upsert_profile(
    record={
        "salesforce_id": "003XXXXXXXXXXXX",
        "email":         "jane@example.com",
        "first_name":    "Jane",
    },
    match_key="salesforce_id",
)
 
# Subsequent write from billing — uses email to find same row
upsert_profile(
    record={
        "email": "jane@example.com",
        "subscription_status": "active",
    },
    match_key="email",
)

After the first write, the row has both salesforce_id and email. Future writes using either identifier resolve to the same row.
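To make the matching rule concrete, here is a toy in-memory model of a profile table with two identity fields. It illustrates the behavior described above and is not Experiture's implementation: an upsert looks up the row by the value of the matchKey field, merges the record into it (or creates a row), then indexes every identity value the row now carries.

```python
class ToyProfileStore:
    """In-memory stand-in for a profile table keyed by identity fields."""

    def __init__(self, identity_fields: set[str]):
        self.identity_fields = identity_fields
        self.rows: list[dict] = []
        self.index: dict[tuple[str, object], int] = {}  # (field, value) -> row number

    def upsert(self, record: dict, match_key: str) -> int:
        if match_key not in self.identity_fields:
            raise ValueError(f"{match_key} is not an identity field")
        row_id = self.index.get((match_key, record[match_key]))
        if row_id is None:  # no match: create a new row
            row_id = len(self.rows)
            self.rows.append({})
        self.rows[row_id].update(record)
        # Index every identity value the row now has, so any of them finds it
        for field in self.identity_fields:
            if field in self.rows[row_id]:
                self.index[(field, self.rows[row_id][field])] = row_id
        return row_id
```

Replaying the two writes above against this model (first with match_key="salesforce_id", then with match_key="email") lands both on row 0. The toy ignores conflicts, such as two rows claiming the same email, and leaves stale index entries when an identifier changes; a real CDP has to handle both.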


Full initial sync vs delta sync

For a new CDP workspace, start with a full import of your CRM export file:

CRM → Export all contacts as CSV → Import Jobs API (upsert, matchKey=salesforce_id)

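Then switch to delta sync (webhook or scheduled pull). Record the time the export started and use it as the initial last_sync_at, so the first delta run picks up contacts modified while the export and import were running. Full imports are a one-time step; running them daily reprocesses every record on every run.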

For the full import, see Bulk File Import and set matchKey to your CRM's ID field.
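A CRM export uses the CRM's own column names, so the file usually needs a header rename and the same email normalization the delta sync applies before it goes to the Import Jobs API. A minimal sketch, assuming the export's headers are the Salesforce API names used in the delta sync query (Id, Email, FirstName, ...); `COLUMN_MAP` and `prepare_import_csv` are hypothetical, and the output columns should be matched to your workspace schema.

```python
import csv
from typing import TextIO

# Assumed export headers -> CDP fields, matched case-insensitively.
# Phone is left out because it needs E.164 normalization first.
COLUMN_MAP = {
    "id": "salesforce_id",
    "email": "email",
    "firstname": "first_name",
    "lastname": "last_name",
    "title": "job_title",
    "leadsource": "lead_source",
}

def prepare_import_csv(src: TextIO, dst: TextIO) -> int:
    """Rename columns, normalize email, drop rows without an ID. Returns rows written."""
    reader = csv.DictReader(src)
    headers = reader.fieldnames or []
    # Map each recognized source header to its CDP name; ignore the rest
    mapping = {h: COLUMN_MAP[h.strip().lower()]
               for h in headers if h.strip().lower() in COLUMN_MAP}
    writer = csv.DictWriter(dst, fieldnames=list(mapping.values()))
    writer.writeheader()
    written = 0
    for row in reader:
        out = {cdp: (row.get(col) or "").strip() for col, cdp in mapping.items()}
        if not out.get("salesforce_id"):
            continue  # the import matches on salesforce_id, so it is required
        if out.get("email"):
            out["email"] = out["email"].lower()
        writer.writerow(out)
        written += 1
    return written
```

Write the result to a file and submit it as an upsert job with matchKey=salesforce_id, as described in Bulk File Import.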


Monitoring sync health

Track these metrics in your observability stack:

  • Records written per sync run: a sudden drop usually means the CRM query returned nothing (expired auth, exhausted API quota, or a logic error)
  • Rejected records per batch: a spike usually signals a schema change (a new CRM field missing from the CDP schema, or a type change)
  • Sync lag: time between a CRM update and the CDP reflecting it; aim for under 15 minutes for pull-based and under 30 seconds for push-based sync
  • Last successful sync timestamp: alert if no run has succeeded in more than 2× your normal interval

import time
import datadog  # or your observability client; assumes datadog.initialize() ran at startup

def sync_with_observability():
    start = time.time()
    records_written = 0
    records_rejected = 0

    try:
        # run_delta_sync() is your sync job, assumed to return an object
        # with inserted, updated, and rejected_records counts
        result = run_delta_sync()
        records_written = result.inserted + result.updated
        records_rejected = result.rejected_records
        # Feeds the "last successful sync" staleness alert
        datadog.statsd.gauge("crm_sync.last_success_timestamp", time.time())
        return result
    except Exception:
        datadog.statsd.increment("crm_sync.failures")
        raise
    finally:
        elapsed = time.time() - start
        datadog.statsd.histogram("crm_sync.duration_seconds", elapsed)
        datadog.statsd.gauge("crm_sync.records_written", records_written)
        datadog.statsd.gauge("crm_sync.records_rejected", records_rejected)
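The last two metrics reduce to timestamp arithmetic. A minimal sketch of the checks an alerting job might run, assuming you already record the last successful run time and, per record, both the CRM modification time (for example the crm_last_modified_at field written by the Salesforce sync) and the time your sync wrote it. `is_sync_stale` and `sync_lag_seconds` are hypothetical helpers.

```python
import datetime

def is_sync_stale(last_success: datetime.datetime,
                  interval: datetime.timedelta,
                  now: datetime.datetime) -> bool:
    """True if no run has succeeded within 2x the normal interval."""
    return now - last_success > 2 * interval

def sync_lag_seconds(crm_modified_at: datetime.datetime,
                     written_at: datetime.datetime) -> float:
    """Seconds from the CRM change to the CDP write, clamped at 0 for clock skew."""
    return max(0.0, (written_at - crm_modified_at).total_seconds())
```

With a 15-minute schedule, a sync whose last success was 29 minutes ago is not stale, while one at 31 minutes is. Pass timezone-aware datetimes throughout, since subtracting naive and aware values raises TypeError.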

See Also