CRM & System Sync
Keep your CDP profiles in sync with HubSpot, Salesforce, or any CRM or ERP system. This guide covers two sync patterns — push-based (your CRM calls your webhook on change) and pull-based (your code polls the CRM API and writes to the CDP) — and explains how to handle updates without corrupting data that other systems own.
Choosing a sync pattern
| Pattern | When to use | Trade-off |
|---|---|---|
| Push (webhook) | CRM supports webhooks; you need near-real-time sync | Simpler, lower latency; requires a public endpoint |
| Pull (scheduled) | CRM has no webhooks or webhooks are unreliable; you need full-history sync | You control the schedule; lags by up to one poll interval |
| Hybrid | High-volume + real-time | Push for fast updates, daily pull to catch gaps |
Push pattern: HubSpot contact webhooks
HubSpot can POST to your endpoint whenever a contact property changes. Configure the webhook subscription in HubSpot's developer app under Webhooks — select the properties you want to track.
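HubSpot's v3 signature is a base64-encoded HMAC-SHA256, keyed with your app's client secret, over the request method, the full request URI, the raw body, and the `X-HubSpot-Request-Timestamp` value (milliseconds since the epoch). To exercise a handler locally before HubSpot can reach it, you can sign your own test requests. The following is a minimal sketch: `sign_hubspot_v3` and `build_test_headers` are hypothetical helper names, and the URI you sign must match exactly what your handler reconstructs from the incoming request.

```python
import base64
import hashlib
import hmac
import time


def sign_hubspot_v3(client_secret: str, method: str, uri: str, body: str, timestamp_ms: int) -> str:
    """Compute an X-HubSpot-Signature-v3 value: base64(HMAC-SHA256(secret, method + uri + body + timestamp))."""
    source = f"{method}{uri}{body}{timestamp_ms}"
    mac = hmac.new(client_secret.encode("utf-8"), source.encode("utf-8"), hashlib.sha256)
    return base64.b64encode(mac.digest()).decode("ascii")


def build_test_headers(client_secret: str, uri: str, body: str) -> dict[str, str]:
    """Headers for a simulated HubSpot webhook POST to a local endpoint (hypothetical test helper)."""
    ts = int(time.time() * 1000)  # HubSpot timestamps are in milliseconds
    return {
        "Content-Type": "application/json",
        "X-HubSpot-Signature-v3": sign_hubspot_v3(client_secret, "POST", uri, body, ts),
        "X-HubSpot-Request-Timestamp": str(ts),
    }
```

Send the body byte-for-byte as signed (for example `requests.post(uri, data=body, headers=headers)`); re-serializing the JSON changes the bytes and invalidates the signature.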
```python
import base64
import hashlib
import hmac
import json
import os
import time

import requests
from fastapi import FastAPI, Header, HTTPException, Request

app = FastAPI()

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"
HUBSPOT_CLIENT_SECRET = os.environ["HUBSPOT_CLIENT_SECRET"]
MAX_TIMESTAMP_AGE_MS = 5 * 60 * 1000  # reject requests older than 5 minutes

# HubSpot property name -> CDP field name
FIELD_MAP = {
    "email": "email",
    "firstname": "first_name",
    "lastname": "last_name",
    "phone": "phone",
    "company": "company_name",
    "jobtitle": "job_title",
    "hs_lead_status": "crm_lead_status",
    "lifecyclestage": "crm_lifecycle_stage",
}


def verify_hubspot_signature(body: bytes, signature_v3: str | None, timestamp: str | None, request_uri: str) -> bool:
    """Check X-HubSpot-Signature-v3: base64(HMAC-SHA256(client_secret, method + uri + body + timestamp))."""
    if not signature_v3 or not timestamp or not timestamp.isdigit():
        return False
    # Stale timestamps are rejected to limit replay attacks
    if abs(time.time() * 1000 - int(timestamp)) > MAX_TIMESTAMP_AGE_MS:
        return False
    source = f"POST{request_uri}{body.decode('utf-8')}{timestamp}"
    mac = hmac.new(HUBSPOT_CLIENT_SECRET.encode("utf-8"), source.encode("utf-8"), hashlib.sha256)
    expected = base64.b64encode(mac.digest()).decode("utf-8")
    return hmac.compare_digest(expected, signature_v3)


@app.post("/webhooks/hubspot")
async def handle_hubspot(
    request: Request,
    x_hubspot_signature_v3: str | None = Header(None),
    x_hubspot_request_timestamp: str | None = Header(None),
):
    body = await request.body()
    # Must be the exact URL HubSpot called; behind a proxy, rebuild it from the public host
    uri = str(request.url)
    if not verify_hubspot_signature(body, x_hubspot_signature_v3, x_hubspot_request_timestamp, uri):
        raise HTTPException(status_code=401, detail="Invalid signature")

    # HubSpot batches several events into one request
    for event in json.loads(body):
        contact_id = str(event["objectId"])
        prop_name = event.get("propertyName")
        record = {
            "external_id": contact_id,
            "hubspot_contact_id": contact_id,
        }
        if prop_name in FIELD_MAP:
            record[FIELD_MAP[prop_name]] = event.get("propertyValue")

        resp = requests.post(
            f"{BASE_URL}/records/profiles/upsert",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                # HubSpot's eventId makes a retried delivery of the same event safe
                "Idempotency-Key": f"hs:{event['eventId']}",
            },
            json={"record": record, "matchKey": "hubspot_contact_id"},
            timeout=10,
        )
        # Transient CDP errors: answer 503 so HubSpot retries the delivery
        if resp.status_code in (429, 500, 502, 503, 504):
            raise HTTPException(status_code=503)
        resp.raise_for_status()

    return {"ok": True}
```

Pull pattern: Salesforce delta sync
For Salesforce (or any CRM that doesn't push), poll for records modified since the last sync time. Run this as a scheduled job — every 15 minutes is typically the right balance between freshness and API quota use.
```python
import datetime
import os

import phonenumbers  # pip install phonenumbers
import requests

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"
SALESFORCE_INSTANCE = "https://yourorg.salesforce.com"
SALESFORCE_TOKEN = "..."  # from OAuth flow
BATCH_SIZE = 500


def normalize_phone(raw: str, default_region: str = "US") -> str | None:
    """Normalize to E.164, or return None if unparseable. The "US" default region is an assumption."""
    try:
        parsed = phonenumbers.parse(raw, default_region)
    except phonenumbers.NumberParseException:
        return None
    if not phonenumbers.is_valid_number(parsed):
        return None
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)


def get_modified_contacts(since: datetime.datetime) -> list[dict]:
    # SOQL datetime literals are unquoted, e.g. 2024-01-15T10:30:00Z
    since_str = since.astimezone(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # >= rather than >: with LIMIT 2000, contacts sharing the boundary timestamp
    # would otherwise be skipped. Re-processing them is harmless (idempotent upsert).
    query = f"""
        SELECT Id, Email, FirstName, LastName, Phone,
               Title, Account.Name, LeadSource, LastModifiedDate
        FROM Contact
        WHERE LastModifiedDate >= {since_str}
        ORDER BY LastModifiedDate ASC
        LIMIT 2000
    """
    resp = requests.get(
        f"{SALESFORCE_INSTANCE}/services/data/v59.0/query",
        params={"q": query},
        headers={"Authorization": f"Bearer {SALESFORCE_TOKEN}"},
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()["records"]


def sync_salesforce_contacts(last_sync_at: datetime.datetime) -> datetime.datetime:
    contacts = get_modified_contacts(since=last_sync_at)
    if not contacts:
        return last_sync_at

    records = []
    for c in contacts:
        if not c.get("Email"):
            continue  # skip contacts without email
        record = {
            "email": c["Email"].strip().lower(),
            "external_id": c["Id"],
            "salesforce_id": c["Id"],
            "first_name": c.get("FirstName"),
            "last_name": c.get("LastName"),
            "phone": normalize_phone(c["Phone"]) if c.get("Phone") else None,
            "job_title": c.get("Title"),
            # Account is null when the contact has no parent account
            "company_name": (c.get("Account") or {}).get("Name"),
            "lead_source": c.get("LeadSource"),
            "crm_last_modified_at": c["LastModifiedDate"],
        }
        # Drop None values: don't clear fields that the CRM left blank
        records.append({k: v for k, v in record.items() if v is not None})

    # Flush in batches of 500
    for i in range(0, len(records), BATCH_SIZE):
        requests.post(
            f"{BASE_URL}/records/profiles/upsert-batch",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
            },
            json={"records": records[i:i + BATCH_SIZE], "matchKey": "salesforce_id"},
            timeout=30,
        ).raise_for_status()

    # Salesforce returns timestamps like "2024-01-15T10:30:00.000+0000". They all share
    # one format and offset, so the string max is the most recent modification.
    latest = max(c["LastModifiedDate"] for c in contacts)
    return datetime.datetime.strptime(latest, "%Y-%m-%dT%H:%M:%S.%f%z")
```

Store last_sync_at in a durable store (database, S3, etc.) between runs, and write it only after every batch has succeeded. If a job fails mid-run, the next run re-processes some contacts; because every write upserts on salesforce_id, re-processing updates the existing profiles rather than creating duplicates.
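The durable cursor can be as small as a one-row table. Below is a sketch using SQLite from the standard library; the `sync_cursor` table, the job key, and the `run_once` wrapper are hypothetical, and `sync_fn` stands in for a function such as `sync_salesforce_contacts`. The design choice that matters is ordering: the cursor is saved only after the sync function returns, so a failed run leaves the previous cursor in place and the next run starts from it again.

```python
import datetime
import sqlite3
from typing import Callable

# Hypothetical schema: one row per sync job, timestamp stored as ISO 8601 text
SCHEMA = "CREATE TABLE IF NOT EXISTS sync_cursor (job TEXT PRIMARY KEY, last_sync_at TEXT NOT NULL)"


def load_cursor(conn: sqlite3.Connection, job: str, default: datetime.datetime) -> datetime.datetime:
    conn.execute(SCHEMA)
    row = conn.execute("SELECT last_sync_at FROM sync_cursor WHERE job = ?", (job,)).fetchone()
    return datetime.datetime.fromisoformat(row[0]) if row else default


def save_cursor(conn: sqlite3.Connection, job: str, value: datetime.datetime) -> None:
    conn.execute(SCHEMA)
    conn.execute(
        "INSERT OR REPLACE INTO sync_cursor (job, last_sync_at) VALUES (?, ?)",
        (job, value.isoformat()),
    )
    conn.commit()


def run_once(
    conn: sqlite3.Connection,
    job: str,
    sync_fn: Callable[[datetime.datetime], datetime.datetime],
    default: datetime.datetime,
) -> datetime.datetime:
    since = load_cursor(conn, job, default)
    new_cursor = sync_fn(since)  # if this raises, the cursor is not advanced
    save_cursor(conn, job, new_cursor)
    return new_cursor
```

In production you would point `sqlite3.connect` at a file on persistent storage, or swap in whatever database your scheduler already uses.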
Field ownership: who writes what
When multiple systems write to the same profile, each should write only the fields it owns. This prevents a CRM sync from overwriting data set by a Shopify webhook, or vice versa.
Define a clear ownership matrix for your workspace:
| Field | Owner | Notes |
|---|---|---|
| email | All systems | Use as matchKey; normalize before writing |
| first_name, last_name | CRM | Authoritative source for identity data |
| phone | CRM or SMS platform | Normalize to E.164 |
| job_title, company_name | CRM | CRM has the most accurate firmographic data |
| subscription_status | Billing (Stripe) | Never write this from CRM |
| order_count, ltv | Commerce (Shopify) | Never write this from CRM |
| email_opted_out | ESP (Mailchimp, etc.) | Never write this from CRM unless CRM is system of record for consent |
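The matrix is easiest to enforce at the edge of each integration, before a record reaches the upsert call. Here is a minimal sketch of such a guard. The source names and field sets mirror the table but are assumptions to adapt: phone is assigned to the CRM here, though it might belong to your SMS platform, and the CRM identity fields are added because the table doesn't list them. Dropping None values as well means a source that lacks a value can never clear one.

```python
# Hypothetical ownership matrix: source system -> fields it may write
FIELD_OWNERS: dict[str, frozenset[str]] = {
    "crm": frozenset({
        "email", "first_name", "last_name", "phone", "job_title", "company_name",
        "crm_lead_status", "crm_lifecycle_stage",
        "salesforce_id", "hubspot_contact_id", "external_id",
    }),
    "billing": frozenset({"email", "subscription_status"}),
    "commerce": frozenset({"email", "order_count", "ltv"}),
    "esp": frozenset({"email", "email_opted_out"}),
}


def owned_fields_only(source: str, record: dict) -> dict:
    """Keep only the fields `source` owns, and drop None so a missing value never clears data."""
    allowed = FIELD_OWNERS[source]  # KeyError for an unknown source is intentional
    return {k: v for k, v in record.items() if k in allowed and v is not None}
```

Calling `owned_fields_only("crm", record)` just before every CRM upsert turns the matrix from a convention into a check.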
Each integration writes only the fields the matrix assigns to it. Sending an unowned field is not harmless: if the CRM handler sends null for subscription_status because the CRM has no value for it, the upsert clears the real value that billing wrote.
```python
# WRONG — CRM sync writing fields it doesn't own
record = {
    "email": contact["email"],
    "first_name": contact["first_name"],
    "subscription_status": None,  # CRM doesn't know this — will clear it!
}

# RIGHT — only write what CRM owns
record = {
    "email": contact["email"],
    "first_name": contact["first_name"],
    "company_name": contact["company"],
    "crm_lead_status": contact["lead_status"],
}
```

Handling CRM identity keys
CRMs have their own record IDs. These are excellent matchKey candidates because:
- They're stable — a contact's Salesforce ID never changes
- They're unique — guaranteed by the CRM
- They're available on every write from that system
Add the CRM ID as an identity field in your workspace schema (e.g. salesforce_id, hubspot_contact_id), then use it as matchKey for all writes from that CRM.
```python
import os

import requests

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"


def upsert_profile(record: dict, match_key: str, idempotency_key: str | None = None) -> None:
    hdrs = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    if idempotency_key:
        hdrs["Idempotency-Key"] = idempotency_key
    requests.post(
        f"{BASE_URL}/records/profiles/upsert",
        headers=hdrs,
        json={"record": record, "matchKey": match_key},
        timeout=10,
    ).raise_for_status()


# First write from CRM: creates a profile keyed on salesforce_id
upsert_profile(
    record={
        "salesforce_id": "003XXXXXXXXXXXX",
        "email": "jane@example.com",
        "first_name": "Jane",
    },
    match_key="salesforce_id",
)

# Subsequent write from billing: uses email to find the same row
upsert_profile(
    record={
        "email": "jane@example.com",
        "subscription_status": "active",
    },
    match_key="email",
)
```

After the first write, the row has both salesforce_id and email. Future writes using either identifier resolve to the same row.
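To make the resolution rule concrete, here is a toy in-memory model of it: an upsert looks for an existing row whose `match_key` field equals the incoming value, merges into that row if one exists, and creates a row otherwise. `ToyProfileStore` is hypothetical and only illustrates the behavior described above; it is not the CDP's implementation, and it ignores conflicts real identity resolution must handle (for example, two rows claiming the same email).

```python
from dataclasses import dataclass, field


@dataclass
class ToyProfileStore:
    """In-memory stand-in for the CDP, used only to illustrate matchKey resolution."""
    rows: list[dict] = field(default_factory=list)

    def upsert(self, record: dict, match_key: str) -> dict:
        key_value = record[match_key]
        for row in self.rows:
            if row.get(match_key) == key_value:
                row.update(record)  # merge: fields absent from `record` keep their values
                return row
        row = dict(record)  # no match: create a new row
        self.rows.append(row)
        return row
```

Replaying the two `upsert_profile` calls above against this store (first on `salesforce_id`, then on `email`) leaves a single row carrying `salesforce_id`, `email`, `first_name`, and `subscription_status`.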
Full initial sync vs delta sync
For a new CDP workspace, start with a full import of your CRM export file:
CRM → Export all contacts as CSV → Import Jobs API (upsert, matchKey=salesforce_id)

Then switch to delta sync (webhook or scheduled pull) going forward. Full imports are one-time; don't run them daily or you'll repeatedly reprocess millions of records.
For the full import, see Bulk File Import and set matchKey to your CRM's ID field.
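Before uploading, it can pay to clean the export so the full import matches cleanly on the CRM ID. A minimal sketch using Python's `csv` module follows; `prepare_crm_export` is a hypothetical helper, and the `Id` and `Email` column names are assumptions based on a typical Salesforce contact export, so adjust them to your file. It trims and lowercases emails, sets aside rows with no ID (they have nothing to match on), and reports how many rows it wrote and skipped.

```python
import csv
import io


def prepare_crm_export(
    src: io.TextIOBase, dst: io.TextIOBase, id_column: str = "Id", email_column: str = "Email"
) -> tuple[int, int]:
    """Copy a CRM CSV export, normalizing emails and skipping rows without a CRM ID.

    Returns (rows_written, rows_skipped).
    """
    reader = csv.DictReader(src)
    if reader.fieldnames is None:
        return (0, 0)  # empty file
    writer = csv.DictWriter(dst, fieldnames=reader.fieldnames)
    writer.writeheader()
    written = skipped = 0
    for row in reader:
        if not (row.get(id_column) or "").strip():
            skipped += 1  # no CRM ID, so the import can't match on it
            continue
        if row.get(email_column):
            row[email_column] = row[email_column].strip().lower()
        writer.writerow(row)
        written += 1
    return written, skipped
```

A non-zero skipped count is worth a look before you start the import, since those contacts would otherwise be silently absent from the CDP.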
Monitoring sync health
Track these metrics in your observability stack:
- Records written per sync run — a sudden drop means the CRM query is returning nothing (auth expiry, API quota, logic error)
- Rejected records per batch — a spike means a schema change (new field in CRM not in CDP, type change)
- Sync lag — time between a CRM update and the CDP reflecting it; should be < 15 min for pull-based, < 30 s for push-based
- Last successful sync timestamp — alert if no successful run in > 2× your normal interval
```python
import time

from datadog import statsd  # or your observability client


def sync_with_observability():
    start = time.time()
    records_written = 0
    records_rejected = 0
    try:
        # run_delta_sync() is your sync job; assumed to return a result with
        # inserted, updated, and rejected_records counts
        result = run_delta_sync()
        records_written = result.inserted + result.updated
        records_rejected = result.rejected_records
        # Alert when now minus this value exceeds 2x your normal interval
        statsd.gauge("crm_sync.last_success_timestamp", time.time())
    except Exception:
        statsd.increment("crm_sync.failures")
        raise
    finally:
        statsd.histogram("crm_sync.duration_seconds", time.time() - start)
        statsd.gauge("crm_sync.records_written", records_written)
        statsd.gauge("crm_sync.records_rejected", records_rejected)
```

See Also
- Webhook Handler Integration — production webhook handler patterns
- Bulk File Import — for initial full exports from CRM
- Identity Resolution — using CRM IDs as stable matchKeys
- Building a Unified Profile — field ownership across multiple systems