Temps réel et événements#
Le temps réel en APIs recouvre des réalités très différentes : un chat qui affiche les messages en quelques centaines de millisecondes, un tableau de bord qui se rafraîchit toutes les secondes, un système de paiement qui notifie un partenaire d’une transaction. Chacun de ces cas appelle une technologie différente. Ce chapitre couvre le panorama complet — polling, SSE, WebSocket, webhooks — et approfondit la conception robuste : delivery guarantees, signature HMAC, retry avec backoff, et debugging.
Panorama des options#
Polling#
La forme la plus simple : le client interroge périodiquement le serveur.
Client → GET /api/notifications → Serveur
(attendre 5 secondes)
Client → GET /api/notifications → Serveur
(attendre 5 secondes)
...
Avantages : aucun état persistant côté serveur, compatible avec tous les proxies et firewalls, simple à implémenter et déboguer.
Inconvénients : latence de mise à jour = intervalle de polling. Charge serveur proportionnelle au nombre de clients × fréquence, même quand il n’y a rien de nouveau.
Quand l’utiliser : données qui changent rarement et où une latence de plusieurs secondes est acceptable. Dashboards internes, statuts de jobs.
Long Polling#
Le client envoie une requête que le serveur maintient ouverte jusqu’à ce qu’une mise à jour soit disponible ou qu’un timeout soit atteint.
Client → GET /api/notifications?timeout=30 → Serveur
(serveur bloque jusqu'à un événement ou 30s)
Serveur → 200 {"event": "message", "data": {...}} → Client
Client → GET /api/notifications?timeout=30 → Serveur (immédiatement)
Avantages : latence proche du temps réel. Compatible avec HTTP/1.1 et les proxies standards.
Inconvénients : maintient une connexion TCP par client. Complexité de gestion des timeouts et reconnexions. Les requêtes fréquentes génèrent du trafic HTTP overhead.
Server-Sent Events (SSE)#
Une connexion HTTP persistante sur laquelle le serveur envoie des événements en texte (text/event-stream). Le client utilise l’API EventSource.
Client → GET /api/stream → Serveur (connexion persistante)
Serveur → data: {"type":"message","text":"..."}\n\n → Client
Serveur → data: {"type":"presence","users":5}\n\n → Client
Avantages : protocole standard HTTP, reconnexion automatique native (EventSource), simple côté serveur, compatible avec les proxies HTTP.
Inconvénients : unidirectionnel (serveur → client uniquement). Limité à 6 connexions simultanées par domaine en HTTP/1.1 (levé avec HTTP/2). Pas de support binaire natif.
WebSocket#
Protocole bidirectionnel basé sur une connexion TCP persistante, initialisée par un HTTP Upgrade.
Client → HTTP GET (Upgrade: websocket) → Serveur
Serveur → 101 Switching Protocols → Client
(canal bidirectionnel ouvert)
Client ↔ frames binaires ou texte ↔ Serveur
Avantages : bidirectionnel, faible overhead par message (2-14 octets de header), support binaire, latence minimale.
Inconvénients : état persistant côté serveur (scalabilité horizontale nécessite un broker partagé : Redis, etc.). Plus complexe à déboguer. Certains proxies corporate bloquent les WebSockets.
WebRTC Data Channel#
Canal de données pair-à-pair basé sur SCTP, utilisé principalement pour les communications navigateur-à-navigateur.
Quand l’utiliser : partage de fichiers P2P, jeux en temps réel, visioconférence avec données annexes. Rarement approprié pour des APIs serveur-client.
Tableau récapitulatif#
Technologie |
Direction |
Latence |
Complexité |
Scalabilité |
Cas d’usage |
|---|---|---|---|---|---|
Polling |
C → S |
Secondes |
Très faible |
Excellente |
Statut de jobs |
Long Polling |
S → C |
~100ms |
Faible |
Bonne |
Notifications simples |
SSE |
S → C |
~50ms |
Faible |
Bonne |
Live feed, dashboard |
WebSocket |
Bidirectionnel |
~10ms |
Moyenne |
Complexe |
Chat, collaboration |
WebRTC |
P2P |
~5ms |
Haute |
N/A |
Vidéo, gaming |
WebSocket : patterns applicatifs#
Rooms et channels#
Le pattern rooms (ou channels) organise les connexions WebSocket par sujet. Chaque client s’abonne à des rooms ; les messages sont diffusés aux abonnés de la room concernée.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
app = FastAPI()
class RoomManager:
def __init__(self):
# room_id → ensemble de connexions WebSocket actives
self.rooms: Dict[str, Set[WebSocket]] = {}
async def join(self, room_id: str, websocket: WebSocket):
await websocket.accept()
if room_id not in self.rooms:
self.rooms[room_id] = set()
self.rooms[room_id].add(websocket)
async def leave(self, room_id: str, websocket: WebSocket):
if room_id in self.rooms:
self.rooms[room_id].discard(websocket)
if not self.rooms[room_id]:
del self.rooms[room_id]
async def broadcast(self, room_id: str, message: dict, exclude: WebSocket = None):
if room_id not in self.rooms:
return
dead = set()
for ws in self.rooms[room_id]:
if ws == exclude:
continue
try:
await ws.send_json(message)
except Exception:
dead.add(ws)
# Nettoyer les connexions mortes
self.rooms[room_id] -= dead
def presence(self, room_id: str) -> int:
return len(self.rooms.get(room_id, set()))
manager = RoomManager()
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
await manager.join(room_id, websocket)
await manager.broadcast(room_id, {
"type": "presence",
"count": manager.presence(room_id)
})
try:
while True:
data = await websocket.receive_json()
await manager.broadcast(room_id, {
"type": "message",
"text": data.get("text", ""),
"room": room_id,
}, exclude=websocket)
except WebSocketDisconnect:
await manager.leave(room_id, websocket)
await manager.broadcast(room_id, {
"type": "presence",
"count": manager.presence(room_id)
})
Heartbeat et reconnexion automatique#
Les connexions WebSocket peuvent être coupées silencieusement (timeout NAT, pare-feu, réseau mobile). Le heartbeat détecte ces coupures :
import asyncio
import json
@app.websocket("/ws/{room_id}")
async def websocket_with_heartbeat(websocket: WebSocket, room_id: str):
await manager.join(room_id, websocket)
async def heartbeat():
while True:
await asyncio.sleep(30)
try:
await websocket.send_json({"type": "ping"})
except Exception:
break
heartbeat_task = asyncio.create_task(heartbeat())
try:
while True:
data = await websocket.receive_json()
if data.get("type") == "pong":
continue # réponse au heartbeat
await manager.broadcast(room_id, data, exclude=websocket)
except WebSocketDisconnect:
heartbeat_task.cancel()
await manager.leave(room_id, websocket)
Côté client JavaScript, la reconnexion automatique est implémentée avec un backoff exponentiel :
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.maxDelay = options.maxDelay || 30000;
this.attempts = 0;
this.connect();
}
connect() {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => { this.attempts = 0; };
this.ws.onclose = () => {
const delay = Math.min(1000 * 2 ** this.attempts, this.maxDelay);
this.attempts++;
setTimeout(() => this.connect(), delay);
};
this.ws.onmessage = (event) => this.onmessage?.(JSON.parse(event.data));
}
}
Server-Sent Events#
Format du flux#
Le format text/event-stream est simple : chaque événement est une suite de lignes clé:valeur, séparées par une ligne vide.
id: 42\n
event: message\n
data: {"text": "Bonjour", "user": "Alice"}\n
\n
id: 43\n
event: presence\n
data: {"count": 5}\n
retry: 3000\n
\n
Champs disponibles :
data: contenu de l’événement (obligatoire, peut être multiligne)event: type d’événement personnaliséid: identifiant pour la reprise (Last-Event-ID)retry: délai de reconnexion en ms que le client doit utiliser
Implémentation FastAPI#
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def event_generator(request: Request, channel: str):
last_id = request.headers.get("Last-Event-ID")
event_id = int(last_id) + 1 if last_id else 1
# Si last_id est fourni, rejouer les événements manqués depuis le store
if last_id:
missed = await get_events_since(channel, int(last_id))
for event in missed:
yield f"id: {event['id']}\ndata: {json.dumps(event['data'])}\n\n"
pubsub = get_pubsub()
async with pubsub.subscribe(f"channel:{channel}") as sub:
async for message in sub:
if await request.is_disconnected():
break
yield f"id: {event_id}\nevent: {message['type']}\ndata: {json.dumps(message['data'])}\n\n"
event_id += 1
@app.get("/sse/{channel}")
async def sse_endpoint(channel: str, request: Request):
return StreamingResponse(
event_generator(request, channel),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # désactiver le buffering nginx
}
)
Reconnexion automatique et Last-Event-ID#
L’API EventSource du navigateur gère automatiquement la reconnexion. Elle renvoie le Last-Event-ID du dernier événement reçu dans les en-têtes de la nouvelle connexion :
const source = new EventSource('/sse/general');
source.onopen = () => console.log('Connexion SSE établie');
source.onerror = (e) => console.error('Erreur SSE, reconnexion automatique...');
source.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Message:', data);
});
source.addEventListener('presence', (event) => {
const data = JSON.parse(event.data);
console.log(`Utilisateurs connectés : ${data.count}`);
});
Limitations#
SSE et HTTP/2
En HTTP/1.1, les navigateurs limitent à 6 connexions simultanées par domaine. Si l’utilisateur ouvre plusieurs onglets avec des connexions SSE, ils se concurrencent. HTTP/2 lève cette limite grâce au multiplexing — toutes les connexions SSE d’un même domaine partagent une seule connexion TCP.
Webhooks#
Définition et modèle de delivery#
Un webhook est un mécanisme de callback HTTP : au lieu que le client interroge le serveur, le serveur appelle une URL fournie par le client quand un événement se produit.
Événement → Producteur → POST /webhook/url (payload JSON) → Consommateur
Le modèle de delivery dominant est at-least-once : le producteur réessaie jusqu’à obtenir un accusé de réception (HTTP 2xx). Cela implique que le consommateur peut recevoir le même événement plusieurs fois — l”idempotence est donc obligatoire côté récepteur.
Retry avec backoff exponentiel#
Un système de webhook robuste doit réessayer les livraisons échouées avec un délai croissant pour ne pas surcharger un consommateur déjà en difficulté :
import asyncio
import httpx
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class DeliveryAttempt:
attempt_number: int
timestamp: float
status_code: Optional[int]
error: Optional[str]
duration_ms: float
@dataclass
class WebhookDelivery:
event_id: str
url: str
payload: dict
max_attempts: int = 5
base_delay: float = 1.0 # délai initial en secondes
max_delay: float = 300.0 # délai maximum en secondes (5 minutes)
attempts: list = field(default_factory=list)
def next_delay(self) -> float:
n = len(self.attempts)
# Backoff exponentiel avec jitter pour éviter les avalanches
import random
delay = min(self.base_delay * (2 ** n), self.max_delay)
jitter = random.uniform(0, delay * 0.1)
return delay + jitter
async def deliver(self) -> bool:
"""Tente de livrer le webhook. Retourne True si succès."""
for attempt_num in range(1, self.max_attempts + 1):
start = time.perf_counter()
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
self.url,
json=self.payload,
headers={
"Content-Type": "application/json",
"X-Webhook-Event": self.payload.get("type", "event"),
"X-Webhook-ID": self.event_id,
"X-Webhook-Attempt": str(attempt_num),
}
)
elapsed = (time.perf_counter() - start) * 1000
self.attempts.append(DeliveryAttempt(
attempt_number=attempt_num,
timestamp=time.time(),
status_code=response.status_code,
error=None,
duration_ms=elapsed,
))
if 200 <= response.status_code < 300:
logger.info("Webhook %s livré (tentative %d, %dms)",
self.event_id, attempt_num, elapsed)
return True
logger.warning("Webhook %s : HTTP %d (tentative %d/%d)",
self.event_id, response.status_code,
attempt_num, self.max_attempts)
except Exception as exc:
elapsed = (time.perf_counter() - start) * 1000
self.attempts.append(DeliveryAttempt(
attempt_number=attempt_num,
timestamp=time.time(),
status_code=None,
error=str(exc),
duration_ms=elapsed,
))
logger.error("Webhook %s erreur réseau (tentative %d): %s",
self.event_id, attempt_num, exc)
if attempt_num < self.max_attempts:
delay = self.next_delay()
logger.info("Webhook %s : prochain essai dans %.1fs", self.event_id, delay)
await asyncio.sleep(delay)
logger.error("Webhook %s : échec définitif après %d tentatives",
self.event_id, self.max_attempts)
return False
Idempotence côté récepteur#
# Exemple : stocker les event_id traités pour détecter les doublons
from fastapi import FastAPI, Request, Header
from typing import Optional
import redis.asyncio as redis
app = FastAPI()
cache = redis.from_url("redis://localhost")
@app.post("/webhooks/payments")
async def receive_payment_webhook(
request: Request,
x_webhook_id: Optional[str] = Header(None),
):
body = await request.body()
# 1. Vérifier la signature (voir section 5)
verify_signature(body, request.headers.get("X-Webhook-Signature"))
# 2. Vérifier l'idempotence
if x_webhook_id:
already_processed = await cache.get(f"webhook:{x_webhook_id}")
if already_processed:
return {"status": "already_processed"}
payload = await request.json()
# 3. Traiter l'événement
await process_payment_event(payload)
# 4. Marquer comme traité (TTL 7 jours)
if x_webhook_id:
await cache.setex(f"webhook:{x_webhook_id}", 604800, "1")
return {"status": "ok"}
Ordre des événements#
Les webhooks at-least-once ne garantissent pas l’ordre. Deux solutions :
Inclure un timestamp ou un numéro de séquence dans chaque événement et ignorer les événements plus anciens que l’état courant.
Utiliser un champ de version : traiter uniquement si
event.version > current_version.
Signature et vérification de webhooks#
Pourquoi signer#
N’importe qui peut envoyer une requête POST à votre endpoint de webhook. La signature HMAC-SHA256 prouve que le payload provient bien du producteur légitime (qui connaît la clé secrète partagée).
Implémentation HMAC-SHA256#
Le producteur inclut dans les en-têtes une signature calculée sur le corps de la requête :
import hmac
import hashlib
import time
def sign_webhook_payload(secret: str, payload: bytes, timestamp: int = None) -> str:
"""
Génère la signature HMAC-SHA256 d'un payload webhook.
Inclut un timestamp pour prévenir les attaques par replay.
"""
if timestamp is None:
timestamp = int(time.time())
# Le message signé inclut le timestamp pour prévenir le replay
message = f"{timestamp}.".encode() + payload
signature = hmac.new(
secret.encode(),
message,
hashlib.sha256
).hexdigest()
return f"t={timestamp},v1={signature}"
def verify_webhook_signature(
secret: str,
payload: bytes,
signature_header: str,
tolerance_seconds: int = 300, # 5 minutes de tolérance
) -> bool:
"""
Vérifie la signature d'un webhook entrant.
Lève ValueError si la signature est invalide ou expirée.
"""
# Parser l'en-tête "t=1234567890,v1=abc123..."
parts = dict(item.split("=", 1) for item in signature_header.split(","))
timestamp = parts.get("t")
expected_sig = parts.get("v1")
if not timestamp or not expected_sig:
raise ValueError("En-tête de signature malformé")
# Vérifier que le timestamp n'est pas trop ancien
event_time = int(timestamp)
current_time = int(time.time())
if abs(current_time - event_time) > tolerance_seconds:
raise ValueError(
f"Signature expirée : delta = {abs(current_time - event_time)}s "
f"(tolérance = {tolerance_seconds}s)"
)
# Recalculer la signature attendue
message = f"{timestamp}.".encode() + payload
computed_sig = hmac.new(
secret.encode(),
message,
hashlib.sha256
).hexdigest()
# Comparaison en temps constant (timing-safe)
if not hmac.compare_digest(computed_sig, expected_sig):
raise ValueError("Signature invalide")
return True
Endpoint FastAPI avec vérification complète#
from fastapi import FastAPI, Request, HTTPException, Header
from typing import Optional
app = FastAPI()
WEBHOOK_SECRET = "whsec_votre_secret_partage_avec_le_producteur"
@app.post("/webhooks/stripe")
async def stripe_webhook(
request: Request,
stripe_signature: Optional[str] = Header(None, alias="Stripe-Signature"),
):
payload = await request.body()
if not stripe_signature:
raise HTTPException(status_code=400, detail="Signature manquante")
try:
verify_webhook_signature(WEBHOOK_SECRET, payload, stripe_signature)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
event = await request.json()
match event.get("type"):
case "payment_intent.succeeded":
await handle_payment_succeeded(event["data"]["object"])
case "payment_intent.payment_failed":
await handle_payment_failed(event["data"]["object"])
case _:
pass # événement non géré, retourner 200 quand même
return {"received": True}
hmac.compare_digest est obligatoire
Ne jamais utiliser == pour comparer des signatures cryptographiques. La comparaison standard court-circuite dès la première différence, ce qui permet des attaques par timing : en mesurant le temps de réponse, un attaquant peut deviner la signature octet par octet. hmac.compare_digest compare en temps constant, indépendamment du point de divergence.
Debugging webhooks#
Outils de développement local#
En développement, votre serveur local n’est pas accessible depuis l’Internet public. Plusieurs outils créent des tunnels :
ngrok : le plus répandu. Crée une URL publique HTTPS qui redirige vers localhost.
# Exposer le port 8000 local
ngrok http 8000
# → https://abc123.ngrok.io pointe vers http://localhost:8000
# Avec un domaine personnalisé (payant)
ngrok http --domain=webhooks.mondomaine.com 8000
ngrok fournit une interface web (http://localhost:4040) qui affiche toutes les requêtes entrantes avec possibilité de replay.
webhook.site : service en ligne qui capture et affiche les webhooks entrants. Utile pour inspecter le format d’un webhook sans écrire de code.
Replay et dead letter queue#
En production, les livraisons échouées après tous les retries doivent aller dans une dead letter queue (DLQ) plutôt qu’être silencieusement abandonnées :
async def deliver_with_dlq(delivery: WebhookDelivery):
success = await delivery.deliver()
if not success:
# Envoyer en DLQ (Redis Stream, SQS, Kafka Dead Letter Topic...)
await push_to_dlq({
"event_id": delivery.event_id,
"url": delivery.url,
"payload": delivery.payload,
"attempts": [
{
"number": a.attempt_number,
"status_code": a.status_code,
"error": a.error,
"duration_ms": a.duration_ms,
}
for a in delivery.attempts
],
"failed_at": time.time(),
})
# Interface de replay pour l'équipe ops
@app.post("/admin/webhooks/replay/{event_id}")
async def replay_webhook(event_id: str):
event = await get_from_dlq(event_id)
if not event:
raise HTTPException(404, "Événement introuvable en DLQ")
delivery = WebhookDelivery(
event_id=event["event_id"],
url=event["url"],
payload=event["payload"],
max_attempts=3,
)
success = await delivery.deliver()
return {"success": success, "attempts": len(delivery.attempts)}
Choix technologique#
Decision tree#
Le choix de la technologie temps réel dépend de plusieurs facteurs :
Communication bidirectionnelle nécessaire ?
Non → SSE ou Long Polling suffisent
Oui → WebSocket
Le client est-il un navigateur web ?
Oui → SSE (simple), WebSocket (interactif)
Non (service, script) → Webhooks si push asynchrone
Qui initie les messages ?
Serveur uniquement → SSE
Client et serveur → WebSocket
Serveur vers un autre serveur → Webhooks
Fréquence des événements ?
Quelques par heure → Webhooks
Quelques par minute → Long Polling ou SSE
Plusieurs par seconde → WebSocket
Garantie de delivery requise ?
Oui, at-least-once → Webhooks avec retry
Best-effort temps réel → WebSocket ou SSE
Cas d’usage typiques#
Cas d’usage |
Technologie recommandée |
Justification |
|---|---|---|
Chat en temps réel |
WebSocket |
Bidirectionnel, faible latence |
Notifications utilisateur |
SSE ou Long Polling |
Unidirectionnel, simple |
Paiement → partenaire |
Webhook |
Asynchrone, delivery garanti |
Live dashboard métriques |
SSE |
Flux serveur → client |
Éditeur collaboratif |
WebSocket |
Bidirectionnel, opérationnel |
Tracking livraison |
Long Polling ou SSE |
Rafraîchissement périodique |
Synchronisation CI/CD |
Webhook |
Événementiel, inter-services |
Cours boursiers |
WebSocket |
Volume élevé, faible latence |
Cellules Python exécutables#
# Vérification de signature webhook HMAC-SHA256 (fonctionnel, stdlib uniquement)
import hmac
import hashlib
import time
import json
def sign_payload(secret: str, payload: bytes, timestamp: int = None) -> str:
if timestamp is None:
timestamp = int(time.time())
message = f"{timestamp}.".encode() + payload
sig = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
return f"t={timestamp},v1={sig}"
def verify_signature(secret: str, payload: bytes, header: str,
tolerance: int = 300) -> dict:
parts = dict(item.split("=", 1) for item in header.split(","))
timestamp_str = parts.get("t", "")
received_sig = parts.get("v1", "")
if not timestamp_str or not received_sig:
return {"valid": False, "reason": "En-tête malformé"}
try:
event_time = int(timestamp_str)
except ValueError:
return {"valid": False, "reason": "Timestamp invalide"}
age = abs(int(time.time()) - event_time)
if age > tolerance:
return {"valid": False, "reason": f"Signature expirée (âge={age}s > {tolerance}s)"}
message = f"{timestamp_str}.".encode() + payload
expected_sig = hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected_sig, received_sig):
return {"valid": False, "reason": "Signature incorrecte"}
return {"valid": True, "reason": "OK", "age_seconds": age}
# Démonstration
SECRET = "whsec_monSecretPartage2024"
payload = json.dumps({
"id": "evt_42",
"type": "payment.succeeded",
"data": {"amount": 4999, "currency": "EUR"}
}).encode()
# Cas 1 : signature valide
ts_now = int(time.time())
header_valid = sign_payload(SECRET, payload, timestamp=ts_now)
result = verify_signature(SECRET, payload, header_valid)
print(f"Cas 1 — Signature valide : {result}")
# Cas 2 : signature expirée (timestamp trop ancien)
ts_old = ts_now - 400 # 400 secondes = > 5 minutes de tolérance
header_old = sign_payload(SECRET, payload, timestamp=ts_old)
result = verify_signature(SECRET, payload, header_old)
print(f"Cas 2 — Signature expirée : {result}")
# Cas 3 : payload modifié (man-in-the-middle)
tampered_payload = json.dumps({
"id": "evt_42",
"type": "payment.succeeded",
"data": {"amount": 1, "currency": "EUR"} # montant modifié !
}).encode()
result = verify_signature(SECRET, tampered_payload, header_valid)
print(f"Cas 3 — Payload modifié : {result}")
# Cas 4 : mauvais secret
result = verify_signature("wrong_secret", payload, header_valid)
print(f"Cas 4 — Mauvais secret : {result}")
print("\n--- Détail de la signature générée ---")
print(f"Secret : {SECRET}")
print(f"Timestamp : {ts_now}")
print(f"Header : {header_valid[:80]}...")
Cas 1 — Signature valide : {'valid': True, 'reason': 'OK', 'age_seconds': 0}
Cas 2 — Signature expirée : {'valid': False, 'reason': 'Signature expirée (âge=400s > 300s)'}
Cas 3 — Payload modifié : {'valid': False, 'reason': 'Signature incorrecte'}
Cas 4 — Mauvais secret : {'valid': False, 'reason': 'Signature incorrecte'}
--- Détail de la signature générée ---
Secret : whsec_monSecretPartage2024
Timestamp : 1774518424
Header : t=1774518424,v1=155f4a321db2569e57db70fadd8ff340219b1399558a4b8dc46f2da4cc531da8...
# Simulation de delivery webhook avec retry et backoff exponentiel
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random
import time
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
def simulate_webhook_delivery(
success_probability: float,
max_attempts: int = 5,
base_delay: float = 1.0,
max_delay: float = 300.0,
seed: int = 42
) -> dict:
"""Simule les tentatives de livraison d'un webhook."""
rng = random.Random(seed)
attempts = []
cumulative_time = 0.0
for attempt in range(1, max_attempts + 1):
# Simuler la durée de la requête HTTP
duration = rng.uniform(0.05, 0.5)
# Simuler succès ou échec
success = rng.random() < success_probability
attempts.append({
"attempt": attempt,
"time": cumulative_time,
"duration": duration,
"success": success,
"status_code": 200 if success else rng.choice([500, 503, 502, 408]),
})
cumulative_time += duration
if success:
break
if attempt < max_attempts:
delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
jitter = rng.uniform(0, delay * 0.1)
actual_delay = delay + jitter
cumulative_time += actual_delay
return {
"attempts": attempts,
"total_time": cumulative_time,
"delivered": attempts[-1]["success"],
}
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# Graphique 1 : délais de backoff exponentiel
attempt_numbers = list(range(1, 7))
delays = [min(1.0 * (2 ** (n - 1)), 300.0) for n in attempt_numbers]
delays_with_jitter = [d * 1.05 for d in delays] # +5% jitter max illustratif
axes[0].bar(attempt_numbers, delays, color='#4C72B0', alpha=0.8, label='Délai base')
axes[0].bar(attempt_numbers, [j - d for j, d in zip(delays_with_jitter, delays)],
bottom=delays, color='#DD8452', alpha=0.7, label='Jitter (+10%)')
axes[0].set_xlabel("Numéro de tentative")
axes[0].set_ylabel("Délai avant prochaine tentative (s)")
axes[0].set_title("Backoff exponentiel (base=1s, max=300s)")
axes[0].legend()
for i, (attempt, delay) in enumerate(zip(attempt_numbers, delays)):
axes[0].text(attempt, delay + 2, f"{delay:.0f}s", ha='center', va='bottom', fontsize=9)
# Graphique 2 : simulation de 200 livraisons
n_simulations = 200
results_low = [simulate_webhook_delivery(0.4, seed=i) for i in range(n_simulations)]
results_high = [simulate_webhook_delivery(0.8, seed=i) for i in range(n_simulations)]
def count_attempts_distribution(results):
from collections import Counter
counts = Counter(len(r["attempts"]) for r in results)
return [counts.get(i, 0) for i in range(1, 7)]
dist_low = count_attempts_distribution(results_low)
dist_high = count_attempts_distribution(results_high)
x = np.arange(1, 6)
w = 0.35
axes[1].bar(x - w/2, dist_low[:5], w, label='P(succès)=40%', color='#DD8452', alpha=0.85)
axes[1].bar(x + w/2, dist_high[:5], w, label='P(succès)=80%', color='#55A868', alpha=0.85)
axes[1].set_xlabel("Nombre de tentatives nécessaires")
axes[1].set_ylabel(f"Fréquence (sur {n_simulations} webhooks)")
axes[1].set_title("Distribution des tentatives nécessaires")
axes[1].legend()
axes[1].set_xticks(x)
delivered_low = sum(r["delivered"] for r in results_low)
delivered_high = sum(r["delivered"] for r in results_high)
plt.suptitle(
f"Simulation webhook delivery — "
f"P=40% : {delivered_low}/{n_simulations} livrés | "
f"P=80% : {delivered_high}/{n_simulations} livrés",
fontweight='bold', fontsize=11
)
plt.tight_layout()
plt.show()
print(f"P(succès)=40% : {delivered_low}/{n_simulations} webhooks livrés ({delivered_low/n_simulations*100:.1f}%)")
print(f"P(succès)=80% : {delivered_high}/{n_simulations} webhooks livrés ({delivered_high/n_simulations*100:.1f}%)")
P(succès)=40% : 184/200 webhooks livrés (92.0%)
P(succès)=80% : 200/200 webhooks livrés (100.0%)
# Radar : comparaison des technologies temps réel
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
categories = [
"Faible\nlatence",
"Simplicité\nimpl.",
"Scalabilité\nhoriz.",
"Support\nnavigateur",
"Delivery\ngaranti",
"Bidi-\nrectionnel"
]
N = len(categories)
scores = {
"Polling": [1, 5, 5, 5, 4, 1],
"Long Polling": [3, 4, 3, 5, 3, 1],
"SSE": [4, 4, 4, 5, 2, 1],
"WebSocket": [5, 2, 2, 4, 2, 5],
"Webhooks": [1, 3, 5, 1, 5, 1],
}
angles = [n / float(N) * 2 * np.pi for n in range(N)]
angles += angles[:1]
fig, ax = plt.subplots(figsize=(9, 9), subplot_kw=dict(polar=True))
colors = ["#4C72B0", "#DD8452", "#55A868", "#8172B3", "#C44E52"]
for (tech, vals), color in zip(scores.items(), colors):
values = vals + vals[:1]
ax.plot(angles, values, linewidth=2.2, linestyle='solid', label=tech, color=color)
ax.fill(angles, values, alpha=0.1, color=color)
ax.set_xticks(angles[:-1])
ax.set_xticklabels(categories, size=10)
ax.set_ylim(0, 5)
ax.set_yticks([1, 2, 3, 4, 5])
ax.set_yticklabels(["1", "2", "3", "4", "5"], size=8, color='grey')
ax.set_title("Comparaison des technologies temps réel\n(score de 1 à 5 par critère)",
size=13, fontweight='bold', pad=20)
ax.legend(loc='upper right', bbox_to_anchor=(1.45, 1.15), fontsize=10)
plt.show()
# Decision tree "quelle technologie temps réel" (matplotlib flowchart)
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)
fig, ax = plt.subplots(figsize=(14, 10))
ax.set_xlim(0, 14)
ax.set_ylim(0, 10)
ax.axis('off')
def draw_box(ax, x, y, w, h, text, color, fontsize=9, text_color='white', shape='round'):
patch = mpatches.FancyBboxPatch(
(x - w/2, y - h/2), w, h,
boxstyle=f"round,pad=0.15",
facecolor=color, edgecolor='white', linewidth=1.5, alpha=0.92
)
ax.add_patch(patch)
ax.text(x, y, text, ha='center', va='center', fontsize=fontsize,
color=text_color, fontweight='bold', wrap=True,
multialignment='center')
def draw_diamond(ax, x, y, w, h, text, color="#F0C040", fontsize=8.5):
diamond = plt.Polygon(
[[x, y + h/2], [x + w/2, y], [x, y - h/2], [x - w/2, y]],
closed=True, facecolor=color, edgecolor='white', linewidth=1.5, alpha=0.92
)
ax.add_patch(diamond)
ax.text(x, y, text, ha='center', va='center', fontsize=fontsize,
fontweight='bold', color='#333333', multialignment='center')
def arrow(ax, x1, y1, x2, y2, label='', color='#555555'):
ax.annotate("", xy=(x2, y2), xytext=(x1, y1),
arrowprops=dict(arrowstyle='->', color=color, lw=1.8))
if label:
mx, my = (x1 + x2) / 2, (y1 + y2) / 2
ax.text(mx + 0.1, my, label, fontsize=8, color=color, fontstyle='italic')
# Noeud de départ
draw_box(ax, 7, 9.3, 3.0, 0.7, "Besoin de communication\ntemps réel ?", "#4C72B0", fontsize=9.5)
# Q1 : bidirectionnel ?
draw_diamond(ax, 7, 8.0, 3.2, 0.9, "Bidirectionnel\n(client → serveur) ?", "#E8B84B")
arrow(ax, 7, 8.95, 7, 8.45)
# Q2a : fréquence élevée ?
draw_diamond(ax, 3.5, 6.5, 3.0, 0.9, "Fréquence élevée\n(> 1 msg/s) ?", "#E8B84B")
arrow(ax, 5.4, 8.0, 4.6, 6.8, "Non")
# Q2b : serveur → client uniquement
draw_diamond(ax, 10.5, 6.5, 3.0, 0.9, "Delivery garanti\nnécessaire ?", "#E8B84B")
arrow(ax, 8.6, 8.0, 9.9, 6.8, "Oui")
# WebSocket (haute fréquence bidi)
draw_box(ax, 2.0, 5.0, 2.4, 0.7, "WebSocket", "#2d8a4e")
arrow(ax, 3.5, 6.05, 2.5, 5.35, "Oui")
# Long Polling / SSE (basse fréquence bidi)
draw_box(ax, 5.0, 5.0, 2.4, 0.7, "WebSocket\nou SSE + POST", "#55A868")
arrow(ax, 3.5, 6.05, 4.5, 5.35, "Non")
# Webhooks (delivery garanti)
draw_box(ax, 12.0, 5.0, 2.4, 0.7, "Webhooks\n(at-least-once)", "#C44E52")
arrow(ax, 10.5, 6.05, 11.5, 5.35, "Oui")
# Q3 : polling ou SSE
draw_diamond(ax, 9.5, 5.0, 2.8, 0.9, "Mises à jour\nfréquentes (> /min) ?", "#E8B84B")
arrow(ax, 10.5, 6.05, 10.0, 5.45, "Non")
# SSE
draw_box(ax, 9.5, 3.5, 2.3, 0.7, "SSE\n(EventSource)", "#4C72B0")
arrow(ax, 9.5, 4.55, 9.5, 3.85, "Oui")
# Polling
draw_box(ax, 7.0, 3.5, 2.3, 0.7, "Long Polling\nou Polling", "#8172B3")
arrow(ax, 8.1, 5.0, 7.9, 3.85, "Non")
ax.set_title("Decision tree — Quelle technologie temps réel ?",
fontsize=13, fontweight='bold', pad=10)
# Légende
legend_items = [
mpatches.Patch(color="#2d8a4e", label="WebSocket"),
mpatches.Patch(color="#55A868", label="WebSocket / SSE+POST"),
mpatches.Patch(color="#4C72B0", label="SSE"),
mpatches.Patch(color="#8172B3", label="Long Polling / Polling"),
mpatches.Patch(color="#C44E52", label="Webhooks"),
]
ax.legend(handles=legend_items, loc='lower left', fontsize=9, framealpha=0.9)
plt.show()
Résumé#
Le paysage des technologies temps réel offre un spectre allant du polling (simple, universel, latence élevée) au WebSocket (bidirectionnel, faible latence, complexité accrue). Le choix optimal dépend de trois axes : la direction des messages, la fréquence des événements, et les exigences de delivery.
Les SSE sont souvent sous-estimés : pour les flux unidirectionnels serveur → client, ils offrent une simplicité remarquable avec reconnexion automatique native et compatibilité HTTP. Les WebSockets sont indispensables pour les applications interactives (chat, collaboration) mais requièrent une infrastructure plus complexe pour la scalabilité horizontale (broker partagé Redis, Kafka).
Les webhooks sont la solution canonique pour la communication événementielle inter-services et vers des partenaires. La robustesse d’un système de webhooks repose sur trois piliers : la signature HMAC-SHA256 (authentifier les événements), le retry avec backoff exponentiel (absorber les indisponibilités transitoires), et l”idempotence côté récepteur (gérer les doublons inévitables en at-least-once). La dead letter queue complète le dispositif en assurant qu’aucun événement n’est silencieusement perdu.