E-Commerce Events
Capture the full customer commerce journey — from product views to purchases to refunds — in the CDP. This guide covers the canonical event schema for Shopify and Stripe integrations, how to model order data alongside profile data, and patterns for calculating lifetime value.
Data model
Commerce data lives across two objects:
| Object | Content | Operation |
|---|---|---|
| profiles | Who the customer is — identity, loyalty tier, computed lifetime value | Upsert (merge updates in) |
| orders | Each individual transaction — immutable record of what happened | Append (never merge) |
Keep these separate. Profiles are stateful entities you update. Orders are events you append. Mixing them (putting order_id on a profile row) loses historical context and makes analytics harder.
Shopify webhooks
import os, requests
# Read at import time: a missing EXPERITURE_API_KEY raises KeyError immediately
# rather than failing later on the first request.
API_KEY = os.environ["EXPERITURE_API_KEY"]
# Root that every records endpoint URL below is built from.
BASE_URL = "https://api.experiture.ai/public/v1"
def cdp_append(object_name: str, record: dict, idempotency_key: str, timeout: float = 10.0):
    """Append one record to a CDP object via ``POST /records/{object_name}/append``.

    Args:
        object_name: Target object, e.g. ``"orders"``.
        record: Field/value mapping sent as the ``record`` JSON property.
        idempotency_key: Sent as the ``Idempotency-Key`` header so a retried
            delivery of the same event can be recognised as a duplicate.
        timeout: Seconds to wait for the API before giving up.

    Raises:
        requests.HTTPError: The API answered with a 4xx/5xx status.
        requests.Timeout: The API did not answer within ``timeout`` seconds.
    """
    response = requests.post(
        f"{BASE_URL}/records/{object_name}/append",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
        },
        json={"record": record},
        # requests has no default timeout; without one a stalled API call
        # would block the calling webhook handler indefinitely.
        timeout=timeout,
    )
    response.raise_for_status()
def cdp_upsert(object_name: str, record: dict, match_key: str, idempotency_key: str):
requests.post(
f"{BASE_URL}/records/{object_name}/upsert",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Idempotency-Key": idempotency_key,
},
json={"record": record, "matchKey": match_key},
).raise_for_status()Order placed
Subscribe to orders/create in Shopify to capture new purchases:
@app.post("/webhooks/shopify/orders/create")
async def shopify_order_created(request: Request, x_shopify_hmac_sha256: str = Header(None)):
body = await request.body()
if not verify_shopify_hmac(body, x_shopify_hmac_sha256):
raise HTTPException(status_code=401)
order = json.loads(body)
customer = order.get("customer", {})
email = (order.get("email") or customer.get("email", "")).strip().lower()
if not email:
return {"ok": True} # guest checkout without email — skip
# Append the order event
cdp_append(
object_name="orders",
record={
"shopify_order_id": str(order["id"]),
"customer_email": email,
"total": float(order["total_price"]),
"subtotal": float(order["subtotal_price"]),
"discount_total": float(order["total_discounts"]),
"currency": order["currency"],
"item_count": sum(li["quantity"] for li in order["line_items"]),
"financial_status": order["financial_status"],
"tags": order.get("tags", ""),
"placed_at": order["created_at"],
},
idempotency_key=f"shopify:order:{order['id']}:created",
)
# Upsert the profile — update cumulative fields
cdp_upsert(
object_name="profiles",
record={
"email": email,
"first_name": customer.get("first_name"),
"last_name": customer.get("last_name"),
"phone": normalize_phone(customer.get("phone")) if customer.get("phone") else None,
"order_count": customer.get("orders_count"),
"ltv": float(customer.get("total_spent", 0)),
"last_order_at": order["created_at"],
},
match_key="email",
idempotency_key=f"shopify:order:{order['id']}:profile",
),
return {"ok": True}Order updated (refund, cancellation)
@app.post("/webhooks/shopify/orders/updated")
async def shopify_order_updated(request: Request, x_shopify_hmac_sha256: str = Header(None)):
body = await request.body()
if not verify_shopify_hmac(body, x_shopify_hmac_sha256):
raise HTTPException(status_code=401)
order = json.loads(body)
email = (order.get("email") or "").strip().lower()
# Append a status-change event to preserve history
cdp_append(
object_name="orders",
record={
"shopify_order_id": str(order["id"]),
"customer_email": email,
"total": float(order["total_price"]),
"financial_status": order["financial_status"],
"fulfillment_status": order.get("fulfillment_status"),
"refund_total": sum(float(r["transactions"][0]["amount"]) for r in order.get("refunds", []) if r.get("transactions")),
"placed_at": order["created_at"],
"updated_at": order["updated_at"],
},
idempotency_key=f"shopify:order:{order['id']}:updated:{order['updated_at']}",
)
# Sync updated LTV back to profile
if email:
customer = order.get("customer", {})
cdp_upsert(
object_name="profiles",
record={
"email": email,
"ltv": float(customer.get("total_spent", 0)),
"order_count": customer.get("orders_count"),
},
match_key="email",
idempotency_key=f"shopify:order:{order['id']}:profile:updated",
)
return {"ok": True}Stripe webhooks
Payment succeeded
@app.post("/webhooks/stripe/payment")
async def stripe_payment(request: Request, stripe_signature: str = Header(None)):
    """Handle Stripe invoice webhooks and mirror billing state into the CDP.

    ``invoice.payment_succeeded`` appends a row to ``payments`` and marks the
    profile ``current``; ``invoice.payment_failed`` marks the profile
    ``past_due``. Any other event type is acknowledged and ignored.

    Raises:
        HTTPException: 400 when the payload or its signature is invalid.
    """
    body = await request.body()
    try:
        event = stripe.Webhook.construct_event(body, stripe_signature, STRIPE_WEBHOOK_SECRET)
    except (ValueError, stripe.error.SignatureVerificationError):
        # ValueError: the payload is not valid JSON.
        # SignatureVerificationError: the signature does not match.
        raise HTTPException(status_code=400)

    event_id = event["id"]
    event_type = event["type"]

    if event_type == "invoice.payment_succeeded":
        invoice = event["data"]["object"]
        # customer_email can be null on an invoice; `or ""` keeps .strip() safe.
        email = (invoice.get("customer_email") or "").strip().lower()
        paid_at = _from_unix(invoice["status_transitions"]["paid_at"])
        cdp_append(
            object_name="payments",
            record={
                "stripe_invoice_id": invoice["id"],
                "stripe_customer_id": invoice["customer"],
                "customer_email": email,
                # NOTE(review): dividing by 100 assumes a two-decimal currency;
                # zero-decimal currencies (e.g. JPY) need different handling — confirm.
                "amount_paid": invoice["amount_paid"] / 100,
                "amount_due": invoice["amount_due"] / 100,
                "currency": invoice["currency"].upper(),
                "subscription_id": invoice.get("subscription"),
                "period_start": _from_unix(invoice["period_start"]),
                "period_end": _from_unix(invoice["period_end"]),
                "paid_at": paid_at,
            },
            idempotency_key=event_id,
        )
        # Update profile with latest billing signals
        if email:
            cdp_upsert(
                object_name="profiles",
                record={
                    "email": email,
                    "stripe_customer_id": invoice["customer"],
                    "last_payment_at": paid_at,
                    "payment_status": "current",
                },
                match_key="email",
                idempotency_key=f"{event_id}:profile",
            )
    elif event_type == "invoice.payment_failed":
        invoice = event["data"]["object"]
        email = (invoice.get("customer_email") or "").strip().lower()
        if email:
            cdp_upsert(
                object_name="profiles",
                record={
                    "email": email,
                    "payment_status": "past_due",
                    # Use the event's own creation time, not the wall clock at
                    # processing time, so a delayed or redelivered event still
                    # records when the failure actually happened.
                    "payment_failed_at": _from_unix(event["created"]),
                },
                match_key="email",
                idempotency_key=f"{event_id}:profile",
            )
    return {"ok": True}
def _from_unix(ts: int) -> str:
from datetime import datetime, timezone
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()Product view and cart events
For behavioral signals that don't come from a backend system, send them via your server-side tracking layer:
# Called from your server when a user views a product page
def track_product_view(user_email: str, product_id: str, product_name: str, price: float):
    """Append a ``product_view`` event to the ``product_events`` object.

    Args:
        user_email: Viewer's email; trimmed and lowercased before use.
        product_id: Identifier of the viewed product.
        product_name: Display name of the viewed product.
        price: Price shown to the user.
    """
    from datetime import datetime, timezone

    # Normalise once so the stored email and the idempotency key agree;
    # otherwise "A@x.com" and "a@x.com" yield different keys for one customer.
    email = user_email.strip().lower()
    # One clock read feeds both the record and the key (second resolution).
    now = datetime.now(timezone.utc)
    cdp_append(
        object_name="product_events",
        record={
            "customer_email": email,
            "event_type": "product_view",
            "product_id": product_id,
            "product_name": product_name,
            "price": price,
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # the trailing "Z" format is unchanged.
            "occurred_at": now.isoformat().replace("+00:00", "Z"),
        },
        idempotency_key=f"pview:{email}:{product_id}:{int(now.timestamp())}",
    )
# Called when a user adds to cart
def track_add_to_cart(user_email: str, product_id: str, quantity: int, price: float):
cdp_append(
object_name="product_events",
record={
"customer_email": user_email.lower(),
"event_type": "add_to_cart",
"product_id": product_id,
"quantity": quantity,
"line_total": price * quantity,
"occurred_at": datetime.utcnow().isoformat() + "Z",
},
idempotency_key=f"cart:{user_email}:{product_id}:{int(time.time())}",
)Computing lifetime value
Lifetime value (ltv) should be a field on the profile. There are two approaches to keeping it up to date:
1. Trust the source system (preferred when available)
Shopify and Stripe both maintain total_spent / total charges per customer. Sync this value back to the profile on every relevant event (as shown in the Shopify handlers above). It's always current, and you don't need to aggregate anything yourself.
2. Periodic batch recalculation
If your commerce data is spread across multiple systems, run a nightly SQL query against your data warehouse and sync the result via Bulk File Import:
-- In your warehouse
-- The orders object is append-only: each status change adds another row for
-- the same shopify_order_id. Aggregate over the latest row per order only,
-- otherwise every update would be counted as an extra order.
WITH latest_orders AS (
    SELECT
        customer_email,
        total,
        financial_status,
        placed_at,
        ROW_NUMBER() OVER (
            PARTITION BY shopify_order_id
            -- rows written at order creation have no updated_at
            ORDER BY COALESCE(updated_at, placed_at) DESC
        ) AS rn
    FROM orders
)
SELECT
    customer_email AS email,
    SUM(total) AS ltv,
    COUNT(*) AS order_count,
    MAX(placed_at) AS last_order_at
FROM latest_orders
WHERE rn = 1
  AND financial_status = 'paid'
GROUP BY customer_email

Export this as CSV, then run a nightly import job to upsert into profiles.
Order schema design
A well-designed orders object schema enables powerful commerce audiences. Recommended fields:
| Field | Type | Notes |
|---|---|---|
| shopify_order_id | string | Source system identifier |
| customer_email | string | Join key to profiles |
| total | decimal | Order grand total |
| subtotal | decimal | Before discounts |
| discount_total | decimal | Total discount applied |
| currency | string | ISO 4217 (USD, EUR) |
| item_count | integer | Total units purchased |
| financial_status | string | paid, refunded, partially_refunded |
| fulfillment_status | string | fulfilled, partial, unfulfilled |
| placed_at | timestamp | ISO 8601 with tz |
| tags | string | Shopify order tags |
This schema supports audiences like:
- "Customers who placed > 3 orders in the last 90 days"
- "Customers with order value > $500 and fully paid"
- "Customers who had a refund in the last 30 days"
Avoiding duplicate events on retries
Commerce events (payments, orders) are especially important to keep deduplicated — double-counting revenue corrupts analytics.
Always derive idempotency keys from the source system's immutable identifiers:
# Stable across any number of retries — each key is built only from the source
# system's identifier plus a fixed suffix naming the operation:
idempotency_key = f"shopify:order:{order_id}:created" # append
idempotency_key = f"stripe:invoice:{invoice_id}:paid" # append
idempotency_key = f"stripe:invoice:{invoice_id}:paid:profile" # upsert
# WRONG — uuid4() is random, so this will generate a new key on each retry:
idempotency_key = str(uuid.uuid4())

See Also
- Webhook Handler Integration — full webhook security and retry patterns
- Real-time Record Writes — upsert and append mechanics
- Building a Unified Profile — how profile and order data connect
- Records API reference