Chapitre 3 — Sécurité des APIs#
La sécurité d’une API ne se réduit pas à l’authentification. Elle couvre la conception des ressources, la validation des entrées, le contrôle du débit, la protection des données sensibles et l’architecture globale du service. Ce chapitre s’appuie sur l’OWASP API Security Top 10 (2023) et présente des mitigations concrètes pour chaque catégorie.
OWASP API Security Top 10 — 2023#
L’OWASP (Open Web Application Security Project) maintient une liste des 10 risques les plus critiques pour les APIs. La version 2023 reflète l’évolution des architectures microservices et API-first.
# |
Catégorie |
Description synthétique |
|---|---|---|
API1 |
Broken Object Level Authorization |
Accès à des objets d’autres utilisateurs via manipulation d’ID |
API2 |
Broken Authentication |
Authentification faible, tokens prévisibles, rotation absente |
API3 |
Broken Object Property Level Authorization |
Exposition/modification de propriétés d’objets non autorisées |
API4 |
Unrestricted Resource Consumption |
Absence de rate limiting, requêtes coûteuses non limitées |
API5 |
Broken Function Level Authorization |
Accès à des fonctions admin via manipulation de rôles/URLs |
API6 |
Unrestricted Access to Sensitive Business Flows |
Abus de flux métier légitimes (scraping, spam, fraude) |
API7 |
Server-Side Request Forgery (SSRF) |
Forçage du serveur à effectuer des requêtes vers des cibles internes |
API8 |
Security Misconfiguration |
Headers de sécurité absents, CORS trop permissif, debug activé |
API9 |
Improper Inventory Management |
APIs non documentées, versions obsolètes accessibles |
API10 |
Unsafe Consumption of APIs |
Confiance aveugle dans les APIs tierces intégrées |
Ce classement n’est pas un ordre de priorité absolu : API1 (BOLA) est la vulnérabilité la plus fréquemment exploitée dans les audits réels.
Injection#
Les APIs sont exposées aux mêmes risques d’injection que les applications web, souvent amplifiés par la surface d’attaque plus large des formats acceptés (JSON, XML, multipart).
SQL Injection via API#
# VULNÉRABLE
@app.get("/api/v1/users")
async def search_users(name: str):
query = f"SELECT * FROM users WHERE name LIKE '%{name}%'" # Injection directe
return db.execute(query)
# CORRECT — paramètres liés (parameterized queries)
@app.get("/api/v1/users")
async def search_users(name: str):
return db.execute(
"SELECT * FROM users WHERE name LIKE :pattern",
{"pattern": f"%{name}%"}
)
NoSQL Injection#
Les bases NoSQL (MongoDB) sont vulnérables à des injections via les opérateurs de requête. Un payload JSON {"username": {"$gt": ""}} peut contourner une authentification naïve.
Mitigation : valider le type des champs (Pydantic interdit les objets là où une string est attendue).
Command Injection#
import subprocess
# VULNÉRABLE
@app.post("/api/v1/convert")
async def convert_image(filename: str):
result = subprocess.run(f"convert {filename} output.png", shell=True)
# CORRECT — liste de paramètres, pas de shell=True
@app.post("/api/v1/convert")
async def convert_image(filename: str):
if not re.match(r'^[\w\-]+\.(jpg|png|gif)$', filename):
raise HTTPException(status_code=400, detail="Nom de fichier invalide")
result = subprocess.run(["convert", filename, "output.png"], check=True)
Validation de schéma comme première ligne de défense#
from pydantic import BaseModel, Field, field_validator
import re
class CreateUserRequest(BaseModel):
username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_\-]+$')
email: str = Field(max_length=254)
age: int = Field(ge=0, le=150)
@field_validator("email")
@classmethod
def validate_email(cls, v: str) -> str:
if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
raise ValueError("Format email invalide")
return v.lower()
Pydantic rejettera automatiquement les champs supplémentaires si model_config = ConfigDict(extra="forbid") est défini.
Rate Limiting et throttling#
Le rate limiting protège l’API contre les abus, les attaques par force brute, le scraping massif, et les surcharges involontaires.
Stratégies de limitation#
Par IP — protection basique contre les sources malveillantes. Contournable via des proxies.
Par utilisateur authentifié — plus précis que l’IP, adapté aux APIs avec authentification.
Par API key — granularité par client ; permet des quotas différenciés par tier (freemium vs payant).
Par endpoint — les endpoints coûteux (recherche full-text, génération de rapport) méritent des limites plus strictes.
Par tenant — dans les architectures SaaS multi-tenant, chaque tenant a son propre quota.
Algorithme Token Bucket#
Le token bucket est l’algorithme le plus répandu pour le rate limiting. Chaque client possède un seau de jetons qui se remplit à un taux constant. Chaque requête consomme un jeton. Si le seau est vide, la requête est rejetée.
from fastapi import FastAPI, Request, HTTPException
import time
app = FastAPI()
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity # Nombre max de jetons
self.refill_rate = refill_rate # Jetons ajoutés par seconde
self.tokens = capacity
self.last_refill = time.monotonic()
def consume(self, tokens: int = 1) -> tuple[bool, int]:
"""Retourne (autorisé, jetons_restants)."""
now = time.monotonic()
elapsed = now - self.last_refill
# Remplissage proportionnel au temps écoulé
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return True, int(self.tokens)
return False, 0
Algorithme Sliding Window#
Le sliding window compte les requêtes dans une fenêtre temporelle glissante, évitant l’effet « burst en début de fenêtre fixe » du fixed window counter.
Réponse 429 Too Many Requests#
from fastapi.responses import JSONResponse
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_id = request.headers.get("X-API-Key") or request.client.host
bucket = get_bucket(client_id) # récupère ou crée un bucket par client
allowed, remaining = bucket.consume()
if not allowed:
reset_time = int(time.time()) + 60
return JSONResponse(
status_code=429,
content={"detail": "Trop de requêtes"},
headers={
"X-RateLimit-Limit": str(bucket.capacity),
"X-RateLimit-Remaining": "0",
"X-RateLimit-Reset": str(reset_time),
"Retry-After": "60",
}
)
response = await call_next(request)
response.headers["X-RateLimit-Remaining"] = str(remaining)
return response
Broken Authentication — révocation et rotation#
Rotation des secrets#
Les secrets (clés de signature JWT, client secrets OAuth) doivent être rotatifs sans interruption de service :
Générer une nouvelle clé (clé N+1)
Configurer le serveur pour accepter les tokens signés avec N et N+1
Émettre tous les nouveaux tokens avec N+1
Après expiration de tous les tokens signés avec N, retirer la clé N
Révocation de tokens JWT#
JWT étant stateless, la révocation nécessite une liste de refus (blacklist) stockée côté serveur. Le claim jti (JWT ID) permet d’identifier individuellement les tokens.
import time
from collections import defaultdict
class JTIBlacklist:
def __init__(self):
self._store: dict[str, float] = {} # jti -> expiration timestamp
def revoke(self, jti: str, exp: float) -> None:
self._store[jti] = exp
def is_revoked(self, jti: str) -> bool:
if jti not in self._store:
return False
# Token expiré naturellement → peut être retiré de la blacklist
if self._store[jti] < time.time():
del self._store[jti]
return False
return True
def cleanup_expired(self) -> int:
"""Nettoie les tokens expirés — à appeler périodiquement."""
now = time.time()
expired = [jti for jti, exp in self._store.items() if exp < now]
for jti in expired:
del self._store[jti]
return len(expired)
Tokens de courte durée#
La meilleure défense contre les tokens volés est une durée de vie courte. Un access token qui expire dans 15 minutes limite la fenêtre d’exploitation à 15 minutes même si il est compromis.
Durées recommandées
Access token : 15 minutes à 1 heure selon la sensibilité
Refresh token : 7 à 30 jours, révocable côté serveur
API Key : pas d’expiration intégrée — prévoir une rotation trimestrielle et un mécanisme de révocation immédiate
Mass Assignment#
La mass assignment survient quand une API accepte aveuglément tous les champs d’une requête et les applique à un objet en base, permettant à un utilisateur de modifier des champs qu’il ne devrait pas contrôler (is_admin, balance, role).
Le problème#
# VULNÉRABLE — accepte tous les champs
@app.put("/api/v1/users/{user_id}")
async def update_user(user_id: int, body: dict, user=Depends(get_current_user)):
db.update("users", user_id, body) # body peut contenir is_admin=True !
Protection par allowlist avec Pydantic#
from pydantic import BaseModel, ConfigDict
from typing import Optional
# Modèle de mise à jour — uniquement les champs modifiables par l'utilisateur
class UserUpdateRequest(BaseModel):
model_config = ConfigDict(extra="forbid") # Rejet des champs inconnus
display_name: Optional[str] = Field(None, max_length=100)
bio: Optional[str] = Field(None, max_length=500)
avatar_url: Optional[str] = None
# Modèle interne complet — jamais exposé directement
class UserInDB(BaseModel):
id: str
email: str
display_name: str
is_admin: bool = False
created_at: datetime
# ...
@app.patch("/api/v1/users/{user_id}")
async def update_user(user_id: str, body: UserUpdateRequest, user=Depends(get_current_user)):
if user_id != user.sub:
raise HTTPException(status_code=403)
# Seuls les champs de UserUpdateRequest sont appliqués
updates = body.model_dump(exclude_none=True)
return db.update_user(user_id, updates)
extra= »forbid » vs extra= »ignore »
extra="forbid" retourne une erreur 422 si des champs inconnus sont présents — recommandé pour les APIs strictes. extra="ignore" les silencieusement supprime — plus permissif. Ne jamais utiliser le comportement par défaut extra="allow" sur les endpoints d’écriture.
Exposition de données sensibles#
Champs à masquer systématiquement#
Mots de passe (même hashés — ne jamais les retourner)
Tokens, secrets, clés API
Numéros de carte bancaire complets (PCI DSS : afficher seulement les 4 derniers chiffres)
Numéros de sécurité sociale, données de santé
Adresses IP internes, noms d’hôtes internes (dans les messages d’erreur)
Modèles de réponse distincts#
from pydantic import BaseModel
class UserInDB(BaseModel):
id: str
email: str
password_hash: str # NE JAMAIS retourner
api_key_hash: str # NE JAMAIS retourner
role: str
class UserResponse(BaseModel):
id: str
email: str
role: str
# password_hash et api_key_hash sont absents par construction
@app.get("/api/v1/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: str):
user_db = db.get_user(user_id) # UserInDB
return UserResponse.model_validate(user_db)
# FastAPI filtre automatiquement via response_model
Logging sécurisé#
import re
SENSITIVE_PATTERNS = [
(r'("password"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
(r'("token"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
(r'("api_key"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
(r'Authorization:\s*Bearer\s+\S+', 'Authorization: Bearer [MASQUÉ]'),
]
def sanitize_for_log(data: str) -> str:
for pattern, replacement in SENSITIVE_PATTERNS:
data = re.sub(pattern, replacement, data, flags=re.IGNORECASE)
return data
Server-Side Request Forgery (SSRF)#
Le SSRF survient quand une API permet à un attaquant de forcer le serveur à effectuer des requêtes vers des cibles arbitraires — notamment les services internes (metadata AWS, bases de données, services d’administration).
Vecteur d’attaque typique#
POST /api/v1/webhooks
{
"url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
"event": "order.created"
}
Si le serveur effectue naïvement une requête vers l’URL fournie, l’attaquant récupère les credentials IAM de l’instance EC2.
Mitigation#
import ipaddress, urllib.parse, re
from fastapi import HTTPException
SSRF_BLOCKED_RANGES = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"), # AWS metadata, lien-local
ipaddress.ip_network("::1/128"),
ipaddress.ip_network("fc00::/7"),
]
WEBHOOK_ALLOWED_DOMAINS = re.compile(
r'^([\w\-]+\.)?example\.com$|^([\w\-]+\.)?trusted-partner\.com$'
)
def validate_webhook_url(url: str) -> str:
parsed = urllib.parse.urlparse(url)
if parsed.scheme not in ("https",): # HTTPS uniquement
raise HTTPException(status_code=400, detail="Seul HTTPS est accepté")
host = parsed.hostname
if not host:
raise HTTPException(status_code=400, detail="URL invalide")
# Allowlist de domaines (le plus strict)
if not WEBHOOK_ALLOWED_DOMAINS.match(host):
raise HTTPException(status_code=400, detail="Domaine non autorisé")
# Résolution DNS → vérification de l'IP réelle
try:
import socket
ip = socket.gethostbyname(host)
ip_obj = ipaddress.ip_address(ip)
for blocked in SSRF_BLOCKED_RANGES:
if ip_obj in blocked:
raise HTTPException(status_code=400, detail="IP interne non autorisée")
except socket.gaierror:
raise HTTPException(status_code=400, detail="Hôte non résolvable")
return url
DNS rebinding
La vérification DNS doit être effectuée au moment de la résolution et au moment de la connexion effective. Un attaquant peut faire pointer un DNS vers une IP externe au moment de la vérification, puis le rebinder vers une IP interne au moment de la requête. Des bibliothèques comme ssrf-filter (Node.js) ou une résolution DNS custom en Python gèrent ce cas.
Checklist de sécurité API#
Une checklist opérationnelle pour les revues de code et les audits :
Authentification et tokens
Tous les endpoints sensibles nécessitent une authentification
Tokens avec expiration courte (access ≤ 1h)
Refresh tokens révocables côté serveur
Rotation des secrets de signature prévue
jtiblacklist pour la révocation de tokens individuels
Autorisation
Vérification d’appartenance à chaque accès à un objet (anti-BOLA)
Vérification des permissions fonctionnelles (anti-BFLA)
Principe de moindre privilège sur les scopes OAuth
Séparation des modèles de lecture et d’écriture (anti-mass assignment)
Validation des entrées
Schémas Pydantic stricts sur tous les endpoints d’écriture (
extra="forbid")Requêtes paramétrées (pas de concaténation de chaînes SQL)
Validation et assainissement des URLs (anti-SSRF)
Taille maximale des corps de requête limitée
Rate limiting
Rate limiting activé sur tous les endpoints publics
Limites renforcées sur les endpoints d’authentification (anti-brute force)
Headers
X-RateLimit-*etRetry-Afterdans les réponses 429
En-têtes HTTP
Strict-Transport-Securityavec durée ≥ 1 anX-Content-Type-Options: nosniffCORS configuré avec liste blanche d’origines (pas de
*avec credentials)Headers de sécurité ajoutés via middleware centralisé
Données sensibles
Mots de passe et tokens absents des réponses API
Logging sans données sensibles
Secrets en variables d’environnement (jamais dans le code)
Données PII minimales dans les tokens JWT
Inventaire
Toutes les versions d’API documentées
Versions dépréciées avec date de fin de vie annoncée
Pas d’endpoints de debug ou d’administration exposés publiquement
Cellules d’analyse et de visualisation#
Token Bucket — simulation et visualisation#
import time
import random
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
class TokenBucket:
def __init__(self, capacity: int, refill_rate: float):
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = float(capacity)
self.last_refill = 0.0
def consume(self, now: float, tokens: int = 1) -> tuple[bool, float]:
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
self.last_refill = now
if self.tokens >= tokens:
self.tokens -= tokens
return True, self.tokens
return False, self.tokens
# Simulation : 100 requêtes sur 30 secondes
capacity = 20
refill_rate = 1.0 # 1 token/seconde
bucket = TokenBucket(capacity, refill_rate)
times = []
token_levels = []
statuses = []
random.seed(42)
t = 0.0
for _ in range(80):
t += random.uniform(0.1, 0.8)
allowed, remaining = bucket.consume(t)
times.append(t)
token_levels.append(remaining)
statuses.append(allowed)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(11, 7), sharex=True,
gridspec_kw={"height_ratios": [3, 1]})
# Niveau de tokens
ax1.plot(times, token_levels, color="#4C72B0", lw=2, label="Tokens restants")
ax1.axhline(y=capacity, color="#aaaaaa", linestyle="--", linewidth=1,
label=f"Capacité max ({capacity})")
ax1.fill_between(times, token_levels, alpha=0.15, color="#4C72B0")
ax1.set_ylabel("Tokens disponibles", fontsize=10)
ax1.set_ylim(-1, capacity + 3)
ax1.legend(fontsize=9)
ax1.set_title(f"Simulation Token Bucket — capacité={capacity}, refill={refill_rate} token/s",
fontsize=12, fontweight="bold")
# Statut des requêtes
for i, (t_val, ok) in enumerate(zip(times, statuses)):
color = "#55A868" if ok else "#C44E52"
ax2.axvline(x=t_val, color=color, lw=1.5, alpha=0.7)
ax2.set_yticks([])
ax2.set_xlabel("Temps (secondes)", fontsize=10)
ax2.set_ylabel("Requêtes", fontsize=9)
accepted = sum(statuses)
rejected = len(statuses) - accepted
ax2.set_title(
f"Statut requêtes : {accepted} acceptées (vert) / {rejected} rejetées (rouge)",
fontsize=9.5
)
plt.savefig("token_bucket_simulation.png", dpi=120, bbox_inches="tight")
plt.show()
print(f"Résultats de la simulation :")
print(f" Total requêtes : {len(statuses)}")
print(f" Acceptées : {accepted} ({accepted/len(statuses)*100:.1f}%)")
print(f" Rejetées : {rejected} ({rejected/len(statuses)*100:.1f}%)")
print(f" Taux de refill : {refill_rate} token/s → {refill_rate*60:.0f} requêtes/min en régime permanent")
Résultats de la simulation :
Total requêtes : 80
Acceptées : 53 (66.2%)
Rejetées : 27 (33.8%)
Taux de refill : 1.0 token/s → 60 requêtes/min en régime permanent
Sliding Window Rate Limiter#
import collections
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
class SlidingWindowRateLimiter:
"""
Sliding window counter — fenêtre glissante basée sur les timestamps des requêtes.
"""
def __init__(self, limit: int, window_seconds: float):
self.limit = limit
self.window = window_seconds
self._requests: collections.deque = collections.deque()
def is_allowed(self, now: float) -> tuple[bool, int]:
# Supprimer les requêtes hors de la fenêtre
cutoff = now - self.window
while self._requests and self._requests[0] <= cutoff:
self._requests.popleft()
count = len(self._requests)
if count < self.limit:
self._requests.append(now)
return True, self.limit - count - 1
return False, 0
@property
def current_count(self) -> int:
return len(self._requests)
# Comparaison Fixed Window vs Sliding Window
import random
random.seed(7)
limit = 10
window = 10.0
fixed_window = {"counts": collections.defaultdict(int)}
sliding = SlidingWindowRateLimiter(limit, window)
times_sim = []
fixed_allowed = []
sliding_allowed = []
t = 0.0
for _ in range(120):
t += random.uniform(0.05, 0.4)
times_sim.append(t)
# Fixed window
bucket_key = int(t // window)
can_fixed = fixed_window["counts"][bucket_key] < limit
if can_fixed:
fixed_window["counts"][bucket_key] += 1
fixed_allowed.append(can_fixed)
# Sliding window
can_sliding, _ = sliding.is_allowed(t)
sliding_allowed.append(can_sliding)
fig, axes = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
for ax, allowed, title, color in zip(
axes,
[fixed_allowed, sliding_allowed],
["Fixed Window Counter", "Sliding Window Counter"],
["#4C72B0", "#DD8452"]
):
for t_val, ok in zip(times_sim, allowed):
ax.axvline(x=t_val, color=("#55A868" if ok else "#C44E52"), lw=1.2, alpha=0.6)
# Frontières des fenêtres fixes
for boundary in range(0, int(max(times_sim)) + 1, int(window)):
ax.axvline(x=boundary, color="#888888", lw=1.5, linestyle="--", alpha=0.5)
accepted = sum(allowed)
ax.set_title(f"{title} — {accepted}/{len(allowed)} requêtes acceptées", fontsize=10.5)
ax.set_yticks([])
axes[-1].set_xlabel("Temps (secondes)", fontsize=10)
fig.suptitle(
f"Comparaison Fixed Window vs Sliding Window (limite={limit} req / {window:.0f}s)",
fontsize=12, fontweight="bold"
)
plt.savefig("sliding_window_comparison.png", dpi=120, bbox_inches="tight")
plt.show()
# Analyser l'effet burst en début de fenêtre
print("Analyse : burst en fin/début de fenêtre fixe")
print(" Avec Fixed Window : un attaquant peut envoyer 10 req à t=9.9s")
print(" puis 10 req à t=10.1s → 20 req effectives en 0.2s")
print(" Sliding Window empêche ce pattern.")
Analyse : burst en fin/début de fenêtre fixe
Avec Fixed Window : un attaquant peut envoyer 10 req à t=9.9s
puis 10 req à t=10.1s → 20 req effectives en 0.2s
Sliding Window empêche ce pattern.
OWASP API Top 10 — Bubble Chart#
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import seaborn as sns
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)
categories = [
"API1\nBOLA/IDOR",
"API2\nBroken Auth",
"API3\nProp. Auth",
"API4\nRes. Consumption",
"API5\nFunc. Auth",
"API6\nBiz. Flow",
"API7\nSSRF",
"API8\nMisconfiguration",
"API9\nInventaire",
"API10\nAPIs tierces",
]
# Impact (1-10), Fréquence (1-10), Facilité d'exploitation (taille bulle)
impact = [9, 8, 7, 6, 8, 7, 8, 6, 5, 6]
frequence = [9, 7, 7, 8, 6, 6, 5, 8, 7, 5]
exploitabilite = [9, 7, 6, 8, 7, 7, 6, 7, 7, 5]
palette = sns.color_palette("muted", 10)
fig, ax = plt.subplots(figsize=(11, 8))
for i, (cat, imp, freq, expl) in enumerate(
zip(categories, impact, frequence, exploitabilite)):
size = expl * 80
scatter = ax.scatter(freq, imp, s=size * 12, color=palette[i],
alpha=0.72, edgecolors="white", linewidth=1.5)
# Numéro OWASP
num = cat.split("\n")[0]
ax.text(freq, imp + 0.35, num, ha="center", va="bottom",
fontsize=8, fontweight="bold", color=palette[i])
# Nom court
short = cat.split("\n")[1]
ax.text(freq, imp - 0.35, short, ha="center", va="top",
fontsize=7.5, color="#555555")
# Quadrants
ax.axhline(y=7, color="#cccccc", linestyle="--", linewidth=1)
ax.axvline(x=7, color="#cccccc", linestyle="--", linewidth=1)
ax.text(3.0, 9.5, "Fréquent faible impact", fontsize=8, color="#aaaaaa", ha="center")
ax.text(8.5, 9.5, "ZONE CRITIQUE", fontsize=9, color="#C44E52",
ha="center", fontweight="bold",
bbox=dict(boxstyle="round,pad=0.2", facecolor="#FFEBEE", edgecolor="#C44E52", alpha=0.7))
ax.text(3.0, 5.5, "Rare faible impact", fontsize=8, color="#aaaaaa", ha="center")
ax.text(8.5, 5.5, "Impact élevé rare", fontsize=8, color="#888888", ha="center")
ax.set_xlim(3, 11)
ax.set_ylim(4, 10.5)
ax.set_xlabel("Fréquence (1=rare → 10=systématique)", fontsize=10)
ax.set_ylabel("Impact (1=faible → 10=critique)", fontsize=10)
ax.set_title("OWASP API Security Top 10 — 2023\nImpact × Fréquence (taille = facilité d'exploitation)",
fontsize=12, fontweight="bold", pad=10)
# Légende taille
for expl_val, label in [(5, "Faible"), (7, "Moyen"), (9, "Élevé")]:
ax.scatter([], [], s=expl_val * 120, color="#aaaaaa", alpha=0.6,
label=f"Exploitabilité : {label}")
ax.legend(title="Facilité d'exploitation", loc="lower right", fontsize=8.5)
plt.savefig("owasp_api_top10.png", dpi=120, bbox_inches="tight")
plt.show()
JTI Blacklist — simulation de révocation#
import time
import hashlib
import random
import json
from datetime import datetime, timezone
class JTIBlacklist:
"""
Blacklist de JWT IDs avec TTL automatique.
En production : Redis avec EXPIRE.
"""
def __init__(self):
self._store: dict[str, float] = {}
self._revoke_log: list[dict] = []
def revoke(self, jti: str, exp: float, reason: str = "manuel") -> None:
self._store[jti] = exp
self._revoke_log.append({
"jti": jti[:12] + "...",
"exp": datetime.fromtimestamp(exp).isoformat(),
"reason": reason,
"revoked_at": datetime.now().isoformat(),
})
def is_revoked(self, jti: str) -> bool:
if jti not in self._store:
return False
if self._store[jti] < time.time():
del self._store[jti] # Auto-nettoyage
return False # Expiré naturellement — plus dans la blacklist utile
return True
def cleanup(self) -> int:
now = time.time()
expired = [j for j, exp in self._store.items() if exp < now]
for j in expired:
del self._store[j]
return len(expired)
@property
def size(self) -> int:
return len(self._store)
def generate_jti() -> str:
return hashlib.sha256(str(random.random()).encode()).hexdigest()[:24]
# Simulation
bl = JTIBlacklist()
now = time.time()
# Génération de tokens simulés
tokens = []
for i in range(20):
jti = generate_jti()
exp = now + random.choice([300, 900, 3600, -100]) # certains déjà expirés
tokens.append({"jti": jti, "exp": exp, "user": f"user_{i:02d}"})
print("=== Simulation de révocation JTI ===\n")
# Révoquer quelques tokens
revoke_indices = [2, 5, 8, 12, 15]
for idx in revoke_indices:
t = tokens[idx]
bl.revoke(t["jti"], t["exp"], reason="logout_all_devices")
print(f" Révoqué : {t['jti'][:16]}... (user={t['user']})")
print(f"\n Blacklist size : {bl.size} tokens\n")
# Vérification de tous les tokens
print("=== Vérification des tokens ===")
for i, t in enumerate(tokens[:10]):
revoked = bl.is_revoked(t["jti"])
expired = t["exp"] < now
status = "RÉVOQUÉ" if revoked else ("EXPIRÉ" if expired else "VALIDE")
print(f" Token {i:02d} ({t['user']}) : {status}")
# Nettoyage
nettoyés = bl.cleanup()
print(f"\n Nettoyage : {nettoyés} tokens expirés retirés de la blacklist")
print(f" Blacklist size après nettoyage : {bl.size}")
print("\n=== Journal de révocations ===")
for entry in bl._revoke_log:
print(f" {entry['jti']} | {entry['reason']} | {entry['revoked_at']}")
=== Simulation de révocation JTI ===
Révoqué : 96cb0010ddcfc8d1... (user=user_02)
Révoqué : f99c4be1555fb3ae... (user=user_05)
Révoqué : 249db6c299e3020c... (user=user_08)
Révoqué : 1e19f9d7f5f9f51c... (user=user_12)
Révoqué : b0bf7847168a998c... (user=user_15)
Blacklist size : 5 tokens
=== Vérification des tokens ===
Token 00 (user_00) : EXPIRÉ
Token 01 (user_01) : EXPIRÉ
Token 02 (user_02) : RÉVOQUÉ
Token 03 (user_03) : VALIDE
Token 04 (user_04) : VALIDE
Token 05 (user_05) : RÉVOQUÉ
Token 06 (user_06) : VALIDE
Token 07 (user_07) : VALIDE
Token 08 (user_08) : RÉVOQUÉ
Token 09 (user_09) : EXPIRÉ
Nettoyage : 0 tokens expirés retirés de la blacklist
Blacklist size après nettoyage : 5
=== Journal de révocations ===
96cb0010ddcf... | logout_all_devices | 2026-03-26T10:46:33.296383
f99c4be1555f... | logout_all_devices | 2026-03-26T10:46:33.296410
249db6c299e3... | logout_all_devices | 2026-03-26T10:46:33.296428
1e19f9d7f5f9... | logout_all_devices | 2026-03-26T10:46:33.296442
b0bf7847168a... | logout_all_devices | 2026-03-26T10:46:33.296457
Résumé#
Ce chapitre a couvert les principaux vecteurs de vulnérabilité des APIs modernes et leurs mitigations :
BOLA/IDOR — la vulnérabilité la plus fréquente. La seule protection efficace est la vérification systématique d’appartenance en base de données, pas au niveau de l’application cliente. UUIDs pour réduire l’énumération, 404 plutôt que 403 pour ne pas confirmer l’existence.
Injection — parameterized queries sans exception, validation Pydantic stricte en entrée, jamais shell=True dans les appels système.
Rate limiting — token bucket pour les limites souples avec tolérance aux bursts, sliding window pour les limites strictes. Les headers X-RateLimit-* et Retry-After sont un contrat avec les clients.
Révocation de tokens — la blacklist jti avec TTL automatique est la solution minimale. En production : Redis avec EXPIRE pour la TTL native, et tâche planifiée de nettoyage.
Mass assignment — extra="forbid" dans Pydantic et des modèles de requête dédiés (jamais le modèle de base de données directement) éliminent le problème par construction.
SSRF — la liste blanche de domaines autorisés est plus sûre que la liste noire. La vérification DNS doit être faite au moment de la connexion pour résister au DNS rebinding.
Données sensibles — des modèles de réponse distincts du modèle de stockage et un middleware de sanitisation des logs sont les deux mécanismes complémentaires.