Chapitre 19 — Patterns avancés#
Les chapitres précédents ont couvert les bases de REST, l’authentification, le design, et l’observabilité. Ce chapitre traite des situations que ces bases ne couvrent pas : les opérations longues, les mises à jour partielles complexes, les uploads de fichiers volumineux, le streaming, et les APIs événementielles. Ces patterns apparaissent inévitablement dans des APIs de production.
Long-running operations#
Certaines opérations métier prennent plusieurs secondes, minutes, ou heures : traitement d’une vidéo, génération d’un rapport, migration de données, entraînement d’un modèle. Répondre de manière synchrone est impossible — les timeouts HTTP (30–60 s typiquement) et les proxys intermédiaires coupent la connexion.
Le problème du timeout#
Un client qui attend 5 minutes une réponse HTTP va :
Expirer son timeout applicatif (30 s dans
requestspar défaut)Voir sa connexion coupée par un load balancer ou reverse proxy (Nginx par défaut : 60 s)
Ne jamais recevoir le résultat
Pattern polling — 202 Accepted + Location#
Le pattern polling est la solution REST idiomatique. Le serveur accepte la tâche immédiatement (202 Accepted), retourne l’URL de suivi dans le header Location, et le client poll cette URL jusqu’à complétion.
POST /api/v2/reports/generate
Content-Type: application/json
{"date_range": "2024-Q4", "format": "pdf"}
HTTP/1.1 202 Accepted
Location: /api/v2/jobs/job-7f3a91c2
Retry-After: 5
Content-Type: application/json
{"job_id": "job-7f3a91c2", "status": "pending", "created_at": "2024-11-15T14:30:00Z"}
---
GET /api/v2/jobs/job-7f3a91c2
HTTP/1.1 200 OK
{"job_id": "job-7f3a91c2", "status": "running", "progress": 45}
---
GET /api/v2/jobs/job-7f3a91c2
HTTP/1.1 200 OK
{"job_id": "job-7f3a91c2", "status": "completed",
"result_url": "/api/v2/reports/rpt-8a2b3c4d.pdf", "completed_at": "2024-11-15T14:32:15Z"}
Le header Retry-After suggère au client combien de secondes attendre avant le prochain poll, évitant le polling agressif.
Implémentation FastAPI complète#
import asyncio
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()
# Store en mémoire (Redis en production)
job_store: dict[str, dict] = {}
class JobStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class GenerateReportRequest(BaseModel):
date_range: str
format: str = "pdf"
async def process_report(job_id: str, request: GenerateReportRequest):
"""Simule le traitement long d'un rapport."""
job_store[job_id]["status"] = JobStatus.RUNNING
job_store[job_id]["started_at"] = datetime.now(timezone.utc).isoformat()
try:
for i in range(10):
await asyncio.sleep(2) # simulation du traitement
job_store[job_id]["progress"] = (i + 1) * 10
job_store[job_id]["status"] = JobStatus.COMPLETED
job_store[job_id]["result_url"] = f"/api/v2/reports/{job_id}.pdf"
job_store[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat()
except Exception as e:
job_store[job_id]["status"] = JobStatus.FAILED
job_store[job_id]["error"] = str(e)
@app.post("/api/v2/reports/generate", status_code=202)
async def generate_report(
request: GenerateReportRequest,
background_tasks: BackgroundTasks
):
job_id = f"job-{uuid.uuid4().hex[:12]}"
job_store[job_id] = {
"job_id": job_id,
"status": JobStatus.PENDING,
"progress": 0,
"created_at": datetime.now(timezone.utc).isoformat(),
}
background_tasks.add_task(process_report, job_id, request)
return JSONResponse(
status_code=202,
headers={"Location": f"/api/v2/jobs/{job_id}", "Retry-After": "5"},
content=job_store[job_id]
)
@app.get("/api/v2/jobs/{job_id}")
async def get_job(job_id: str):
job = job_store.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
return job
Pattern callback (webhook)#
Si le client peut recevoir des appels entrants, le pattern callback est plus efficace que le polling. À la création de la tâche, le client fournit une URL de callback :
class GenerateReportRequest(BaseModel):
date_range: str
format: str = "pdf"
callback_url: Optional[str] = None # URL de notification
# À la fin du traitement :
async def notify_callback(callback_url: str, job: dict):
async with httpx.AsyncClient() as client:
await client.post(callback_url, json=job, timeout=10)
Pattern SSE pour le suivi#
Server-Sent Events permettent au client de recevoir des mises à jour en temps réel sans polling :
from fastapi.responses import StreamingResponse
import asyncio
@app.get("/api/v2/jobs/{job_id}/stream")
async def stream_job_progress(job_id: str):
async def event_generator():
while True:
job = job_store.get(job_id)
if not job:
yield "event: error\ndata: {\"message\": \"Job not found\"}\n\n"
break
yield f"event: progress\ndata: {json.dumps(job)}\n\n"
if job["status"] in (JobStatus.COMPLETED, JobStatus.FAILED):
break
await asyncio.sleep(1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
PATCH et mises à jour partielles#
HTTP PATCH est prévu pour les mises à jour partielles. Mais le format du corps de la requête PATCH n’est pas standardisé par HTTP — il dépend du type MIME. Deux standards existent.
JSON Patch — RFC 6902#
JSON Patch représente les modifications comme une liste d’opérations atomiques.
PATCH /api/v2/users/42
Content-Type: application/json-patch+json
[
{"op": "replace", "path": "/name", "value": "Alice Martin"},
{"op": "add", "path": "/tags/-", "value": "premium"},
{"op": "remove", "path": "/legacy_id"},
{"op": "move", "path": "/nickname", "from": "/alias"},
{"op": "copy", "path": "/display_name", "from": "/name"},
{"op": "test", "path": "/version", "value": 5}
]
Les opérations :
add : ajoute une valeur à un chemin (crée les nœuds intermédiaires, ou ajoute en fin de tableau si
-)remove : supprime la valeur au chemin
replace : équivalent à remove + add (le chemin doit exister)
move : déplace la valeur de
fromverspathcopy : copie la valeur de
fromverspathtest : vérifie que la valeur au chemin correspond à
value— si non, tout le patch échoue (atomicité)
L’opération test permet l’optimistic locking : vérifier la version avant de modifier.
JSON Merge Patch — RFC 7396#
JSON Merge Patch est plus simple : on envoie un objet JSON partiel. Les champs présents remplacent les champs existants. Les champs avec valeur null sont supprimés. Les champs absents sont inchangés.
PATCH /api/v2/users/42
Content-Type: application/merge-patch+json
{
"name": "Alice Martin",
"bio": null,
"preferences": {"theme": "dark"}
}
Résultat : name est mis à jour, bio est supprimé, preferences.theme est mis à jour sans toucher aux autres clés de preferences.
Limite : on ne peut pas mettre une valeur à null (null = suppression). On ne peut pas opérer sur des éléments individuels d’un tableau.
Comparaison et recommandations#
JSON Patch vs JSON Merge Patch
JSON Merge Patch est plus lisible et suffit pour 80% des cas. Privilégiez-le pour les ressources simples.
JSON Patch est nécessaire pour : manipuler des tableaux (ajout/suppression d’éléments), l’optimistic locking via test, les modifications conditionnelles atomiques.
Bulk APIs#
Les opérations bulk permettent de traiter plusieurs ressources en un seul appel HTTP, réduisant la latence réseau pour les clients qui doivent créer ou modifier des dizaines ou centaines d’objets.
POST /batch#
POST /api/v2/batch
Content-Type: application/json
{
"requests": [
{"method": "POST", "path": "/api/v2/users", "body": {"name": "Alice"}},
{"method": "GET", "path": "/api/v2/users/42"},
{"method": "DELETE", "path": "/api/v2/users/99"}
]
}
PATCH /resources — mise à jour de collection#
PATCH /api/v2/products
Content-Type: application/json
[
{"id": "prod-1", "price": 29.99},
{"id": "prod-2", "price": 49.99},
{"id": "prod-3", "status": "archived"}
]
Multi-Status#
Les opérations bulk doivent gérer les succès partiels. Le code 207 Multi-Status (WebDAV, mais accepté en REST) permet de retourner un status par item :
HTTP/1.1 207 Multi-Status
Content-Type: application/json
{
"results": [
{"index": 0, "status": 201, "id": "usr-456"},
{"index": 1, "status": 422, "error": "email already exists"},
{"index": 2, "status": 201, "id": "usr-789"}
]
}
Idempotence des bulk
Les opérations bulk doivent être idempotentes si elles modifient des données. Utilisez un batch_id unique en header (Idempotency-Key) pour que le serveur puisse dépliquer les retry réseau.
File upload#
multipart/form-data#
L’upload standard combine des métadonnées et un fichier binaire dans un seul POST :
from fastapi import FastAPI, UploadFile, File, Form
@app.post("/api/v2/documents")
async def upload_document(
file: UploadFile = File(...),
title: str = Form(...),
folder_id: str = Form(...)
):
content = await file.read()
# Traitement du fichier...
return {"document_id": "doc-123", "size": len(content)}
Limite : le fichier passe par le serveur API, qui doit le lire en RAM ou le streamer. Impraticable pour des fichiers de plusieurs gigaoctets.
Direct upload — presigned URL S3#
La solution scalable contourne le serveur API : le client upload directement dans le stockage objet (S3, GCS).
import boto3
from datetime import timedelta
s3 = boto3.client("s3")
@app.post("/api/v2/documents/upload-url")
async def get_upload_url(filename: str, content_type: str):
"""
Génère une presigned URL pour upload direct vers S3.
Le serveur API n'est plus dans le chemin du fichier.
"""
key = f"uploads/{uuid.uuid4()}/{filename}"
presigned = s3.generate_presigned_post(
Bucket="my-bucket",
Key=key,
Fields={"Content-Type": content_type},
Conditions=[
{"Content-Type": content_type},
["content-length-range", 1, 100 * 1024 * 1024] # max 100 MB
],
ExpiresIn=3600
)
return {
"upload_url": presigned["url"],
"upload_fields": presigned["fields"],
"storage_key": key,
"expires_in": 3600
}
@app.post("/api/v2/documents/confirm")
async def confirm_upload(storage_key: str, title: str):
"""Après l'upload S3, le client confirme pour créer la ressource."""
# Vérifier que le fichier existe en S3
# Créer la ressource en BDD
return {"document_id": "doc-123", "storage_key": storage_key}
TUS protocol — uploads résumables#
Le protocole TUS (tus.io) standardise les uploads résumables. Si l’upload est interrompu, le client reprend depuis le dernier octet reçu.
Workflow :
POST /uploads→ créer la session, retourne l’URL de l’upload +Upload-Offset: 0PATCH /uploads/{id}→ envoyer un chunk (headerUpload-Offset,Content-Type: application/offset+octet-stream)Répéter jusqu’à ce que
Upload-Offset == Upload-LengthLe serveur notifie la complétion
TUS est supporté nativement par tus-js-client (navigateur), tusd (serveur Go), et des plugins pour AWS S3/GCS.
API de recherche#
Paramètres de recherche vs endpoint dédié#
Pour une recherche simple (filtrage), des query params sur la collection suffisent :
GET /api/v2/users?name=alice&status=active&created_after=2024-01-01
Pour une recherche complexe (full-text, facettes, scoring, suggestions), un endpoint dédié est plus clair :
POST /api/v2/search
Content-Type: application/json
{
"query": "machine learning",
"filters": {"category": "books", "price_max": 50},
"facets": ["category", "author"],
"sort": {"field": "relevance"},
"page": {"limit": 20, "cursor": "eyJpZCI6MTIz"}
}
L’utilisation de POST pour la recherche est acceptable quand la requête est trop complexe pour tenir dans une URL.
Facettes#
Les facettes agrègent les résultats par dimension pour permettre le filtrage progressif :
{
"results": [...],
"facets": {
"category": [
{"value": "books", "count": 142},
{"value": "videos", "count": 87}
],
"price_range": [
{"value": "0-25", "count": 89},
{"value": "25-50", "count": 63}
]
},
"total": 229
}
Streaming de réponses#
Certaines ressources sont trop volumineuses pour tenir en mémoire ou pour être envoyées en une seule réponse. Le streaming découple la génération de la réponse de sa transmission.
Transfer-Encoding: chunked#
HTTP/1.1 supporte le chunked transfer encoding : le serveur envoie la réponse par morceaux sans connaître sa taille totale à l’avance.
StreamingResponse FastAPI#
import csv
import io
from fastapi.responses import StreamingResponse
@app.get("/api/v2/reports/export.csv")
async def export_csv():
"""Export CSV streamed — pas de limite de taille."""
async def generate_rows():
# En-tête CSV
yield "id,name,email,created_at\n"
# Les lignes sont lues et envoyées par batch
async for batch in fetch_users_by_batch(batch_size=1000):
for user in batch:
yield f'{user["id"]},{user["name"]},{user["email"]},{user["created_at"]}\n'
return StreamingResponse(
generate_rows(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"}
)
NDJSON streaming#
NDJSON (Newline-Delimited JSON) est idéal pour streamer des collections d’objets JSON :
import json
@app.get("/api/v2/events/stream")
async def stream_events():
async def generate():
async for event in fetch_events_stream():
yield json.dumps(event) + "\n"
return StreamingResponse(generate(), media_type="application/x-ndjson")
Chaque ligne est un document JSON valide, facilement parseable côté client avec readline().
API événementielles#
Poll vs push#
Poll : le client interroge régulièrement l’API pour détecter les changements. Simple à implémenter, mais inefficace (nombreuses requêtes vides).
Push : le serveur notifie le client quand un changement survient. Plus efficace, mais nécessite un canal persistant (webhook, SSE, WebSocket) ou une infrastructure de messagerie.
ETag pour les listes changeantes#
Le polling peut être rendu efficace avec les ETags. Le serveur retourne un ETag représentant l’état de la collection. Le client envoie ce tag dans sa prochaine requête :
GET /api/v2/notifications
ETag: "v2-1234567890"
→ HTTP 200 OK si les notifications ont changé
→ HTTP 304 Not Modified si rien de nouveau (pas de corps, très rapide)
Changes endpoints#
Certaines APIs exposent un endpoint de changements : retourne uniquement les ressources créées/modifiées/supprimées depuis un cursor donné.
GET /api/v2/users/changes?since=2024-11-15T14:00:00Z&limit=100
{
"changes": [
{"type": "created", "resource": {...}, "timestamp": "2024-11-15T14:05:00Z"},
{"type": "updated", "resource": {...}, "timestamp": "2024-11-15T14:07:00Z"},
{"type": "deleted", "id": "usr-99", "timestamp": "2024-11-15T14:10:00Z"}
],
"next_cursor": "2024-11-15T14:10:00Z",
"has_more": false
}
Cursor vs timestamp
Utilisez un cursor opaque plutôt qu’un timestamp pour les changes endpoints. Les timestamps ont des problèmes d’horloge (clock skew) et de granularité. Un cursor peut encoder l’ID du dernier événement traité, ce qui est exact et atomique.
Cellules exécutables#
Implémentation JSON Patch RFC 6902#
import json
import copy
from typing import Any
def json_pointer_get(doc: Any, pointer: str) -> tuple[Any, str]:
"""Navigue dans doc en suivant le JSON Pointer (RFC 6901). Retourne (parent, key)."""
if pointer == "":
return None, ""
parts = pointer.lstrip("/").split("/")
# Décoder les caractères spéciaux RFC 6901
parts = [p.replace("~1", "/").replace("~0", "~") for p in parts]
parent = doc
for part in parts[:-1]:
if isinstance(parent, list):
parent = parent[int(part)]
else:
parent = parent[part]
return parent, parts[-1]
def apply_json_patch(doc: dict, patch: list[dict]) -> dict:
"""
Applique une liste d'opérations JSON Patch (RFC 6902) à un document.
Retourne le document modifié. L'original n'est pas muté.
Lève une ValueError si une opération test échoue.
"""
result = copy.deepcopy(doc)
for op_index, op in enumerate(patch):
operation = op["op"]
path = op.get("path", "")
value = op.get("value")
from_path = op.get("from", "")
if operation == "test":
parent, key = json_pointer_get(result, path)
target = parent[int(key)] if isinstance(parent, list) else parent[key]
if target != value:
raise ValueError(
f"Test failed at op {op_index}: path='{path}', "
f"expected={value!r}, got={target!r}"
)
elif operation == "add":
parent, key = json_pointer_get(result, path)
if isinstance(parent, list):
if key == "-":
parent.append(value)
else:
parent.insert(int(key), value)
else:
parent[key] = value
elif operation == "remove":
parent, key = json_pointer_get(result, path)
if isinstance(parent, list):
parent.pop(int(key))
else:
del parent[key]
elif operation == "replace":
parent, key = json_pointer_get(result, path)
if isinstance(parent, list):
parent[int(key)] = value
else:
if key not in parent:
raise ValueError(f"Replace: path '{path}' does not exist")
parent[key] = value
elif operation == "move":
src_parent, src_key = json_pointer_get(result, from_path)
val = src_parent.pop(src_key) if not isinstance(src_parent, list) else src_parent.pop(int(src_key))
dst_parent, dst_key = json_pointer_get(result, path)
if isinstance(dst_parent, list):
dst_parent.insert(int(dst_key) if dst_key != "-" else len(dst_parent), val)
else:
dst_parent[dst_key] = val
elif operation == "copy":
src_parent, src_key = json_pointer_get(result, from_path)
val = copy.deepcopy(src_parent[int(src_key)] if isinstance(src_parent, list) else src_parent[src_key])
dst_parent, dst_key = json_pointer_get(result, path)
if isinstance(dst_parent, list):
dst_parent.insert(int(dst_key) if dst_key != "-" else len(dst_parent), val)
else:
dst_parent[dst_key] = val
return result
# --- Démo ---
original = {
"id": 42,
"name": "Alice Dupont",
"email": "alice@example.com",
"tags": ["user", "trial"],
"address": {"city": "Paris", "zip": "75001"},
"version": 3
}
patch = [
{"op": "test", "path": "/version", "value": 3}, # vérification
{"op": "replace", "path": "/name", "value": "Alice Martin"},
{"op": "add", "path": "/tags/-", "value": "premium"}, # ajout en fin
{"op": "remove", "path": "/tags/0"}, # supprime "user"
{"op": "replace", "path": "/address/city", "value": "Lyon"},
{"op": "copy", "path": "/display_name", "from": "/name"},
{"op": "replace", "path": "/version", "value": 4},
]
result = apply_json_patch(original, patch)
print("=== Document original ===")
print(json.dumps(original, indent=2, ensure_ascii=False))
print("\n=== Après JSON Patch ===")
print(json.dumps(result, indent=2, ensure_ascii=False))
# Test de l'opération test en échec
print("\n=== Test d'une opération test en échec ===")
try:
apply_json_patch(original, [{"op": "test", "path": "/version", "value": 99}])
except ValueError as e:
print(f"ValueError levée : {e}")
=== Document original ===
{
"id": 42,
"name": "Alice Dupont",
"email": "alice@example.com",
"tags": [
"user",
"trial"
],
"address": {
"city": "Paris",
"zip": "75001"
},
"version": 3
}
=== Après JSON Patch ===
{
"id": 42,
"name": "Alice Martin",
"email": "alice@example.com",
"tags": [
"trial",
"premium"
],
"address": {
"city": "Lyon",
"zip": "75001"
},
"version": 4,
"display_name": "Alice Martin"
}
=== Test d'une opération test en échec ===
ValueError levée : Test failed at op 0: path='/version', expected=99, got=3
Simulation long-running operation avec polling#
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import time
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)
class JobStore:
"""Store de jobs simulé."""
def __init__(self):
self._jobs: dict[str, dict] = {}
self._counter = 0
def create(self, job_type: str) -> str:
self._counter += 1
job_id = f"job-{self._counter:04d}"
self._jobs[job_id] = {
"job_id": job_id,
"type": job_type,
"status": "pending",
"progress": 0,
"created_t": 0
}
return job_id
def advance(self, job_id: str, t: int):
job = self._jobs[job_id]
if job["status"] == "pending":
job["status"] = "running"
job["started_t"] = t
job["progress"] = min(100, job["progress"] + 12)
if job["progress"] >= 100:
job["status"] = "completed"
job["completed_t"] = t
def get(self, job_id: str) -> dict:
return self._jobs[job_id]
store = JobStore()
# Simulation de 3 clients qui soumettent des jobs et pollent
timeline = [] # (t, client, event, job_id, status)
# Client A : soumet à t=0, poll toutes les 2s
# Client B : soumet à t=3, poll toutes les 3s
# Client C : soumet à t=5, poll toutes les 5s
scenarios = [
("Client A", 0, 2, "rapport-annuel"),
("Client B", 3, 3, "export-csv"),
("Client C", 5, 5, "migration-BDD"),
]
job_ids = {}
for client, start_t, poll_interval, job_type in scenarios:
jid = store.create(job_type)
job_ids[client] = jid
timeline.append((start_t, client, "submit", jid, "pending"))
MAX_T = 20
for t in range(MAX_T + 1):
for client, start_t, poll_interval, _ in scenarios:
jid = job_ids[client]
job = store.get(jid)
if t < start_t or job["status"] == "completed":
continue
store.advance(jid, t)
if (t - start_t) % poll_interval == 0:
status = store.get(jid)["status"]
progress = store.get(jid)["progress"]
timeline.append((t, client, "poll", jid, f"{status} {progress}%"))
# Visualisation
fig, ax = plt.subplots(figsize=(13, 5))
client_y = {"Client A": 2.5, "Client B": 1.5, "Client C": 0.5}
colors_map = {"submit": "#4c72b0", "poll": "#55a868"}
for t, client, event, jid, status in timeline:
y = client_y[client]
color = colors_map[event]
marker = "^" if event == "submit" else "o"
ax.scatter(t, y, color=color, marker=marker, s=80, zorder=3)
label = f"{'▶' if event == 'submit' else '?'} {status}"
ax.text(t, y + 0.12, label, ha="center", fontsize=6.5, color=color)
for client, y in client_y.items():
ax.axhline(y=y, color="lightgray", linewidth=1, zorder=1)
ax.text(-0.5, y, client, ha="right", va="center", fontsize=9, fontweight="bold")
legend_patches = [
mpatches.Patch(color="#4c72b0", label="Soumission (POST → 202)"),
mpatches.Patch(color="#55a868", label="Poll (GET job status)"),
]
ax.legend(handles=legend_patches, loc="upper right")
ax.set_xlabel("Temps simulé (secondes)")
ax.set_title("Pattern polling — 3 clients, long-running operations")
ax.set_xlim(-1, MAX_T + 1)
ax.set_ylim(0, 3.5)
ax.set_yticks([])
plt.show()
Comparaison JSON Patch vs JSON Merge Patch#
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
sns.set_theme(style="whitegrid", font_scale=0.95)
fig, ax = plt.subplots(figsize=(13, 7))
ax.axis("off")
headers = ["Critère", "JSON Patch (RFC 6902)", "JSON Merge Patch (RFC 7396)"]
rows = [
("Content-Type", "application/json-patch+json", "application/merge-patch+json"),
("Format", "Tableau d'opérations", "Objet JSON partiel"),
("Lisibilité", "⚠ Verbeux", "✓ Intuitif"),
("Manipulation de tableau", "✓ add/remove sur index précis", "✗ Remplace tout le tableau"),
("Supprimer un champ", "remove op", "Mettre null"),
("Mettre une valeur null", "✓ Possible (replace, value: null)", "✗ Null = suppression"),
("Optimistic locking", "✓ Opération test", "✗ Non supporté"),
("Opérations atomiques", "✓ Tout ou rien", "✓ Implicitement atomique"),
("Complexité serveur", "⚠ Implémentation non triviale", "✓ Simple (merge récursif)"),
("Cas d'usage principal", "Patches complexes, arrays", "Mises à jour simples"),
]
col_widths = [3.0, 4.5, 4.5]
x_offsets = [0.2, 3.4, 8.1]
row_h = 0.5
# En-têtes
header_colors = ["#d9d9d9", "#aec7e8", "#ffbb78"]
for col_idx, (header, x, w) in enumerate(zip(headers, x_offsets, col_widths)):
rect = mpatches.FancyBboxPatch(
(x, len(rows) * row_h + 0.1), w, 0.55,
boxstyle="round,pad=0.04",
facecolor=header_colors[col_idx], edgecolor="#555555", linewidth=1.2
)
ax.add_patch(rect)
ax.text(x + w / 2, len(rows) * row_h + 0.38, header,
ha="center", va="center", fontsize=9, fontweight="bold")
# Lignes
for row_idx, row in enumerate(rows):
y = (len(rows) - row_idx - 1) * row_h + 0.1
bg = "#f7f7f7" if row_idx % 2 == 0 else "white"
for col_idx, (cell, x, w) in enumerate(zip(row, x_offsets, col_widths)):
rect = mpatches.FancyBboxPatch(
(x, y), w, row_h - 0.06,
boxstyle="round,pad=0.03",
facecolor=bg, edgecolor="#cccccc", linewidth=0.7
)
ax.add_patch(rect)
ax.text(x + 0.15, y + (row_h - 0.06) / 2, cell,
ha="left", va="center", fontsize=8.5)
ax.set_xlim(0, 13)
ax.set_ylim(0, len(rows) * row_h + 1.0)
ax.set_title("JSON Patch vs JSON Merge Patch — comparaison", fontsize=12,
fontweight="bold", pad=8)
plt.show()
Simulation chunked upload TUS — diagramme de séquence#
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
sns.set_theme(style="whitegrid", font_scale=0.95)
fig, ax = plt.subplots(figsize=(13, 8))
ax.axis("off")
ax.set_xlim(0, 13)
ax.set_ylim(0, 8.5)
# Acteurs
actors = [("Client", 2), ("Serveur TUS", 7), ("Stockage\nobjet", 11)]
actor_y = 8.0
for name, x in actors:
rect = mpatches.FancyBboxPatch(
(x - 0.9, actor_y - 0.25), 1.8, 0.55,
boxstyle="round,pad=0.08",
facecolor="#aec7e8", edgecolor="#555555", linewidth=1.5
)
ax.add_patch(rect)
ax.text(x, actor_y + 0.02, name, ha="center", va="center",
fontsize=9, fontweight="bold")
# Ligne de vie
ax.axvline(x=x, color="#aaaaaa", linewidth=1, linestyle="--",
ymin=0.0, ymax=(actor_y - 0.3) / 8.5)
# Messages (y, x_from, x_to, label, color, direction)
messages = [
(7.2, 2, 7, "POST /uploads (Upload-Length: 10MB, filename: video.mp4)", "#4c72b0", "→"),
(6.7, 7, 2, "201 Created Location: /uploads/abc123 Upload-Offset: 0", "#2ca02c", "←"),
(6.1, 2, 7, "PATCH /uploads/abc123 (chunk 1/5, Upload-Offset: 0)", "#4c72b0", "→"),
(5.6, 7, 2, "204 No Content Upload-Offset: 2097152 (2 MB reçus)", "#2ca02c", "←"),
(5.1, 2, 7, "PATCH /uploads/abc123 (chunk 2/5, Upload-Offset: 2097152)", "#4c72b0", "→"),
(4.6, 7, 2, "204 No Content Upload-Offset: 4194304", "#2ca02c", "←"),
(4.2, 2, 2, "⚡ Interruption réseau", "#d62728", "•"),
(3.7, 2, 7, "HEAD /uploads/abc123 (où en est l'upload ?)", "#ff7f0e", "→"),
(3.2, 7, 2, "200 OK Upload-Offset: 4194304 (reprise depuis 4 MB)", "#2ca02c", "←"),
(2.7, 2, 7, "PATCH /uploads/abc123 (chunk 3/5, Upload-Offset: 4194304)", "#4c72b0", "→"),
(2.1, 7, 11,"PUT /bucket/abc123 (stream final vers stockage)", "#9467bd", "→"),
(1.6, 11, 7,"200 OK (fichier stocké)", "#55a868", "←"),
(1.1, 7, 2, "204 No Content Upload-Offset: 10485760 (complet)", "#2ca02c", "←"),
]
for y, x1, x2, label, color, direction in messages:
if direction == "•":
ax.scatter(x1, y, color=color, s=100, zorder=3)
ax.text(x1 + 0.2, y, label, color=color, fontsize=8, va="center")
continue
arrow_x1 = x1 + (0.1 if x1 < x2 else -0.1)
arrow_x2 = x2 - (0.1 if x1 < x2 else -0.1)
ax.annotate("", xy=(arrow_x2, y), xytext=(arrow_x1, y),
arrowprops=dict(arrowstyle="->", color=color, lw=1.3))
mid_x = (x1 + x2) / 2
ax.text(mid_x, y + 0.12, label, ha="center", va="bottom",
fontsize=7.5, color=color)
ax.set_title("Upload résumable TUS — diagramme de séquence avec reprise", fontsize=11,
fontweight="bold", pad=8)
plt.show()
Résumé#
Ce chapitre a couvert les patterns qui comblent les lacunes du CRUD standard.
Long-running operations : le pattern polling (
202 Accepted+Location+Retry-After) est la solution REST idiomatique. Le pattern webhook est plus efficace quand le client peut recevoir des appels entrants. SSE convient pour le suivi en temps réel dans un navigateur.PATCH : JSON Merge Patch (RFC 7396) est suffisant pour 80% des cas de mise à jour partielle. JSON Patch (RFC 6902) est nécessaire pour manipuler des tableaux, mettre une valeur à
null, ou implémenter l’optimistic locking via l’opérationtest.Bulk APIs : le code
207 Multi-Statusgère les succès partiels. L”Idempotency-Keyheader protège contre les retry réseau.File upload : les presigned URLs S3 déchargent le serveur API pour les gros fichiers. Le protocole TUS standardise les uploads résumables pour les fichiers très volumineux ou les connexions instables.
Streaming :
StreamingResponseFastAPI avec NDJSON est la solution la plus simple pour streamer des collections.Transfer-Encoding: chunkedest transparent pour le code applicatif.Changes endpoints : préférez un cursor opaque à un timestamp pour les endpoints de changements, afin d’éviter les problèmes de clock skew et de granularité.