E-Commerce Events

Capture the full customer commerce journey — from product views to purchases to refunds — in the CDP. This guide covers the canonical event schema for Shopify and Stripe integrations, how to model order data alongside profile data, and patterns for calculating lifetime value.


Data model

Commerce data lives across two objects:

| Object | Content | Operation |
| --- | --- | --- |
| profiles | Who the customer is: identity, loyalty tier, computed lifetime value | Upsert (merge updates in) |
| orders | Each individual transaction: an immutable record of what happened | Append (never merge) |

Keep these separate. Profiles are stateful entities you update. Orders are events you append. Mixing them (putting order_id on a profile row) loses historical context and makes analytics harder.
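
The difference between the two operations can be illustrated with a small in-memory sketch. The `profiles` dict, `orders` list, and both helper functions here are hypothetical stand-ins, not the CDP's API, and the merge rule shown (later values overwrite earlier ones) is an assumption about how upserts behave.

```python
from typing import Any

# In-memory stand-ins for the two CDP objects (illustration only).
profiles: dict = {}   # one row per customer, keyed by the match key (email)
orders: list = []     # append-only log, one row per event

def upsert_profile(record: dict, match_key: str = "email") -> None:
    """Merge the record into the existing row that shares the match key."""
    key: Any = record[match_key]
    profiles.setdefault(key, {}).update(record)

def append_order(record: dict) -> None:
    """Add a new row; rows that already exist are never modified."""
    orders.append(dict(record))

# Two profile writes for the same customer collapse into one row.
upsert_profile({"email": "a@example.com", "first_name": "Ada", "ltv": 40.0})
upsert_profile({"email": "a@example.com", "ltv": 95.0})

# Two order writes produce two rows, so history is preserved.
append_order({"shopify_order_id": "1001", "customer_email": "a@example.com", "total": 40.0})
append_order({"shopify_order_id": "1002", "customer_email": "a@example.com", "total": 55.0})
```

After these calls there is one profile row carrying the latest `ltv` and two order rows, which is the shape the rest of this guide relies on.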


Shopify webhooks

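import json
import os
import time
from datetime import datetime, timezone

import requests
import stripe
from fastapi import FastAPI, Header, HTTPException, Request  # the handlers below are FastAPI routes

API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"
STRIPE_WEBHOOK_SECRET = os.environ["STRIPE_WEBHOOK_SECRET"]  # assumed variable name

app = FastAPI()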
 
def cdp_append(object_name: str, record: dict, idempotency_key: str):
    requests.post(
        f"{BASE_URL}/records/{object_name}/append",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
        },
        json={"record": record},
    ).raise_for_status()
 
def cdp_upsert(object_name: str, record: dict, match_key: str, idempotency_key: str):
    requests.post(
        f"{BASE_URL}/records/{object_name}/upsert",
        headers={
            "Authorization": f"Bearer {API_KEY}",
            "Content-Type": "application/json",
            "Idempotency-Key": idempotency_key,
        },
        json={"record": record, "matchKey": match_key},
    ).raise_for_status()

Order placed

Subscribe to orders/create in Shopify to capture new purchases:

@app.post("/webhooks/shopify/orders/create")
async def shopify_order_created(request: Request, x_shopify_hmac_sha256: str = Header(None)):
    body = await request.body()
    if not verify_shopify_hmac(body, x_shopify_hmac_sha256):
        raise HTTPException(status_code=401)
 
    order = json.loads(body)
    customer = order.get("customer") or {}  # guards against "customer": null
    email = (order.get("email") or customer.get("email") or "").strip().lower()
 
    if not email:
        return {"ok": True}  # guest checkout without email — skip
 
    # Append the order event
    cdp_append(
        object_name="orders",
        record={
            "shopify_order_id":  str(order["id"]),
            "customer_email":    email,
            "total":             float(order["total_price"]),
            "subtotal":          float(order["subtotal_price"]),
            "discount_total":    float(order["total_discounts"]),
            "currency":          order["currency"],
            "item_count":        sum(li["quantity"] for li in order["line_items"]),
            "financial_status":  order["financial_status"],
            "tags":              order.get("tags", ""),
            "placed_at":         order["created_at"],
        },
        idempotency_key=f"shopify:order:{order['id']}:created",
    )
 
    # Upsert the profile — update cumulative fields
    cdp_upsert(
        object_name="profiles",
        record={
            "email":           email,
            "first_name":      customer.get("first_name"),
            "last_name":       customer.get("last_name"),
            # normalize_phone is your own helper; it is not defined in this guide
            "phone":           normalize_phone(customer.get("phone")) if customer.get("phone") else None,
            "order_count":     customer.get("orders_count"),
            "ltv":             float(customer.get("total_spent") or 0),
            "last_order_at":   order["created_at"],
        },
        match_key="email",
        idempotency_key=f"shopify:order:{order['id']}:profile",
    )
 
    return {"ok": True}
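
The handler above calls `verify_shopify_hmac` without defining it. A minimal sketch follows, assuming the webhook signing secret is stored in an environment variable named `SHOPIFY_WEBHOOK_SECRET` (a hypothetical name). Shopify signs the raw request body with HMAC-SHA256 and sends the base64-encoded digest in the `X-Shopify-Hmac-Sha256` header, so the check must run on the unparsed bytes.

```python
import base64
import hashlib
import hmac
import os
from typing import Optional

def verify_shopify_hmac(body: bytes, header_hmac: Optional[str], secret: Optional[str] = None) -> bool:
    """Return True when the header matches the HMAC-SHA256 of the raw body."""
    if secret is None:
        # Assumed variable name; read at call time so tests can override it.
        secret = os.environ.get("SHOPIFY_WEBHOOK_SECRET", "")
    if not header_hmac or not secret:
        return False
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).digest()
    expected = base64.b64encode(digest)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, header_hmac.encode("utf-8"))
```

Verify before calling `json.loads`: re-serializing the parsed payload can change whitespace or key order and would no longer match the signature.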

Order updated (refund, cancellation)

@app.post("/webhooks/shopify/orders/updated")
async def shopify_order_updated(request: Request, x_shopify_hmac_sha256: str = Header(None)):
    body = await request.body()
    if not verify_shopify_hmac(body, x_shopify_hmac_sha256):
        raise HTTPException(status_code=401)
 
    order = json.loads(body)
    email = (order.get("email") or "").strip().lower()
 
    # Append a status-change event to preserve history
    cdp_append(
        object_name="orders",
        record={
            "shopify_order_id":  str(order["id"]),
            "customer_email":    email,
            "total":             float(order["total_price"]),
            "financial_status":  order["financial_status"],
            "fulfillment_status": order.get("fulfillment_status"),
            # Sum every transaction on every refund, not only the first one
            "refund_total":      sum(
                float(t["amount"])
                for r in order.get("refunds", [])
                for t in (r.get("transactions") or [])
            ),
            "placed_at":         order["created_at"],
            "updated_at":        order["updated_at"],
        },
        idempotency_key=f"shopify:order:{order['id']}:updated:{order['updated_at']}",
    )
 
    # Sync updated LTV back to profile
    if email:
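        customer = order.get("customer") or {}
        cdp_upsert(
            object_name="profiles",
            record={
                "email": email,
                "ltv":   float(customer.get("total_spent") or 0),
                "order_count": customer.get("orders_count"),
            },
            match_key="email",
            # Include updated_at so each later update to the same order gets its own key;
            # a fixed key would cause every update after the first to be dropped as a duplicate
            idempotency_key=f"shopify:order:{order['id']}:profile:updated:{order['updated_at']}",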
        )
 
    return {"ok": True}

Stripe webhooks

Payment succeeded

@app.post("/webhooks/stripe/payment")
async def stripe_payment(request: Request, stripe_signature: str = Header(None)):
    body = await request.body()
    try:
        event = stripe.Webhook.construct_event(body, stripe_signature, STRIPE_WEBHOOK_SECRET)
    except (ValueError, stripe.error.SignatureVerificationError):  # malformed payload or bad signature
        raise HTTPException(status_code=400)
 
    event_id = event["id"]
 
    if event["type"] == "invoice.payment_succeeded":
        invoice = event["data"]["object"]
        email = (invoice.get("customer_email") or "").strip().lower()  # customer_email may be null
 
        cdp_append(
            object_name="payments",
            record={
                "stripe_invoice_id":    invoice["id"],
                "stripe_customer_id":   invoice["customer"],
                "customer_email":       email,
                "amount_paid":          invoice["amount_paid"] / 100,
                "amount_due":           invoice["amount_due"] / 100,
                "currency":             invoice["currency"].upper(),
                "subscription_id":      invoice.get("subscription"),
                "period_start":         _from_unix(invoice["period_start"]),
                "period_end":           _from_unix(invoice["period_end"]),
                "paid_at":              _from_unix(invoice["status_transitions"]["paid_at"]),
            },
            idempotency_key=event_id,
        )
 
        # Update profile with latest billing signals
        if email:
            cdp_upsert(
                object_name="profiles",
                record={
                    "email": email,
                    "stripe_customer_id": invoice["customer"],
                    "last_payment_at": _from_unix(invoice["status_transitions"]["paid_at"]),
                    "payment_status": "current",
                },
                match_key="email",
                idempotency_key=f"{event_id}:profile",
            )
 
    elif event["type"] == "invoice.payment_failed":
        invoice = event["data"]["object"]
        email = (invoice.get("customer_email") or "").strip().lower()  # customer_email may be null
        if email:
            cdp_upsert(
                object_name="profiles",
                record={
                    "email": email,
                    "payment_status": "past_due",
                    # Use the event's own timestamp so a redelivery records the same time
                    "payment_failed_at": _from_unix(event["created"]),
                },
                match_key="email",
                idempotency_key=f"{event_id}:profile",
            )
 
    return {"ok": True}
 
def _from_unix(ts: int) -> str:
    from datetime import datetime, timezone
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
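
The handler above divides Stripe amounts by 100, which is correct for currencies such as USD and EUR but not for zero-decimal currencies such as JPY, where Stripe reports the amount in whole units. A hedged sketch of a conversion helper follows. The function name `from_minor_units` is hypothetical, and the currency set is an illustrative subset rather than Stripe's full list; currencies with other exponents are not handled.

```python
from decimal import Decimal

# Illustrative subset of zero-decimal currencies; consult Stripe's currency
# documentation for the complete, current list before relying on this.
ZERO_DECIMAL_CURRENCIES = {"JPY", "KRW", "VND", "CLP", "XAF", "XOF"}

def from_minor_units(amount: int, currency: str) -> Decimal:
    """Convert a Stripe integer amount into a decimal amount in major units."""
    if currency.upper() in ZERO_DECIMAL_CURRENCIES:
        return Decimal(amount)              # already expressed in whole units
    return Decimal(amount) / Decimal(100)   # two-decimal currencies such as USD

usd_amount = from_minor_units(1999, "usd")
jpy_amount = from_minor_units(500, "jpy")
```

`Decimal` avoids binary floating-point rounding on money values. Convert to `str` or `float` before placing the value in the JSON record, since the standard JSON encoder does not serialize `Decimal`.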

Product view and cart events

For behavioral signals that don't come from a backend system, send them via your server-side tracking layer:

from datetime import datetime, timezone

# Called from your server when a user views a product page
def track_product_view(user_email: str, product_id: str, product_name: str, price: float):
    email = user_email.strip().lower()
    now = datetime.now(timezone.utc)  # captured once so the record and the key agree
    cdp_append(
        object_name="product_events",
        record={
            "customer_email":  email,
            "event_type":      "product_view",
            "product_id":      product_id,
            "product_name":    product_name,
            "price":           price,
            "occurred_at":     now.isoformat(),
        },
        # Second-resolution key: repeat views within the same second collapse into one.
        # The key changes if this function is called again later, so retry the
        # cdp_append call itself rather than re-invoking this function.
        idempotency_key=f"pview:{email}:{product_id}:{int(now.timestamp())}",
    )
 
# Called when a user adds to cart
def track_add_to_cart(user_email: str, product_id: str, quantity: int, price: float):
    email = user_email.strip().lower()
    now = datetime.now(timezone.utc)
    cdp_append(
        object_name="product_events",
        record={
            "customer_email":  email,
            "event_type":      "add_to_cart",
            "product_id":      product_id,
            "quantity":        quantity,
            "line_total":      price * quantity,
            "occurred_at":     now.isoformat(),
        },
        # Same caveat as above: the key is only stable within one call
        idempotency_key=f"cart:{email}:{product_id}:{int(now.timestamp())}",
    )
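
Behavioral events have no source-system ID to build an idempotency key from, and a key based on the current time changes whenever the call is repeated later. One option, sketched here under assumptions, is to have the caller generate an event ID once per page view or cart action and pass it along on every retry. The `client_event_id` parameter and the `behavioral_event_key` helper are hypothetical and not part of the CDP API.

```python
import hashlib

def behavioral_event_key(event_type: str, email: str, product_id: str, client_event_id: str) -> str:
    """Build a key that is identical on every retry of the same logical event."""
    normalized_email = email.strip().lower()
    raw = f"{event_type}|{normalized_email}|{product_id}|{client_event_id}"
    # Hash the parts so the key has a fixed length and contains no raw email address.
    digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
    return f"{event_type}:{digest}"

first_attempt = behavioral_event_key("product_view", "A@Example.com ", "sku-1", "evt-123")
retry_attempt = behavioral_event_key("product_view", "a@example.com", "sku-1", "evt-123")
```

Both attempts produce the same key, so a retried request is treated as a duplicate, while a genuinely new view (a new `client_event_id`) gets a new key.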

Computing lifetime value

Lifetime value (ltv) should be a field on the profile. There are two ways to keep it current:

1. Trust the source system (preferred when available)

When the source system maintains a running total per customer (Shopify's total_spent, for example), sync that value to the profile on every relevant event, as the Shopify handlers above do. The value stays current and you don't need to aggregate anything yourself.

2. Periodic batch recalculation

If your commerce data is spread across multiple systems, run a nightly SQL query against your data warehouse and sync the result via Bulk File Import:

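-- In your warehouse.
-- If this table mirrors the append-only orders object, reduce it to the latest
-- row per order first; otherwise SUM(total) counts an order once per status change.
SELECT
    email,
    SUM(total) AS ltv,
    COUNT(*) AS order_count,
    MAX(placed_at) AS last_order_at
FROM orders
WHERE financial_status = 'paid'
GROUP BY email;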

Export this as CSV, then run a nightly import job to upsert into profiles.
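
The export step might look like the following sketch. It assumes the query result is available as a list of dicts and that the import job expects a header row whose column names match the profile fields; the exact file requirements of Bulk File Import are not covered in this guide, so treat the column layout as an assumption. The function name `ltv_rows_to_csv` is hypothetical.

```python
import csv
import io

CSV_FIELDS = ["email", "ltv", "order_count", "last_order_at"]

def ltv_rows_to_csv(rows: list) -> str:
    """Serialize aggregated LTV rows into CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=CSV_FIELDS, lineterminator="\n")
    writer.writeheader()
    for row in rows:
        writer.writerow({
            "email": row["email"].strip().lower(),   # same normalization as the webhooks
            "ltv": f"{row['ltv']:.2f}",              # fixed two-decimal money format
            "order_count": int(row["order_count"]),
            "last_order_at": row["last_order_at"],
        })
    return buffer.getvalue()

csv_text = ltv_rows_to_csv([
    {"email": "A@example.com", "ltv": 95.0, "order_count": 2,
     "last_order_at": "2024-05-01T10:00:00+00:00"},
])
```

Normalizing the email the same way as the webhook handlers keeps the nightly import matching the same profile rows that the real-time events update.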


Order schema design

The fields you store on the orders object determine which commerce audiences you can build. Recommended fields:

| Field | Type | Notes |
| --- | --- | --- |
| shopify_order_id | string | Source system identifier |
| customer_email | string | Join key to profiles |
| total | decimal | Order grand total |
| subtotal | decimal | Before discounts |
| discount_total | decimal | Total discount applied |
| currency | string | ISO 4217 (USD, EUR) |
| item_count | integer | Total units purchased |
| financial_status | string | paid, refunded, partially_refunded |
| fulfillment_status | string | fulfilled, partial, unfulfilled |
| placed_at | timestamp | ISO 8601 with tz |
| tags | string | Shopify order tags |

This schema supports audiences like:

  • "Customers who placed > 3 orders in the last 90 days"
  • "Customers with order value > $500 and fully paid"
  • "Customers who had a refund in the last 30 days"
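
To make the first audience concrete, the sketch below evaluates it over a list of order rows in plain Python. It illustrates which fields the rule depends on and is not the CDP's audience engine; `frequent_buyers` is a hypothetical helper, and it assumes `placed_at` is an ISO 8601 string with an explicit UTC offset.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

def frequent_buyers(order_rows: list, now: datetime, min_orders: int = 3, window_days: int = 90) -> set:
    """Return emails with more than min_orders distinct orders inside the window."""
    cutoff = now - timedelta(days=window_days)
    seen_order_ids = set()
    counts = Counter()
    for row in order_rows:
        order_id = row["shopify_order_id"]
        if order_id in seen_order_ids:
            continue  # a status-change row for an order that was already counted
        seen_order_ids.add(order_id)
        if datetime.fromisoformat(row["placed_at"]) >= cutoff:
            counts[row["customer_email"]] += 1
    return {email for email, n in counts.items() if n > min_orders}
```

Counting distinct `shopify_order_id` values matters because the orders object is append-only: an order that was later updated or refunded appears as several rows.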

Avoiding duplicate events on retries

Commerce events (payments, orders) are especially important to keep deduplicated — double-counting revenue corrupts analytics.

Always derive idempotency keys from the source system's immutable identifiers:

# Stable across any number of retries:
idempotency_key = f"shopify:order:{order_id}:created"         # append
idempotency_key = f"stripe:invoice:{invoice_id}:paid"         # append
idempotency_key = f"stripe:invoice:{invoice_id}:paid:profile" # upsert
 
# WRONG — will generate a new key on each retry:
idempotency_key = str(uuid.uuid4())
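
The effect of the two key styles can be seen in a small simulation. `DedupingSink` is a hypothetical in-memory stand-in that assumes the receiving side drops any write whose idempotency key it has already seen; it does not reproduce the CDP's actual behavior.

```python
import uuid

class DedupingSink:
    """Accepts a record only the first time its idempotency key is seen."""

    def __init__(self) -> None:
        self.seen_keys = set()
        self.records = []

    def append(self, record: dict, idempotency_key: str) -> bool:
        if idempotency_key in self.seen_keys:
            return False  # duplicate delivery, ignored
        self.seen_keys.add(idempotency_key)
        self.records.append(record)
        return True

def deliver_with_retries(sink: DedupingSink, record: dict, make_key, attempts: int = 3) -> None:
    """Simulate a webhook that is delivered several times."""
    for _ in range(attempts):
        sink.append(record, make_key())

order = {"shopify_order_id": "1001", "total": 40.0}

stable = DedupingSink()
deliver_with_retries(stable, order, lambda: "shopify:order:1001:created")

random_keys = DedupingSink()
deliver_with_retries(random_keys, order, lambda: str(uuid.uuid4()))
```

With the stable key the sink holds one record after three deliveries; with a fresh UUID per attempt it holds three, which is the revenue double-counting this section warns about.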
