Temps réel et événements#

Le temps réel en APIs recouvre des réalités très différentes : un chat qui affiche les messages en quelques centaines de millisecondes, un tableau de bord qui se rafraîchit toutes les secondes, un système de paiement qui notifie un partenaire d’une transaction. Chacun de ces cas appelle une technologie différente. Ce chapitre couvre le panorama complet — polling, SSE, WebSocket, webhooks — et approfondit la conception robuste : delivery guarantees, signature HMAC, retry avec backoff, et debugging.

Panorama des options#

Polling#

La forme la plus simple : le client interroge périodiquement le serveur.

Client → GET /api/notifications → Serveur
(attendre 5 secondes)
Client → GET /api/notifications → Serveur
(attendre 5 secondes)
...

Avantages : aucun état persistant côté serveur, compatible avec tous les proxies et firewalls, simple à implémenter et déboguer.

Inconvénients : latence de mise à jour = intervalle de polling. Charge serveur proportionnelle au nombre de clients × fréquence, même quand il n’y a rien de nouveau.

Quand l’utiliser : données qui changent rarement et où une latence de plusieurs secondes est acceptable. Dashboards internes, statuts de jobs.

Long Polling#

Le client envoie une requête que le serveur maintient ouverte jusqu’à ce qu’une mise à jour soit disponible ou qu’un timeout soit atteint.

Client → GET /api/notifications?timeout=30 → Serveur
(serveur bloque jusqu'à un événement ou 30s)
Serveur → 200 {"event": "message", "data": {...}} → Client
Client → GET /api/notifications?timeout=30 → Serveur (immédiatement)

Avantages : latence proche du temps réel. Compatible avec HTTP/1.1 et les proxies standards.

Inconvénients : maintient une connexion TCP par client. Complexité de gestion des timeouts et reconnexions. Les requêtes fréquentes génèrent du trafic HTTP overhead.

Server-Sent Events (SSE)#

Une connexion HTTP persistante sur laquelle le serveur envoie des événements en texte (text/event-stream). Le client utilise l’API EventSource.

Client → GET /api/stream → Serveur (connexion persistante)
Serveur → data: {"type":"message","text":"..."}\n\n → Client
Serveur → data: {"type":"presence","users":5}\n\n → Client

Avantages : protocole standard HTTP, reconnexion automatique native (EventSource), simple côté serveur, compatible avec les proxies HTTP.

Inconvénients : unidirectionnel (serveur → client uniquement). Limité à 6 connexions simultanées par domaine en HTTP/1.1 (levé avec HTTP/2). Pas de support binaire natif.

WebSocket#

Protocole bidirectionnel basé sur une connexion TCP persistante, initialisée par un HTTP Upgrade.

Client → HTTP GET (Upgrade: websocket) → Serveur
Serveur → 101 Switching Protocols → Client
(canal bidirectionnel ouvert)
Client ↔ frames binaires ou texte ↔ Serveur

Avantages : bidirectionnel, faible overhead par message (2-14 octets de header), support binaire, latence minimale.

Inconvénients : état persistant côté serveur (scalabilité horizontale nécessite un broker partagé : Redis, etc.). Plus complexe à déboguer. Certains proxies corporate bloquent les WebSockets.

WebRTC Data Channel#

Canal de données pair-à-pair basé sur SCTP, utilisé principalement pour les communications navigateur-à-navigateur.

Quand l’utiliser : partage de fichiers P2P, jeux en temps réel, visioconférence avec données annexes. Rarement approprié pour des APIs serveur-client.

Tableau récapitulatif#

Technologie

Direction

Latence

Complexité

Scalabilité

Cas d’usage

Polling

C → S

Secondes

Très faible

Excellente

Statut de jobs

Long Polling

S → C

~100ms

Faible

Bonne

Notifications simples

SSE

S → C

~50ms

Faible

Bonne

Live feed, dashboard

WebSocket

Bidirectionnel

~10ms

Moyenne

Complexe

Chat, collaboration

WebRTC

P2P

~5ms

Haute

N/A

Vidéo, gaming

WebSocket : patterns applicatifs#

Rooms et channels#

Le pattern rooms (ou channels) organise les connexions WebSocket par sujet. Chaque client s’abonne à des rooms ; les messages sont diffusés aux abonnés de la room concernée.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set

app = FastAPI()

class RoomManager:
    def __init__(self):
        # room_id → ensemble de connexions WebSocket actives
        self.rooms: Dict[str, Set[WebSocket]] = {}

    async def join(self, room_id: str, websocket: WebSocket):
        await websocket.accept()
        if room_id not in self.rooms:
            self.rooms[room_id] = set()
        self.rooms[room_id].add(websocket)

    async def leave(self, room_id: str, websocket: WebSocket):
        if room_id in self.rooms:
            self.rooms[room_id].discard(websocket)
            if not self.rooms[room_id]:
                del self.rooms[room_id]

    async def broadcast(self, room_id: str, message: dict, exclude: WebSocket = None):
        if room_id not in self.rooms:
            return
        dead = set()
        for ws in self.rooms[room_id]:
            if ws == exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                dead.add(ws)
        # Nettoyer les connexions mortes
        self.rooms[room_id] -= dead

    def presence(self, room_id: str) -> int:
        return len(self.rooms.get(room_id, set()))

manager = RoomManager()

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    await manager.join(room_id, websocket)
    await manager.broadcast(room_id, {
        "type": "presence",
        "count": manager.presence(room_id)
    })
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast(room_id, {
                "type": "message",
                "text": data.get("text", ""),
                "room": room_id,
            }, exclude=websocket)
    except WebSocketDisconnect:
        await manager.leave(room_id, websocket)
        await manager.broadcast(room_id, {
            "type": "presence",
            "count": manager.presence(room_id)
        })

Heartbeat et reconnexion automatique#

Les connexions WebSocket peuvent être coupées silencieusement (timeout NAT, pare-feu, réseau mobile). Le heartbeat détecte ces coupures :

import asyncio
import json

@app.websocket("/ws/{room_id}")
async def websocket_with_heartbeat(websocket: WebSocket, room_id: str):
    await manager.join(room_id, websocket)

    async def heartbeat():
        while True:
            await asyncio.sleep(30)
            try:
                await websocket.send_json({"type": "ping"})
            except Exception:
                break

    heartbeat_task = asyncio.create_task(heartbeat())
    try:
        while True:
            data = await websocket.receive_json()
            if data.get("type") == "pong":
                continue  # réponse au heartbeat
            await manager.broadcast(room_id, data, exclude=websocket)
    except WebSocketDisconnect:
        heartbeat_task.cancel()
        await manager.leave(room_id, websocket)

Côté client JavaScript, la reconnexion automatique est implémentée avec un backoff exponentiel :

class ReconnectingWebSocket {
  constructor(url, options = {}) {
    this.url = url;
    this.maxDelay = options.maxDelay || 30000;
    this.attempts = 0;
    this.connect();
  }

  connect() {
    this.ws = new WebSocket(this.url);
    this.ws.onopen = () => { this.attempts = 0; };
    this.ws.onclose = () => {
      const delay = Math.min(1000 * 2 ** this.attempts, this.maxDelay);
      this.attempts++;
      setTimeout(() => this.connect(), delay);
    };
    this.ws.onmessage = (event) => this.onmessage?.(JSON.parse(event.data));
  }
}

Server-Sent Events#

Format du flux#

Le format text/event-stream est simple : chaque événement est une suite de lignes clé:valeur, séparées par une ligne vide.

id: 42\n
event: message\n
data: {"text": "Bonjour", "user": "Alice"}\n
\n
id: 43\n
event: presence\n
data: {"count": 5}\n
retry: 3000\n
\n

Champs disponibles :

  • data : contenu de l’événement (obligatoire, peut être multiligne)

  • event : type d’événement personnalisé

  • id : identifiant pour la reprise (Last-Event-ID)

  • retry : délai de reconnexion en ms que le client doit utiliser

Implémentation FastAPI#

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def event_generator(request: Request, channel: str):
    last_id = request.headers.get("Last-Event-ID")
    event_id = int(last_id) + 1 if last_id else 1

    # Si last_id est fourni, rejouer les événements manqués depuis le store
    if last_id:
        missed = await get_events_since(channel, int(last_id))
        for event in missed:
            yield f"id: {event['id']}\ndata: {json.dumps(event['data'])}\n\n"

    pubsub = get_pubsub()
    async with pubsub.subscribe(f"channel:{channel}") as sub:
        async for message in sub:
            if await request.is_disconnected():
                break
            yield f"id: {event_id}\nevent: {message['type']}\ndata: {json.dumps(message['data'])}\n\n"
            event_id += 1

@app.get("/sse/{channel}")
async def sse_endpoint(channel: str, request: Request):
    return StreamingResponse(
        event_generator(request, channel),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # désactiver le buffering nginx
        }
    )

Reconnexion automatique et Last-Event-ID#

L’API EventSource du navigateur gère automatiquement la reconnexion. Elle renvoie le Last-Event-ID du dernier événement reçu dans les en-têtes de la nouvelle connexion :

const source = new EventSource('/sse/general');

source.onopen = () => console.log('Connexion SSE établie');
source.onerror = (e) => console.error('Erreur SSE, reconnexion automatique...');

source.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  console.log('Message:', data);
});

source.addEventListener('presence', (event) => {
  const data = JSON.parse(event.data);
  console.log(`Utilisateurs connectés : ${data.count}`);
});

Limitations#

SSE et HTTP/2

En HTTP/1.1, les navigateurs limitent à 6 connexions simultanées par domaine. Si l’utilisateur ouvre plusieurs onglets avec des connexions SSE, ils se concurrencent. HTTP/2 lève cette limite grâce au multiplexing — toutes les connexions SSE d’un même domaine partagent une seule connexion TCP.

Webhooks#

Définition et modèle de delivery#

Un webhook est un mécanisme de callback HTTP : au lieu que le client interroge le serveur, le serveur appelle une URL fournie par le client quand un événement se produit.

Événement → Producteur → POST /webhook/url (payload JSON) → Consommateur

Le modèle de delivery dominant est at-least-once : le producteur réessaie jusqu’à obtenir un accusé de réception (HTTP 2xx). Cela implique que le consommateur peut recevoir le même événement plusieurs fois — l”idempotence est donc obligatoire côté récepteur.

Retry avec backoff exponentiel#

Un système de webhook robuste doit réessayer les livraisons échouées avec un délai croissant pour ne pas surcharger un consommateur déjà en difficulté :

import asyncio
import httpx
import time
import logging
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)

@dataclass
class DeliveryAttempt:
    attempt_number: int
    timestamp: float
    status_code: Optional[int]
    error: Optional[str]
    duration_ms: float

@dataclass
class WebhookDelivery:
    event_id: str
    url: str
    payload: dict
    max_attempts: int = 5
    base_delay: float = 1.0   # délai initial en secondes
    max_delay: float = 300.0  # délai maximum en secondes (5 minutes)
    attempts: list = field(default_factory=list)

    def next_delay(self) -> float:
        n = len(self.attempts)
        # Backoff exponentiel avec jitter pour éviter les avalanches
        import random
        delay = min(self.base_delay * (2 ** n), self.max_delay)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    async def deliver(self) -> bool:
        """Tente de livrer le webhook. Retourne True si succès."""
        for attempt_num in range(1, self.max_attempts + 1):
            start = time.perf_counter()
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    response = await client.post(
                        self.url,
                        json=self.payload,
                        headers={
                            "Content-Type": "application/json",
                            "X-Webhook-Event": self.payload.get("type", "event"),
                            "X-Webhook-ID": self.event_id,
                            "X-Webhook-Attempt": str(attempt_num),
                        }
                    )
                elapsed = (time.perf_counter() - start) * 1000
                self.attempts.append(DeliveryAttempt(
                    attempt_number=attempt_num,
                    timestamp=time.time(),
                    status_code=response.status_code,
                    error=None,
                    duration_ms=elapsed,
                ))

                if 200 <= response.status_code < 300:
                    logger.info("Webhook %s livré (tentative %d, %dms)",
                                self.event_id, attempt_num, elapsed)
                    return True

                logger.warning("Webhook %s : HTTP %d (tentative %d/%d)",
                               self.event_id, response.status_code,
                               attempt_num, self.max_attempts)

            except Exception as exc:
                elapsed = (time.perf_counter() - start) * 1000
                self.attempts.append(DeliveryAttempt(
                    attempt_number=attempt_num,
                    timestamp=time.time(),
                    status_code=None,
                    error=str(exc),
                    duration_ms=elapsed,
                ))
                logger.error("Webhook %s erreur réseau (tentative %d): %s",
                             self.event_id, attempt_num, exc)

            if attempt_num < self.max_attempts:
                delay = self.next_delay()
                logger.info("Webhook %s : prochain essai dans %.1fs", self.event_id, delay)
                await asyncio.sleep(delay)

        logger.error("Webhook %s : échec définitif après %d tentatives",
                     self.event_id, self.max_attempts)
        return False

Idempotence côté récepteur#

# Exemple : stocker les event_id traités pour détecter les doublons
from fastapi import FastAPI, Request, Header
from typing import Optional
import redis.asyncio as redis

app = FastAPI()
cache = redis.from_url("redis://localhost")

@app.post("/webhooks/payments")
async def receive_payment_webhook(
    request: Request,
    x_webhook_id: Optional[str] = Header(None),
):
    body = await request.body()

    # 1. Vérifier la signature (voir section 5)
    verify_signature(body, request.headers.get("X-Webhook-Signature"))

    # 2. Vérifier l'idempotence
    if x_webhook_id:
        already_processed = await cache.get(f"webhook:{x_webhook_id}")
        if already_processed:
            return {"status": "already_processed"}

    payload = await request.json()

    # 3. Traiter l'événement
    await process_payment_event(payload)

    # 4. Marquer comme traité (TTL 7 jours)
    if x_webhook_id:
        await cache.setex(f"webhook:{x_webhook_id}", 604800, "1")

    return {"status": "ok"}

Ordre des événements#

Les webhooks at-least-once ne garantissent pas l’ordre. Deux solutions :

  • Inclure un timestamp ou un numéro de séquence dans chaque événement et ignorer les événements plus anciens que l’état courant.

  • Utiliser un champ de version : traiter uniquement si event.version > current_version.

Signature et vérification de webhooks#

Pourquoi signer#

N’importe qui peut envoyer une requête POST à votre endpoint de webhook. La signature HMAC-SHA256 prouve que le payload provient bien du producteur légitime (qui connaît la clé secrète partagée).

Implémentation HMAC-SHA256#

Le producteur inclut dans les en-têtes une signature calculée sur le corps de la requête :

import hmac
import hashlib
import time

def sign_webhook_payload(secret: str, payload: bytes, timestamp: int = None) -> str:
    """
    Génère la signature HMAC-SHA256 d'un payload webhook.
    Inclut un timestamp pour prévenir les attaques par replay.
    """
    if timestamp is None:
        timestamp = int(time.time())
    # Le message signé inclut le timestamp pour prévenir le replay
    message = f"{timestamp}.".encode() + payload
    signature = hmac.new(
        secret.encode(),
        message,
        hashlib.sha256
    ).hexdigest()
    return f"t={timestamp},v1={signature}"

def verify_webhook_signature(
    secret: str,
    payload: bytes,
    signature_header: str,
    tolerance_seconds: int = 300,  # 5 minutes de tolérance
) -> bool:
    """
    Vérifie la signature d'un webhook entrant.
    Lève ValueError si la signature est invalide ou expirée.
    """
    # Parser l'en-tête "t=1234567890,v1=abc123..."
    parts = dict(item.split("=", 1) for item in signature_header.split(","))
    timestamp = parts.get("t")
    expected_sig = parts.get("v1")

    if not timestamp or not expected_sig:
        raise ValueError("En-tête de signature malformé")

    # Vérifier que le timestamp n'est pas trop ancien
    event_time = int(timestamp)
    current_time = int(time.time())
    if abs(current_time - event_time) > tolerance_seconds:
        raise ValueError(
            f"Signature expirée : delta = {abs(current_time - event_time)}s "
            f"(tolérance = {tolerance_seconds}s)"
        )

    # Recalculer la signature attendue
    message = f"{timestamp}.".encode() + payload
    computed_sig = hmac.new(
        secret.encode(),
        message,
        hashlib.sha256
    ).hexdigest()

    # Comparaison en temps constant (timing-safe)
    if not hmac.compare_digest(computed_sig, expected_sig):
        raise ValueError("Signature invalide")

    return True

Endpoint FastAPI avec vérification complète#

from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional

app = FastAPI()
WEBHOOK_SECRET = "whsec_votre_secret_partage_avec_le_producteur"

@app.post("/webhooks/stripe")
async def stripe_webhook(
    request: Request,
    stripe_signature: Optional[str] = Header(None, alias="Stripe-Signature"),
):
    payload = await request.body()

    if not stripe_signature:
        raise HTTPException(status_code=400, detail="Signature manquante")

    try:
        verify_webhook_signature(WEBHOOK_SECRET, payload, stripe_signature)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    event = await request.json()

    match event.get("type"):
        case "payment_intent.succeeded":
            await handle_payment_succeeded(event["data"]["object"])
        case "payment_intent.payment_failed":
            await handle_payment_failed(event["data"]["object"])
        case _:
            pass  # événement non géré, retourner 200 quand même

    return {"received": True}

hmac.compare_digest est obligatoire

Ne jamais utiliser == pour comparer des signatures cryptographiques. La comparaison standard court-circuite dès la première différence, ce qui permet des attaques par timing : en mesurant le temps de réponse, un attaquant peut deviner la signature octet par octet. hmac.compare_digest compare en temps constant, indépendamment du point de divergence.

Debugging webhooks#

Outils de développement local#

En développement, votre serveur local n’est pas accessible depuis l’Internet public. Plusieurs outils créent des tunnels :

ngrok : le plus répandu. Crée une URL publique HTTPS qui redirige vers localhost.

# Exposer le port 8000 local
ngrok http 8000
# → https://abc123.ngrok.io pointe vers http://localhost:8000

# Avec un domaine personnalisé (payant)
ngrok http --domain=webhooks.mondomaine.com 8000

ngrok fournit une interface web (http://localhost:4040) qui affiche toutes les requêtes entrantes avec possibilité de replay.

webhook.site : service en ligne qui capture et affiche les webhooks entrants. Utile pour inspecter le format d’un webhook sans écrire de code.

Replay et dead letter queue#

En production, les livraisons échouées après tous les retries doivent aller dans une dead letter queue (DLQ) plutôt qu’être silencieusement abandonnées :

async def deliver_with_dlq(delivery: WebhookDelivery):
    success = await delivery.deliver()

    if not success:
        # Envoyer en DLQ (Redis Stream, SQS, Kafka Dead Letter Topic...)
        await push_to_dlq({
            "event_id": delivery.event_id,
            "url": delivery.url,
            "payload": delivery.payload,
            "attempts": [
                {
                    "number": a.attempt_number,
                    "status_code": a.status_code,
                    "error": a.error,
                    "duration_ms": a.duration_ms,
                }
                for a in delivery.attempts
            ],
            "failed_at": time.time(),
        })

# Interface de replay pour l'équipe ops
@app.post("/admin/webhooks/replay/{event_id}")
async def replay_webhook(event_id: str):
    event = await get_from_dlq(event_id)
    if not event:
        raise HTTPException(404, "Événement introuvable en DLQ")
    delivery = WebhookDelivery(
        event_id=event["event_id"],
        url=event["url"],
        payload=event["payload"],
        max_attempts=3,
    )
    success = await delivery.deliver()
    return {"success": success, "attempts": len(delivery.attempts)}

Choix technologique#

Decision tree#

Le choix de la technologie temps réel dépend de plusieurs facteurs :

  1. Communication bidirectionnelle nécessaire ?

    • Non → SSE ou Long Polling suffisent

    • Oui → WebSocket

  2. Le client est-il un navigateur web ?

    • Oui → SSE (simple), WebSocket (interactif)

    • Non (service, script) → Webhooks si push asynchrone

  3. Qui initie les messages ?

    • Serveur uniquement → SSE

    • Client et serveur → WebSocket

    • Serveur vers un autre serveur → Webhooks

  4. Fréquence des événements ?

    • Quelques par heure → Webhooks

    • Quelques par minute → Long Polling ou SSE

    • Plusieurs par seconde → WebSocket

  5. Garantie de delivery requise ?

    • Oui, at-least-once → Webhooks avec retry

    • Best-effort temps réel → WebSocket ou SSE

Cas d’usage typiques#

Cas d’usage

Technologie recommandée

Justification

Chat en temps réel

WebSocket

Bidirectionnel, faible latence

Notifications utilisateur

SSE ou Long Polling

Unidirectionnel, simple

Paiement → partenaire

Webhook

Asynchrone, delivery garanti

Live dashboard métriques

SSE

Flux serveur → client

Éditeur collaboratif

WebSocket

Bidirectionnel, opérationnel

Tracking livraison

Long Polling ou SSE

Rafraîchissement périodique

Synchronisation CI/CD

Webhook

Événementiel, inter-services

Cours boursiers

WebSocket

Volume élevé, faible latence


Cellules Python exécutables#

# Vérification de signature webhook HMAC-SHA256 (fonctionnel, stdlib uniquement)
import hmac
import hashlib
import time
import json

def sign_payload(secret: str, payload: bytes, timestamp: int = None) -> str:
    if timestamp is None:
        timestamp = int(time.time())
    message = f"{timestamp}.".encode() + payload
    sig = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
    return f"t={timestamp},v1={sig}"

def verify_signature(secret: str, payload: bytes, header: str,
                     tolerance: int = 300) -> dict:
    parts = dict(item.split("=", 1) for item in header.split(","))
    timestamp_str = parts.get("t", "")
    received_sig = parts.get("v1", "")

    if not timestamp_str or not received_sig:
        return {"valid": False, "reason": "En-tête malformé"}

    try:
        event_time = int(timestamp_str)
    except ValueError:
        return {"valid": False, "reason": "Timestamp invalide"}

    age = abs(int(time.time()) - event_time)
    if age > tolerance:
        return {"valid": False, "reason": f"Signature expirée (âge={age}s > {tolerance}s)"}

    message = f"{timestamp_str}.".encode() + payload
    expected_sig = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()

    if not hmac.compare_digest(expected_sig, received_sig):
        return {"valid": False, "reason": "Signature incorrecte"}

    return {"valid": True, "reason": "OK", "age_seconds": age}

# Démonstration
SECRET = "whsec_monSecretPartage2024"
payload = json.dumps({
    "id": "evt_42",
    "type": "payment.succeeded",
    "data": {"amount": 4999, "currency": "EUR"}
}).encode()

# Cas 1 : signature valide
ts_now = int(time.time())
header_valid = sign_payload(SECRET, payload, timestamp=ts_now)
result = verify_signature(SECRET, payload, header_valid)
print(f"Cas 1 — Signature valide : {result}")

# Cas 2 : signature expirée (timestamp trop ancien)
ts_old = ts_now - 400  # 400 secondes = > 5 minutes de tolérance
header_old = sign_payload(SECRET, payload, timestamp=ts_old)
result = verify_signature(SECRET, payload, header_old)
print(f"Cas 2 — Signature expirée : {result}")

# Cas 3 : payload modifié (man-in-the-middle)
tampered_payload = json.dumps({
    "id": "evt_42",
    "type": "payment.succeeded",
    "data": {"amount": 1, "currency": "EUR"}  # montant modifié !
}).encode()
result = verify_signature(SECRET, tampered_payload, header_valid)
print(f"Cas 3 — Payload modifié : {result}")

# Cas 4 : mauvais secret
result = verify_signature("wrong_secret", payload, header_valid)
print(f"Cas 4 — Mauvais secret : {result}")

print("\n--- Détail de la signature générée ---")
print(f"Secret : {SECRET}")
print(f"Timestamp : {ts_now}")
print(f"Header : {header_valid[:80]}...")
Cas 1 — Signature valide : {'valid': True, 'reason': 'OK', 'age_seconds': 0}
Cas 2 — Signature expirée : {'valid': False, 'reason': 'Signature expirée (âge=400s > 300s)'}
Cas 3 — Payload modifié : {'valid': False, 'reason': 'Signature incorrecte'}
Cas 4 — Mauvais secret : {'valid': False, 'reason': 'Signature incorrecte'}

--- Détail de la signature générée ---
Secret : whsec_monSecretPartage2024
Timestamp : 1774518424
Header : t=1774518424,v1=155f4a321db2569e57db70fadd8ff340219b1399558a4b8dc46f2da4cc531da8...
# Simulation de delivery webhook avec retry et backoff exponentiel
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random
import time

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

def simulate_webhook_delivery(
    success_probability: float,
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 300.0,
    seed: int = 42
) -> dict:
    """Simule les tentatives de livraison d'un webhook."""
    rng = random.Random(seed)
    attempts = []
    cumulative_time = 0.0

    for attempt in range(1, max_attempts + 1):
        # Simuler la durée de la requête HTTP
        duration = rng.uniform(0.05, 0.5)
        # Simuler succès ou échec
        success = rng.random() < success_probability

        attempts.append({
            "attempt": attempt,
            "time": cumulative_time,
            "duration": duration,
            "success": success,
            "status_code": 200 if success else rng.choice([500, 503, 502, 408]),
        })
        cumulative_time += duration

        if success:
            break

        if attempt < max_attempts:
            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            jitter = rng.uniform(0, delay * 0.1)
            actual_delay = delay + jitter
            cumulative_time += actual_delay

    return {
        "attempts": attempts,
        "total_time": cumulative_time,
        "delivered": attempts[-1]["success"],
    }

fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# Graphique 1 : délais de backoff exponentiel
attempt_numbers = list(range(1, 7))
delays = [min(1.0 * (2 ** (n - 1)), 300.0) for n in attempt_numbers]
delays_with_jitter = [d * 1.05 for d in delays]  # +5% jitter max illustratif

axes[0].bar(attempt_numbers, delays, color='#4C72B0', alpha=0.8, label='Délai base')
axes[0].bar(attempt_numbers, [j - d for j, d in zip(delays_with_jitter, delays)],
            bottom=delays, color='#DD8452', alpha=0.7, label='Jitter (+10%)')
axes[0].set_xlabel("Numéro de tentative")
axes[0].set_ylabel("Délai avant prochaine tentative (s)")
axes[0].set_title("Backoff exponentiel (base=1s, max=300s)")
axes[0].legend()
for i, (attempt, delay) in enumerate(zip(attempt_numbers, delays)):
    axes[0].text(attempt, delay + 2, f"{delay:.0f}s", ha='center', va='bottom', fontsize=9)

# Graphique 2 : simulation de 200 livraisons
n_simulations = 200
results_low = [simulate_webhook_delivery(0.4, seed=i) for i in range(n_simulations)]
results_high = [simulate_webhook_delivery(0.8, seed=i) for i in range(n_simulations)]

def count_attempts_distribution(results):
    from collections import Counter
    counts = Counter(len(r["attempts"]) for r in results)
    return [counts.get(i, 0) for i in range(1, 7)]

dist_low = count_attempts_distribution(results_low)
dist_high = count_attempts_distribution(results_high)

x = np.arange(1, 6)
w = 0.35
axes[1].bar(x - w/2, dist_low[:5], w, label='P(succès)=40%', color='#DD8452', alpha=0.85)
axes[1].bar(x + w/2, dist_high[:5], w, label='P(succès)=80%', color='#55A868', alpha=0.85)
axes[1].set_xlabel("Nombre de tentatives nécessaires")
axes[1].set_ylabel(f"Fréquence (sur {n_simulations} webhooks)")
axes[1].set_title("Distribution des tentatives nécessaires")
axes[1].legend()
axes[1].set_xticks(x)

delivered_low = sum(r["delivered"] for r in results_low)
delivered_high = sum(r["delivered"] for r in results_high)

plt.suptitle(
    f"Simulation webhook delivery — "
    f"P=40% : {delivered_low}/{n_simulations} livrés | "
    f"P=80% : {delivered_high}/{n_simulations} livrés",
    fontweight='bold', fontsize=11
)
plt.tight_layout()
plt.show()

print(f"P(succès)=40% : {delivered_low}/{n_simulations} webhooks livrés ({delivered_low/n_simulations*100:.1f}%)")
print(f"P(succès)=80% : {delivered_high}/{n_simulations} webhooks livrés ({delivered_high/n_simulations*100:.1f}%)")
_images/7a986ff33200f7f9a6601d1ac3ee9b237e0ba970b9f6adb80cc24b76ad7bd984.png
P(succès)=40% : 184/200 webhooks livrés (92.0%)
P(succès)=80% : 200/200 webhooks livrés (100.0%)
# Radar : comparaison des technologies temps réel
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

categories = [
    "Faible\nlatence",
    "Simplicité\nimpl.",
    "Scalabilité\nhoriz.",
    "Support\nnavigateur",
    "Delivery\ngaranti",
    "Bidi-\nrectionnel"
]
N = len(categories)

scores = {
    "Polling":       [1, 5, 5, 5, 4, 1],
    "Long Polling":  [3, 4, 3, 5, 3, 1],
    "SSE":           [4, 4, 4, 5, 2, 1],
    "WebSocket":     [5, 2, 2, 4, 2, 5],
    "Webhooks":      [1, 3, 5, 1, 5, 1],
}

angles = [n / float(N) * 2 * np.pi for n in range(N)]
angles += angles[:1]

fig, ax = plt.subplots(figsize=(9, 9), subplot_kw=dict(polar=True))
colors = ["#4C72B0", "#DD8452", "#55A868", "#8172B3", "#C44E52"]

for (tech, vals), color in zip(scores.items(), colors):
    values = vals + vals[:1]
    ax.plot(angles, values, linewidth=2.2, linestyle='solid', label=tech, color=color)
    ax.fill(angles, values, alpha=0.1, color=color)

ax.set_xticks(angles[:-1])
ax.set_xticklabels(categories, size=10)
ax.set_ylim(0, 5)
ax.set_yticks([1, 2, 3, 4, 5])
ax.set_yticklabels(["1", "2", "3", "4", "5"], size=8, color='grey')
ax.set_title("Comparaison des technologies temps réel\n(score de 1 à 5 par critère)",
             size=13, fontweight='bold', pad=20)
ax.legend(loc='upper right', bbox_to_anchor=(1.45, 1.15), fontsize=10)
plt.show()
_images/e9283d399b0485e341e4f94200d1b8de40cac993debf031555889c45b92346d2.png
# Decision tree "quelle technologie temps réel" (matplotlib flowchart)
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)

fig, ax = plt.subplots(figsize=(14, 10))
ax.set_xlim(0, 14)
ax.set_ylim(0, 10)
ax.axis('off')

def draw_box(ax, x, y, w, h, text, color, fontsize=9, text_color='white', shape='round'):
    patch = mpatches.FancyBboxPatch(
        (x - w/2, y - h/2), w, h,
        boxstyle=f"round,pad=0.15",
        facecolor=color, edgecolor='white', linewidth=1.5, alpha=0.92
    )
    ax.add_patch(patch)
    ax.text(x, y, text, ha='center', va='center', fontsize=fontsize,
            color=text_color, fontweight='bold', wrap=True,
            multialignment='center')

def draw_diamond(ax, x, y, w, h, text, color="#F0C040", fontsize=8.5):
    diamond = plt.Polygon(
        [[x, y + h/2], [x + w/2, y], [x, y - h/2], [x - w/2, y]],
        closed=True, facecolor=color, edgecolor='white', linewidth=1.5, alpha=0.92
    )
    ax.add_patch(diamond)
    ax.text(x, y, text, ha='center', va='center', fontsize=fontsize,
            fontweight='bold', color='#333333', multialignment='center')

def arrow(ax, x1, y1, x2, y2, label='', color='#555555'):
    ax.annotate("", xy=(x2, y2), xytext=(x1, y1),
                arrowprops=dict(arrowstyle='->', color=color, lw=1.8))
    if label:
        mx, my = (x1 + x2) / 2, (y1 + y2) / 2
        ax.text(mx + 0.1, my, label, fontsize=8, color=color, fontstyle='italic')

# Noeud de départ
draw_box(ax, 7, 9.3, 3.0, 0.7, "Besoin de communication\ntemps réel ?", "#4C72B0", fontsize=9.5)

# Q1 : bidirectionnel ?
draw_diamond(ax, 7, 8.0, 3.2, 0.9, "Bidirectionnel\n(client → serveur) ?", "#E8B84B")
arrow(ax, 7, 8.95, 7, 8.45)

# Q2a : fréquence élevée ?
draw_diamond(ax, 3.5, 6.5, 3.0, 0.9, "Fréquence élevée\n(> 1 msg/s) ?", "#E8B84B")
arrow(ax, 5.4, 8.0, 4.6, 6.8, "Non")

# Q2b : serveur → client uniquement
draw_diamond(ax, 10.5, 6.5, 3.0, 0.9, "Delivery garanti\nnécessaire ?", "#E8B84B")
arrow(ax, 8.6, 8.0, 9.9, 6.8, "Oui")

# WebSocket (haute fréquence bidi)
draw_box(ax, 2.0, 5.0, 2.4, 0.7, "WebSocket", "#2d8a4e")
arrow(ax, 3.5, 6.05, 2.5, 5.35, "Oui")

# Long Polling / SSE (basse fréquence bidi)
draw_box(ax, 5.0, 5.0, 2.4, 0.7, "WebSocket\nou SSE + POST", "#55A868")
arrow(ax, 3.5, 6.05, 4.5, 5.35, "Non")

# Webhooks (delivery garanti)
draw_box(ax, 12.0, 5.0, 2.4, 0.7, "Webhooks\n(at-least-once)", "#C44E52")
arrow(ax, 10.5, 6.05, 11.5, 5.35, "Oui")

# Q3 : polling ou SSE
draw_diamond(ax, 9.5, 5.0, 2.8, 0.9, "Mises à jour\nfréquentes (> /min) ?", "#E8B84B")
arrow(ax, 10.5, 6.05, 10.0, 5.45, "Non")

# SSE
draw_box(ax, 9.5, 3.5, 2.3, 0.7, "SSE\n(EventSource)", "#4C72B0")
arrow(ax, 9.5, 4.55, 9.5, 3.85, "Oui")

# Polling
draw_box(ax, 7.0, 3.5, 2.3, 0.7, "Long Polling\nou Polling", "#8172B3")
arrow(ax, 8.1, 5.0, 7.9, 3.85, "Non")

ax.set_title("Decision tree — Quelle technologie temps réel ?",
             fontsize=13, fontweight='bold', pad=10)

# Légende
legend_items = [
    mpatches.Patch(color="#2d8a4e", label="WebSocket"),
    mpatches.Patch(color="#55A868", label="WebSocket / SSE+POST"),
    mpatches.Patch(color="#4C72B0", label="SSE"),
    mpatches.Patch(color="#8172B3", label="Long Polling / Polling"),
    mpatches.Patch(color="#C44E52", label="Webhooks"),
]
ax.legend(handles=legend_items, loc='lower left', fontsize=9, framealpha=0.9)
plt.show()
_images/19a68c67ff2b04f7e8a2c425a7185a0435bb822b1c0fbabbc4bf0d99f5578dae.png

Résumé#

Le paysage des technologies temps réel offre un spectre allant du polling (simple, universel, latence élevée) au WebSocket (bidirectionnel, faible latence, complexité accrue). Le choix optimal dépend de trois axes : la direction des messages, la fréquence des événements, et les exigences de delivery.

Les SSE sont souvent sous-estimés : pour les flux unidirectionnels serveur → client, ils offrent une simplicité remarquable avec reconnexion automatique native et compatibilité HTTP. Les WebSockets sont indispensables pour les applications interactives (chat, collaboration) mais requièrent une infrastructure plus complexe pour la scalabilité horizontale (broker partagé Redis, Kafka).

Les webhooks sont la solution canonique pour la communication événementielle inter-services et vers des partenaires. La robustesse d’un système de webhooks repose sur trois piliers : la signature HMAC-SHA256 (authentifier les événements), le retry avec backoff exponentiel (absorber les indisponibilités transitoires), et l”idempotence côté récepteur (gérer les doublons inévitables en at-least-once). La dead letter queue complète le dispositif en assurant qu’aucun événement n’est silencieusement perdu.