Webhook Handler Integration
Receive events from Stripe, Shopify, Segment, HubSpot, or any other webhook source and write the relevant data into the CDP synchronously. This guide covers the full production pattern: signature verification, idempotency, selective field writes, retry handling, and returning correct HTTP status codes to your provider.
The core pattern
A webhook handler has three jobs:
- Verify the request came from your provider (not an attacker)
- Extract the relevant fields from the event payload
- Write to the CDP — then return 200 OK to the provider
import os, hmac, hashlib, time, requests
from fastapi import FastAPI, Request, HTTPException, Header
# FastAPI application on which the webhook endpoints in this guide are registered.
app = FastAPI()
# CDP API key, read from the environment; a missing variable raises KeyError at import time.
API_KEY = os.environ["EXPERITURE_API_KEY"]
# Base URL that the CDP record endpoint URLs are built from.
BASE_URL = "https://api.experiture.ai/public/v1"
def cdp_upsert(
    object_name: str,
    record: dict,
    match_key: str,
    idempotency_key: str,
    timeout: float = 10.0,
):
    """Upsert one record into a CDP object and return the decoded JSON response.

    Args:
        object_name: Object segment of the endpoint URL, e.g. ``"profiles"``.
        record: Field values sent as the ``record`` member of the JSON body.
        match_key: Field name sent as ``matchKey``.
        idempotency_key: Value sent in the ``Idempotency-Key`` request header.
        timeout: Seconds to wait for the CDP before giving up.

    Returns:
        The parsed JSON body of the CDP response.

    Raises:
        HTTPException: 503 when the CDP answers 429/500/502/503/504, cannot
            be reached, or does not answer within ``timeout``.
        requests.HTTPError: For any other unsuccessful response.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/records/{object_name}/upsert",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                "Idempotency-Key": idempotency_key,
            },
            json={"record": record, "matchKey": match_key},
            # Without an explicit timeout requests waits indefinitely, which
            # would stall the webhook handler on a hung connection.
            timeout=timeout,
        )
    except (requests.ConnectionError, requests.Timeout) as exc:
        # Network failures are transient too: report them the same way as a
        # retryable CDP status so the webhook provider redelivers the event.
        raise HTTPException(status_code=503, detail="upstream_retry") from exc

    if resp.status_code in (429, 500, 502, 503, 504):
        raise HTTPException(status_code=503, detail="upstream_retry")
    resp.raise_for_status()
    return resp.json()
def cdp_append(object_name: str, record: dict, idempotency_key: str):
resp = requests.post(
f"{BASE_URL}/records/{object_name}/append",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
"Idempotency-Key": idempotency_key,
},
json={"record": record},
)
if resp.status_code in (429, 500, 502, 503, 504):
raise HTTPException(status_code=503, detail="upstream_retry")
resp.raise_for_status()
return resp.json()
Return 200 fast. Most providers retry on anything other than a 2xx. If your handler is slow, you'll get duplicate deliveries. Write to the CDP synchronously, but keep the handler lean — don't do work that doesn't need to be in the critical path.
Stripe
Subscription events
import stripe
# Signing secret passed to stripe.Webhook.construct_event to verify deliveries.
# NOTE(review): "whsec_..." is a placeholder — presumably the real value should
# come from configuration (as API_KEY does) rather than be hard-coded; confirm.
STRIPE_WEBHOOK_SECRET = "whsec_..."
@app.post("/webhooks/stripe")
async def handle_stripe(request: Request, stripe_signature: str | None = Header(None)):
    """Verify a Stripe webhook delivery and write the relevant fields to the CDP.

    ``customer.subscription.updated`` events upsert a ``profiles`` record
    matched on ``email``; ``invoice.payment_succeeded`` events append a
    ``payments`` record. Any other event type is acknowledged without a write.
    The Stripe event id is the idempotency key for both writes.

    Args:
        request: Incoming request; its raw body is what gets verified.
        stripe_signature: Value of the ``Stripe-Signature`` header, or None
            when the header is absent.

    Returns:
        ``{"ok": True}`` once the event has been handled.

    Raises:
        HTTPException: 400 when the signature header is missing, the payload
            cannot be parsed, or the signature does not verify.
    """
    body = await request.body()

    # Header(None) yields None when the header is absent; reject that here
    # instead of handing None to the Stripe library.
    if not stripe_signature:
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Verify signature
    try:
        event = stripe.Webhook.construct_event(body, stripe_signature, STRIPE_WEBHOOK_SECRET)
    except ValueError:
        # Raised when the body is not a parseable event payload.
        raise HTTPException(status_code=400, detail="Invalid payload")
    except stripe.error.SignatureVerificationError:
        raise HTTPException(status_code=400, detail="Invalid signature")

    event_type = event["type"]
    event_id = event["id"]

    if event_type == "customer.subscription.updated":
        sub = event["data"]["object"]
        # `plan` may be missing or null (presumably on subscriptions with
        # several items — confirm against the Stripe API). Tolerate that
        # rather than raising, which would become a 500 and trigger retries.
        plan = sub.get("plan") or {}
        cdp_upsert(
            object_name="profiles",
            record={
                # fetch_stripe_customer_email is not defined in this guide;
                # presumably it looks the customer up by id — confirm.
                "email": sub.get("customer_email") or fetch_stripe_customer_email(sub["customer"]),
                "subscription_status": sub["status"],
                "subscription_plan": plan.get("nickname"),
                # NOTE(review): current_period_end is the end of the billing
                # period, not the time of this change; the event's own
                # creation time may be the better freshness marker — confirm.
                "subscription_updated_at": _ts(sub["current_period_end"]),
                "trial_end_at": _ts(sub.get("trial_end") or None),
            },
            match_key="email",
            idempotency_key=event_id,
        )
    elif event_type == "invoice.payment_succeeded":
        invoice = event["data"]["object"]
        cdp_append(
            object_name="payments",
            record={
                "stripe_invoice_id": invoice["id"],
                "customer_email": invoice["customer_email"],
                # NOTE(review): dividing by 100 assumes a two-decimal
                # currency; zero-decimal currencies such as JPY would be
                # understated — confirm which currencies are in use.
                "amount_paid": invoice["amount_paid"] / 100,
                "currency": invoice["currency"].upper(),
                "paid_at": _ts(invoice["status_transitions"]["paid_at"]),
            },
            idempotency_key=event_id,
        )

    return {"ok": True}
def _ts(unix: int | None) -> str | None:
if unix is None:
return None
from datetime import datetime, timezone
return datetime.fromtimestamp(unix, tz=timezone.utc).isoformat()
Key rules for Stripe
- Use event["id"] as the idempotency key — Stripe guarantees it's stable across retries
- Stripe may send the same event more than once; idempotency prevents double-writes
- Return 503 (not 500) for transient CDP errors — most providers retry on 5xx, which is what you want
Shopify
Order and customer events
import hashlib, hmac, base64
# Shared secret used as the HMAC key when verifying Shopify deliveries.
# NOTE(review): placeholder value — presumably the real secret should come
# from configuration rather than be hard-coded; confirm.
SHOPIFY_WEBHOOK_SECRET = "your_shopify_secret"
def verify_shopify_signature(body: bytes, hmac_header: str) -> bool:
    """Return True when ``hmac_header`` is the valid signature for ``body``.

    The expected value is the base64-encoded HMAC-SHA256 of the raw request
    body, keyed with ``SHOPIFY_WEBHOOK_SECRET``.

    Args:
        body: Raw request body, exactly as received.
        hmac_header: Signature header value; None or empty when absent.

    Returns:
        True on a match, False otherwise (including a missing header).
    """
    # A request without the header can never be authentic.
    if not hmac_header:
        return False
    digest = hmac.new(
        SHOPIFY_WEBHOOK_SECRET.encode("utf-8"),
        body,
        hashlib.sha256,
    ).digest()
    expected = base64.b64encode(digest)
    # Constant-time comparison, so the response time does not reveal how many
    # leading characters of a forged signature were correct. Comparing bytes
    # also avoids the TypeError compare_digest raises for non-ASCII str input.
    return hmac.compare_digest(expected, hmac_header.encode("utf-8"))
@app.post("/webhooks/shopify/orders")
async def handle_shopify_order(
request: Request,
x_shopify_hmac_sha256: str = Header(None),
x_shopify_order_id: str = Header(None),
):
body = await request.body()
if not verify_shopify_signature(body, x_shopify_hmac_sha256):
raise HTTPException(status_code=401, detail="Invalid HMAC")
import json
order = json.loads(body)
cdp_upsert(
object_name="profiles",
record={
"email": order["email"].strip().lower(),
"first_name": order["billing_address"]["first_name"],
"last_name": order["billing_address"]["last_name"],
"order_count": order["customer"]["orders_count"],
"ltv": float(order["customer"]["total_spent"]),
"last_order_at": order["created_at"],
},
match_key="email",
idempotency_key=f"shopify:order:{order['id']}:profile",
)
cdp_append(
object_name="orders",
record={
"shopify_order_id": str(order["id"]),
"customer_email": order["email"].strip().lower(),
"total": float(order["total_price"]),
"currency": order["currency"],
"line_item_count": len(order["line_items"]),
"placed_at": order["created_at"],
},
idempotency_key=f"shopify:order:{order['id']}:event",
)
return {"ok": True}
Segment (Personas)
If you're receiving Segment track or identify calls forwarded to your endpoint:
import hashlib, hmac
# Shared secret used as the HMAC key when verifying Segment deliveries.
# NOTE(review): placeholder value — presumably the real secret should come
# from configuration rather than be hard-coded; confirm.
SEGMENT_SHARED_SECRET = "your_segment_secret"
def verify_segment_signature(body: bytes, signature: str) -> bool:
    """Return True when ``signature`` is the valid signature for ``body``.

    The expected value is the hex-encoded HMAC-SHA1 of the raw request body,
    keyed with ``SEGMENT_SHARED_SECRET``.

    Args:
        body: Raw request body, exactly as received.
        signature: Signature header value; None or empty when absent.

    Returns:
        True on a match, False otherwise (including a missing header).
    """
    # hmac.compare_digest raises TypeError for None, so a request without the
    # header has to be rejected before the comparison.
    if not signature:
        return False
    expected = hmac.new(SEGMENT_SHARED_SECRET.encode(), body, hashlib.sha1).hexdigest()
    # Compare as bytes: compare_digest only accepts str arguments that are
    # pure ASCII, and the header value is attacker-controlled.
    return hmac.compare_digest(expected.encode(), signature.encode())
@app.post("/webhooks/segment")
async def handle_segment(request: Request, x_signature: str | None = Header(None)):
    """Verify a Segment webhook delivery and write it to the CDP.

    ``identify`` calls upsert a ``profiles`` record matched on ``email``;
    ``track`` calls append an ``events`` record. Any other call type is
    acknowledged without a write. The Segment ``messageId`` is the
    idempotency key for both writes.

    Args:
        request: Incoming request; its raw body is what gets verified.
        x_signature: Value of the ``X-Signature`` header, or None when the
            header is absent.

    Returns:
        ``{"ok": True}`` once the event has been handled.

    Raises:
        HTTPException: 401 when the signature is missing or does not verify.
    """
    body = await request.body()
    # Header(None) yields None when the header is absent. Reject that before
    # verification: hmac.compare_digest raises TypeError on None, which would
    # surface as a 500 and make the sender retry.
    if not x_signature or not verify_segment_signature(body, x_signature):
        raise HTTPException(status_code=401)

    import json
    event = json.loads(body)
    event_type = event.get("type")
    message_id = event["messageId"]

    if event_type == "identify":
        traits = event.get("traits", {})
        # NOTE(review): falls back to userId when no email trait is present —
        # presumably userId is an email address in this setup; confirm.
        record = {"email": traits.get("email") or event.get("userId")}
        # Copy only the traits that are present, renamed to snake_case.
        record.update(
            {
                _camel_to_snake(field): traits[field]
                for field in ("firstName", "lastName", "phone", "company", "plan")
                if field in traits
            }
        )
        cdp_upsert(
            object_name="profiles",
            record=record,
            match_key="email",
            idempotency_key=message_id,
        )
    elif event_type == "track":
        cdp_append(
            object_name="events",
            record={
                "event_name": event["event"],
                "user_email": event.get("email") or event.get("userId"),
                # Properties are stored as a JSON string, not a nested object.
                "properties": json.dumps(event.get("properties", {})),
                "occurred_at": event["timestamp"],
            },
            idempotency_key=message_id,
        )

    return {"ok": True}
def _camel_to_snake(name: str) -> str:
import re
return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
Idempotency keys for webhooks
Always use the provider's event ID as your idempotency key. This table summarizes where to find it per provider:
| Provider | Idempotency key location |
|---|---|
| Stripe | event["id"] — e.g. evt_1PXyz... |
| Shopify | Construct from order/customer ID: shopify:order:{id} |
| Segment | event["messageId"] |
| HubSpot | event["eventId"] in webhook payload |
| SendGrid | event["sg_message_id"] per event object |
| Twilio | MessageSid from form-encoded body |
If your provider doesn't supply a stable event ID, derive one from the payload content:
import hashlib, json
def derive_idempotency_key(event: dict) -> str:
canonical = json.dumps(event, sort_keys=True)
return hashlib.sha256(canonical.encode()).hexdigest()[:32]
Returning the right status codes
Your HTTP response controls whether the provider retries:
| Your response | Provider behavior |
|---|---|
| 200 / 201 | Delivery confirmed — no retry |
| 4xx (except 429) | Delivery failed permanently — provider stops retrying (varies by provider) |
| 429 | Rate limited — provider backs off and retries |
| 5xx | Transient failure — provider retries |
Map CDP errors to the right status:
resp = requests.post(
f"{BASE_URL}/records/profiles/upsert",
headers={"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"},
json={"record": record, "matchKey": "email"},
)
if resp.status_code in (429, 500, 502, 503, 504):
# Transient — tell provider to retry
raise HTTPException(status_code=503, detail="upstream_unavailable")
elif resp.status_code == 422:
# Schema error — data is bad, don't retry (log and alert)
logger.error("Schema mismatch writing event %s: %s", event_id, resp.text)
return {"ok": True} # Return 200 to prevent retry storm on bad data
elif not resp.ok:
raise HTTPException(status_code=400, detail=resp.text)
For 422 CDP_ETL.VALIDATION.REQUEST_SCHEMA errors: return 200 anyway. The data is malformed and retrying won't fix it. Instead, log the error and investigate your mapping. Returning 500 on schema errors causes infinite retry storms.
Handling out-of-order delivery
Webhooks are not guaranteed to arrive in chronological order. For example, Stripe may deliver a customer.subscription.updated event with status: active after one with status: canceled, if the two events were generated close together.
For time-sensitive fields, include a *_updated_at timestamp alongside the value:
cdp_upsert(
object_name="profiles",
record={
"email": customer_email,
"subscription_status": sub["status"],
"subscription_updated_at": _ts(sub["current_period_end"]),
},
match_key="email",
idempotency_key=event_id,
)
Your campaign rules can then filter on subscription_updated_at > 2026-04-01 to ensure they're using fresh data.
Local testing with ngrok
To test your webhook handler locally before deploying:
# Expose local port 8000 to the internet
ngrok http 8000
# Your public URL e.g. https://abc123.ngrok.io/webhooks/stripe
# Register this URL in the Stripe dashboard as your webhook endpoint
Use the ngrok web inspection interface (at localhost:4040) to replay recent events and debug responses without triggering new events in your provider.
See Also
- Real-time Record Writes — full upsert mechanics, retry logic
- E-Commerce Events — Shopify/Stripe event schema patterns
- Error Handling & Retries — production-grade retry and failure handling
- Authentication — token scopes for webhook sink services