Webhook Handler Integration

Receive events from Stripe, Shopify, Segment, HubSpot, or any other webhook source and write the relevant data into the CDP synchronously. This guide covers the full production pattern: signature verification, idempotency, selective field writes, retry handling, and returning correct HTTP status codes to your provider.


The core pattern

A webhook handler has three jobs:

  1. Verify the request came from your provider (not an attacker)
  2. Extract the relevant fields from the event payload
  3. Write to the CDP — then return 200 OK to the provider

import os, hmac, hashlib, time, requests
from fastapi import FastAPI, Request, HTTPException, Header

app = FastAPI()
API_KEY = os.environ["EXPERITURE_API_KEY"]
BASE_URL = "https://api.experiture.ai/public/v1"
RETRYABLE_STATUSES = (429, 500, 502, 503, 504)

def _cdp_post(path: str, payload: dict, idempotency_key: str) -> dict:
    # Shared POST helper used by both write functions below.
    try:
        resp = requests.post(
            f"{BASE_URL}{path}",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                "Idempotency-Key": idempotency_key,
            },
            json=payload,
            timeout=10,  # illustrative value; without a timeout a stalled call blocks the handler
        )
    except requests.RequestException:
        # Network failure or timeout: transient, so ask the provider to retry
        raise HTTPException(status_code=503, detail="upstream_retry")
    if resp.status_code in RETRYABLE_STATUSES:
        raise HTTPException(status_code=503, detail="upstream_retry")
    resp.raise_for_status()
    return resp.json()

def cdp_upsert(object_name: str, record: dict, match_key: str, idempotency_key: str):
    # Create or update the record that matches match_key
    payload = {"record": record, "matchKey": match_key}
    return _cdp_post(f"/records/{object_name}/upsert", payload, idempotency_key)

def cdp_append(object_name: str, record: dict, idempotency_key: str):
    # Add a new record (for example a payment or an event)
    return _cdp_post(f"/records/{object_name}/append", {"record": record}, idempotency_key)

Return 200 fast. Most providers retry on any non-2xx response, and they also treat a slow response as a failure: if your handler exceeds the provider's timeout, the event is redelivered and you receive duplicates. Write to the CDP synchronously, but keep the handler lean and move any work that doesn't need to be in the critical path out of it.


Stripe

Subscription events

import stripe
 
STRIPE_WEBHOOK_SECRET = "whsec_..."
 
@app.post("/webhooks/stripe")
async def handle_stripe(request: Request, stripe_signature: str = Header(None)):
    body = await request.body()
 
    # Verify signature (construct_event also raises ValueError on a malformed payload)
    try:
        event = stripe.Webhook.construct_event(body, stripe_signature, STRIPE_WEBHOOK_SECRET)
    except (ValueError, stripe.error.SignatureVerificationError):
        raise HTTPException(status_code=400, detail="Invalid payload or signature")
 
    event_type = event["type"]
    event_id = event["id"]
 
    if event_type == "customer.subscription.updated":
        sub = event["data"]["object"]
        cdp_upsert(
            object_name="profiles",
            record={
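                # fetch_stripe_customer_email is a helper you supply (not shown here) that
                # looks up the email from the customer ID on the subscription
                "email": sub.get("customer_email") or fetch_stripe_customer_email(sub["customer"]),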
                "subscription_status": sub["status"],
                "subscription_plan": sub["plan"]["nickname"],
                "subscription_updated_at": _ts(event["created"]),  # when Stripe generated the event
                "trial_end_at": _ts(sub.get("trial_end")) if sub.get("trial_end") else None,
            },
            match_key="email",
            idempotency_key=event_id,
        )
 
    elif event_type == "invoice.payment_succeeded":
        invoice = event["data"]["object"]
        cdp_append(
            object_name="payments",
            record={
                "stripe_invoice_id": invoice["id"],
                "customer_email":   invoice["customer_email"],
                "amount_paid":      invoice["amount_paid"] / 100,
                "currency":         invoice["currency"].upper(),
                "paid_at":          _ts(invoice["status_transitions"]["paid_at"]),
            },
            idempotency_key=event_id,
        )
 
    return {"ok": True}
 
def _ts(unix: int | None) -> str | None:
    if unix is None:
        return None
    from datetime import datetime, timezone
    return datetime.fromtimestamp(unix, tz=timezone.utc).isoformat()

Key rules for Stripe

  • Use event["id"] as the idempotency key — Stripe guarantees it's stable across retries
  • Stripe may send the same event more than once; idempotency prevents double-writes
  • Return 503 (not 500) for transient CDP errors — most providers retry on 5xx, which is what you want

Shopify

Order and customer events

import hashlib, hmac, base64
 
SHOPIFY_WEBHOOK_SECRET = "your_shopify_secret"
 
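def verify_shopify_signature(body: bytes, hmac_header: str | None) -> bool:
    if not hmac_header:
        return False  # header missing: reject instead of raising
    digest = hmac.new(
        SHOPIFY_WEBHOOK_SECRET.encode("utf-8"),
        body,
        hashlib.sha256,
    ).digest()
    # Constant-time comparison avoids leaking information through timing
    return hmac.compare_digest(base64.b64encode(digest), hmac_header.encode("utf-8"))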
 
@app.post("/webhooks/shopify/orders")
async def handle_shopify_order(
    request: Request,
    x_shopify_hmac_sha256: str = Header(None),
    x_shopify_order_id: str = Header(None),
):
    body = await request.body()
    if not verify_shopify_signature(body, x_shopify_hmac_sha256):
        raise HTTPException(status_code=401, detail="Invalid HMAC")
 
    import json
    order = json.loads(body)
 
    cdp_upsert(
        object_name="profiles",
        record={
            "email":          order["email"].strip().lower(),
            "first_name":     order["billing_address"]["first_name"],
            "last_name":      order["billing_address"]["last_name"],
            "order_count":    order["customer"]["orders_count"],
            "ltv":            float(order["customer"]["total_spent"]),
            "last_order_at":  order["created_at"],
        },
        match_key="email",
        idempotency_key=f"shopify:order:{order['id']}:profile",
    )
 
    cdp_append(
        object_name="orders",
        record={
            "shopify_order_id": str(order["id"]),
            "customer_email":   order["email"].strip().lower(),
            "total":            float(order["total_price"]),
            "currency":         order["currency"],
            "line_item_count":  len(order["line_items"]),
            "placed_at":        order["created_at"],
        },
        idempotency_key=f"shopify:order:{order['id']}:event",
    )
 
    return {"ok": True}

Segment (Personas)

If you're receiving Segment track or identify calls forwarded to your endpoint:

import hashlib, hmac
 
SEGMENT_SHARED_SECRET = "your_segment_secret"
 
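def verify_segment_signature(body: bytes, signature: str | None) -> bool:
    if not signature:
        return False  # header missing: reject instead of raising TypeError
    expected = hmac.new(SEGMENT_SHARED_SECRET.encode(), body, hashlib.sha1).hexdigest()
    return hmac.compare_digest(expected.encode(), signature.encode())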
 
@app.post("/webhooks/segment")
async def handle_segment(request: Request, x_signature: str = Header(None)):
    body = await request.body()
    if not verify_segment_signature(body, x_signature):
        raise HTTPException(status_code=401)
 
    import json
    event = json.loads(body)
    event_type = event.get("type")
    message_id = event["messageId"]
 
    if event_type == "identify":
        traits = event.get("traits", {})
        record = {"email": traits.get("email") or event.get("userId")}
        for field in ("firstName", "lastName", "phone", "company", "plan"):
            if field in traits:
                snake = _camel_to_snake(field)
                record[snake] = traits[field]
        cdp_upsert(
            object_name="profiles",
            record=record,
            match_key="email",
            idempotency_key=message_id,
        )
 
    elif event_type == "track":
        cdp_append(
            object_name="events",
            record={
                "event_name":   event["event"],
                "user_email":   event.get("email") or event.get("userId"),
                "properties":   json.dumps(event.get("properties", {})),
                "occurred_at":  event["timestamp"],
            },
            idempotency_key=message_id,
        )
 
    return {"ok": True}
 
def _camel_to_snake(name: str) -> str:
    import re
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

Idempotency keys for webhooks

Always use the provider's event ID as your idempotency key. This table summarizes where to find it per provider:

| Provider | Idempotency key location |
|---|---|
| Stripe | event["id"], e.g. evt_1PXyz... |
| Shopify | Construct from order/customer ID: shopify:order:{id} |
| Segment | event["messageId"] |
| HubSpot | event["eventId"] in webhook payload |
| SendGrid | event["sg_event_id"] per event object (sg_message_id identifies the message, not the individual event) |
| Twilio | MessageSid from form-encoded body |

If your provider doesn't supply a stable event ID, derive one from the payload content:

import hashlib, json
 
def derive_idempotency_key(event: dict) -> str:
    canonical = json.dumps(event, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:32]
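
A content hash is only stable if the hashed content is identical on every redelivery. As a hedged variation on the function above, the sketch below drops fields that might change between delivery attempts before hashing and adds a prefix to keep derived keys apart from provider-issued ones. The field names in VOLATILE_FIELDS are hypothetical; inspect your provider's payloads to see which fields, if any, vary.

```python
import hashlib
import json

# Hypothetical top-level fields that could differ between delivery attempts.
VOLATILE_FIELDS = frozenset({"delivered_at", "attempt"})


def derive_stable_key(event: dict, prefix: str = "derived") -> str:
    """Hash the event's stable content into a namespaced idempotency key."""
    stable = {k: v for k, v in event.items() if k not in VOLATILE_FIELDS}
    # sort_keys and fixed separators make the serialization deterministic.
    canonical = json.dumps(stable, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32]
    return f"{prefix}:{digest}"
```

Two deliveries of the same event then map to the same key even if the attempt counter or key order differs, while any change to a stable field produces a different key.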

Returning the right status codes

Your HTTP response controls whether the provider retries:

| Your response | Provider behavior |
|---|---|
| 200 / 201 | Delivery confirmed; no retry |
| 4xx (except 429) | Delivery failed permanently; provider stops retrying (varies by provider) |
| 429 | Rate limited; provider backs off and retries |
| 5xx | Transient failure; provider retries |

Map CDP errors to the right status:

resp = requests.post(
    f"{BASE_URL}/records/profiles/upsert",
    headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
    json={"record": record, "matchKey": "email"},
)
if resp.status_code in (429, 500, 502, 503, 504):
    # Transient — tell provider to retry
    raise HTTPException(status_code=503, detail="upstream_unavailable")
elif resp.status_code == 422:
    # Schema error — data is bad, don't retry (log and alert)
    logger.error("Schema mismatch writing event %s: %s", event_id, resp.text)
    return {"ok": True}  # Return 200 to prevent retry storm on bad data
elif not resp.ok:
    raise HTTPException(status_code=400, detail=resp.text)

For 422 CDP_ETL.VALIDATION.REQUEST_SCHEMA errors: return 200 anyway. The data is malformed and retrying won't fix it. Instead, log the error and investigate your mapping. Returning 500 on schema errors makes the provider redeliver a payload that can never succeed until its retry schedule is exhausted.
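
The status mapping described in this section can also be pulled into a small pure function, which makes the policy easy to unit-test without a live CDP. This is a sketch of the same rules; the function name is an assumption, and treating every other non-2xx response as a 400 follows this guide's own snippet rather than a documented CDP contract.

```python
RETRYABLE_CDP_STATUSES = frozenset({429, 500, 502, 503, 504})


def webhook_status_for(cdp_status: int) -> int:
    """Map a CDP response status to the status the webhook handler returns."""
    if 200 <= cdp_status < 300:
        return 200  # write succeeded: acknowledge delivery
    if cdp_status in RETRYABLE_CDP_STATUSES:
        return 503  # transient: ask the provider to retry
    if cdp_status == 422:
        return 200  # bad data: acknowledge, then log and alert
    return 400      # any other error: report a permanent failure
```

Keeping the decision separate from the HTTP call means the 422 case (acknowledge but log) can be covered by a test, which is the branch most likely to be changed by mistake.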


Handling out-of-order delivery

Webhooks are not guaranteed to arrive in chronological order. If two events are generated close together, Stripe may deliver a customer.subscription.updated event with status: active after the one with status: canceled, even though the cancellation happened later.

For time-sensitive fields, include a *_updated_at timestamp alongside the value:

cdp_upsert(
    object_name="profiles",
    record={
        "email": customer_email,
        "subscription_status": sub["status"],
        "subscription_updated_at": _ts(event["created"]),  # event creation time, not the billing period end
    },
    match_key="email",
    idempotency_key=event_id,
)

Your campaign rules can then filter on subscription_updated_at > 2026-04-01 to ensure they're using fresh data.
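
If stale events must never overwrite newer state, one option is to compare timestamps before writing. The sketch below assumes a hypothetical in-memory store (last_seen) keyed by email and uses the event's creation time as the ordering signal. A real deployment would need a shared, persistent store, and this guide does not state whether the CDP can enforce such a condition itself.

```python
from datetime import datetime, timezone

# Hypothetical store: email -> creation time of the newest event applied so far.
last_seen: dict[str, datetime] = {}


def should_apply(email: str, event_created: int) -> bool:
    """Return True only if this event is newer than the last one applied."""
    incoming = datetime.fromtimestamp(event_created, tz=timezone.utc)
    previous = last_seen.get(email)
    if previous is not None and incoming <= previous:
        return False  # stale (or same-second) event: skip the write
    last_seen[email] = incoming
    return True
```

In a handler, you would call should_apply(customer_email, event["created"]) before cdp_upsert and return 200 either way, since a skipped stale event is still a successful delivery. Stripe's created field has one-second resolution, so two events in the same second tie; this sketch treats the second one as stale, which may or may not suit your data.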


Local testing with ngrok

To test your webhook handler locally before deploying:

# Expose local port 8000 to the internet
ngrok http 8000
 
# Your public URL e.g. https://abc123.ngrok.io/webhooks/stripe
# Register this URL in the Stripe dashboard as your webhook endpoint

Use the ngrok web inspection interface (at http://localhost:4040) to replay recent requests and debug responses without triggering new events in your provider.
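
You can also exercise signature verification without waiting for a real event by signing a test payload yourself. The sketch below builds the header value that the Shopify verifier in this guide expects (base64 of an HMAC-SHA256 over the raw body). The function name, secret, and payload are placeholders for local testing.

```python
import base64
import hashlib
import hmac
import json


def sign_shopify_payload(body: bytes, secret: str) -> str:
    """Return the value to send in the X-Shopify-Hmac-Sha256 header."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


# Placeholder payload and secret for local testing only.
test_body = json.dumps({"id": 1001, "email": "test@example.com"}).encode()
test_header = sign_shopify_payload(test_body, "your_shopify_secret")
```

Send test_body unchanged as the request body, with test_header in the X-Shopify-Hmac-Sha256 header, to your local /webhooks/shopify/orders endpoint. The handler in this guide reads further order fields (billing_address, customer, line_items, and so on), so extend the payload before expecting a 200.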


See Also