Chapitre 3 — Sécurité des APIs#

La sécurité d’une API ne se réduit pas à l’authentification. Elle couvre la conception des ressources, la validation des entrées, le contrôle du débit, la protection des données sensibles et l’architecture globale du service. Ce chapitre s’appuie sur l’OWASP API Security Top 10 (2023) et présente des mitigations concrètes pour chaque catégorie.

OWASP API Security Top 10 — 2023#

L’OWASP (Open Web Application Security Project) maintient une liste des 10 risques les plus critiques pour les APIs. La version 2023 reflète l’évolution des architectures microservices et API-first.

#

Catégorie

Description synthétique

API1

Broken Object Level Authorization

Accès à des objets d’autres utilisateurs via manipulation d’ID

API2

Broken Authentication

Authentification faible, tokens prévisibles, rotation absente

API3

Broken Object Property Level Authorization

Exposition/modification de propriétés d’objets non autorisées

API4

Unrestricted Resource Consumption

Absence de rate limiting, requêtes coûteuses non limitées

API5

Broken Function Level Authorization

Accès à des fonctions admin via manipulation de rôles/URLs

API6

Unrestricted Access to Sensitive Business Flows

Abus de flux métier légitimes (scraping, spam, fraude)

API7

Server-Side Request Forgery (SSRF)

Forçage du serveur à effectuer des requêtes vers des cibles internes

API8

Security Misconfiguration

Headers de sécurité absents, CORS trop permissif, debug activé

API9

Improper Inventory Management

APIs non documentées, versions obsolètes accessibles

API10

Unsafe Consumption of APIs

Confiance aveugle dans les APIs tierces intégrées

Ce classement n’est pas un ordre de priorité absolu : API1 (BOLA) est la vulnérabilité la plus fréquemment exploitée dans les audits réels.

Broken Object Level Authorization (BOLA / IDOR)#

Le BOLA (Broken Object Level Authorization), aussi appelé IDOR (Insecure Direct Object Reference), est la vulnérabilité la plus répandue dans les APIs. Elle survient quand un endpoint expose un identifiant d’objet sans vérifier que l’utilisateur courant est bien autorisé à y accéder.

Le problème#

# VULNÉRABLE — aucun contrôle d'appartenance
@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: int, user=Depends(get_current_user)):
    return db.get_order(order_id)  # N'importe quel utilisateur accède à n'importe quelle commande

Un attaquant authentifié peut parcourir séquentiellement les IDs (/orders/1, /orders/2, …) pour exfiltrer toutes les données.

Prévention#

UUID au lieu d’entiers séquentiels — ne résout pas le problème fondamental (un UUID peut fuiter) mais élimine l’énumération triviale :

import uuid

# En base : uuid DEFAULT gen_random_uuid() (PostgreSQL)
# Génération Python :
resource_id = str(uuid.uuid4())  # "550e8400-e29b-41d4-a716-446655440000"

Vérification d’appartenance systématique :

@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: str, user=Depends(get_current_user)):
    order = db.get_order(order_id)
    if not order:
        raise HTTPException(status_code=404)
    # CRITIQUE : vérification d'appartenance
    if order.user_id != user.sub and "admin" not in user.roles:
        # Retourner 404 plutôt que 403 pour ne pas confirmer l'existence
        raise HTTPException(status_code=404)
    return order

403 vs 404 pour BOLA

Retourner 404 Not Found plutôt que 403 Forbidden quand un utilisateur accède à un objet d’un autre évite de confirmer l’existence de la ressource. Un attaquant qui obtient systématiquement des 404 ne peut pas distinguer « cet objet n’existe pas » de « vous n’y avez pas accès ».

Requêtes avec filtre utilisateur — la défense la plus robuste est d’inclure l’utilisateur dans la requête de données :

# En SQLAlchemy — la condition user_id est dans la requête elle-même
order = db.query(Order).filter(
    Order.id == order_id,
    Order.user_id == current_user.id  # Impossible de contourner
).first()

Injection#

Les APIs sont exposées aux mêmes risques d’injection que les applications web, souvent amplifiés par la surface d’attaque plus large des formats acceptés (JSON, XML, multipart).

SQL Injection via API#

# VULNÉRABLE
@app.get("/api/v1/users")
async def search_users(name: str):
    query = f"SELECT * FROM users WHERE name LIKE '%{name}%'"  # Injection directe
    return db.execute(query)

# CORRECT — paramètres liés (parameterized queries)
@app.get("/api/v1/users")
async def search_users(name: str):
    return db.execute(
        "SELECT * FROM users WHERE name LIKE :pattern",
        {"pattern": f"%{name}%"}
    )

NoSQL Injection#

Les bases NoSQL (MongoDB) sont vulnérables à des injections via les opérateurs de requête. Un payload JSON {"username": {"$gt": ""}} peut contourner une authentification naïve.

Mitigation : valider le type des champs (Pydantic interdit les objets là où une string est attendue).

Command Injection#

import subprocess

# VULNÉRABLE
@app.post("/api/v1/convert")
async def convert_image(filename: str):
    result = subprocess.run(f"convert {filename} output.png", shell=True)

# CORRECT — liste de paramètres, pas de shell=True
@app.post("/api/v1/convert")
async def convert_image(filename: str):
    if not re.match(r'^[\w\-]+\.(jpg|png|gif)$', filename):
        raise HTTPException(status_code=400, detail="Nom de fichier invalide")
    result = subprocess.run(["convert", filename, "output.png"], check=True)

Validation de schéma comme première ligne de défense#

from pydantic import BaseModel, Field, field_validator
import re

class CreateUserRequest(BaseModel):
    username: str = Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_\-]+$')
    email: str = Field(max_length=254)
    age: int = Field(ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
            raise ValueError("Format email invalide")
        return v.lower()

Pydantic rejettera automatiquement les champs supplémentaires si model_config = ConfigDict(extra="forbid") est défini.

Rate Limiting et throttling#

Le rate limiting protège l’API contre les abus, les attaques par force brute, le scraping massif, et les surcharges involontaires.

Stratégies de limitation#

Par IP — protection basique contre les sources malveillantes. Contournable via des proxies.

Par utilisateur authentifié — plus précis que l’IP, adapté aux APIs avec authentification.

Par API key — granularité par client ; permet des quotas différenciés par tier (freemium vs payant).

Par endpoint — les endpoints coûteux (recherche full-text, génération de rapport) méritent des limites plus strictes.

Par tenant — dans les architectures SaaS multi-tenant, chaque tenant a son propre quota.

Algorithme Token Bucket#

Le token bucket est l’algorithme le plus répandu pour le rate limiting. Chaque client possède un seau de jetons qui se remplit à un taux constant. Chaque requête consomme un jeton. Si le seau est vide, la requête est rejetée.

from fastapi import FastAPI, Request, HTTPException
import time

app = FastAPI()

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity        # Nombre max de jetons
        self.refill_rate = refill_rate  # Jetons ajoutés par seconde
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> tuple[bool, int]:
        """Retourne (autorisé, jetons_restants)."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        # Remplissage proportionnel au temps écoulé
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True, int(self.tokens)
        return False, 0

Algorithme Sliding Window#

Le sliding window compte les requêtes dans une fenêtre temporelle glissante, évitant l’effet « burst en début de fenêtre fixe » du fixed window counter.

Réponse 429 Too Many Requests#

from fastapi.responses import JSONResponse

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_id = request.headers.get("X-API-Key") or request.client.host
    bucket = get_bucket(client_id)  # récupère ou crée un bucket par client
    allowed, remaining = bucket.consume()

    if not allowed:
        reset_time = int(time.time()) + 60
        return JSONResponse(
            status_code=429,
            content={"detail": "Trop de requêtes"},
            headers={
                "X-RateLimit-Limit": str(bucket.capacity),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(reset_time),
                "Retry-After": "60",
            }
        )

    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    return response

Broken Authentication — révocation et rotation#

Rotation des secrets#

Les secrets (clés de signature JWT, client secrets OAuth) doivent être rotatifs sans interruption de service :

  1. Générer une nouvelle clé (clé N+1)

  2. Configurer le serveur pour accepter les tokens signés avec N et N+1

  3. Émettre tous les nouveaux tokens avec N+1

  4. Après expiration de tous les tokens signés avec N, retirer la clé N

Révocation de tokens JWT#

JWT étant stateless, la révocation nécessite une liste de refus (blacklist) stockée côté serveur. Le claim jti (JWT ID) permet d’identifier individuellement les tokens.

import time
from collections import defaultdict

class JTIBlacklist:
    def __init__(self):
        self._store: dict[str, float] = {}  # jti -> expiration timestamp

    def revoke(self, jti: str, exp: float) -> None:
        self._store[jti] = exp

    def is_revoked(self, jti: str) -> bool:
        if jti not in self._store:
            return False
        # Token expiré naturellement → peut être retiré de la blacklist
        if self._store[jti] < time.time():
            del self._store[jti]
            return False
        return True

    def cleanup_expired(self) -> int:
        """Nettoie les tokens expirés — à appeler périodiquement."""
        now = time.time()
        expired = [jti for jti, exp in self._store.items() if exp < now]
        for jti in expired:
            del self._store[jti]
        return len(expired)

Tokens de courte durée#

La meilleure défense contre les tokens volés est une durée de vie courte. Un access token qui expire dans 15 minutes limite la fenêtre d’exploitation à 15 minutes même si il est compromis.

Durées recommandées

  • Access token : 15 minutes à 1 heure selon la sensibilité

  • Refresh token : 7 à 30 jours, révocable côté serveur

  • API Key : pas d’expiration intégrée — prévoir une rotation trimestrielle et un mécanisme de révocation immédiate

Mass Assignment#

La mass assignment survient quand une API accepte aveuglément tous les champs d’une requête et les applique à un objet en base, permettant à un utilisateur de modifier des champs qu’il ne devrait pas contrôler (is_admin, balance, role).

Le problème#

# VULNÉRABLE — accepte tous les champs
@app.put("/api/v1/users/{user_id}")
async def update_user(user_id: int, body: dict, user=Depends(get_current_user)):
    db.update("users", user_id, body)  # body peut contenir is_admin=True !

Protection par allowlist avec Pydantic#

from pydantic import BaseModel, ConfigDict
from typing import Optional

# Modèle de mise à jour — uniquement les champs modifiables par l'utilisateur
class UserUpdateRequest(BaseModel):
    model_config = ConfigDict(extra="forbid")  # Rejet des champs inconnus

    display_name: Optional[str] = Field(None, max_length=100)
    bio: Optional[str] = Field(None, max_length=500)
    avatar_url: Optional[str] = None

# Modèle interne complet — jamais exposé directement
class UserInDB(BaseModel):
    id: str
    email: str
    display_name: str
    is_admin: bool = False
    created_at: datetime
    # ...

@app.patch("/api/v1/users/{user_id}")
async def update_user(user_id: str, body: UserUpdateRequest, user=Depends(get_current_user)):
    if user_id != user.sub:
        raise HTTPException(status_code=403)
    # Seuls les champs de UserUpdateRequest sont appliqués
    updates = body.model_dump(exclude_none=True)
    return db.update_user(user_id, updates)

extra= »forbid » vs extra= »ignore »

extra="forbid" retourne une erreur 422 si des champs inconnus sont présents — recommandé pour les APIs strictes. extra="ignore" les silencieusement supprime — plus permissif. Ne jamais utiliser le comportement par défaut extra="allow" sur les endpoints d’écriture.

Exposition de données sensibles#

Champs à masquer systématiquement#

  • Mots de passe (même hashés — ne jamais les retourner)

  • Tokens, secrets, clés API

  • Numéros de carte bancaire complets (PCI DSS : afficher seulement les 4 derniers chiffres)

  • Numéros de sécurité sociale, données de santé

  • Adresses IP internes, noms d’hôtes internes (dans les messages d’erreur)

Modèles de réponse distincts#

from pydantic import BaseModel

class UserInDB(BaseModel):
    id: str
    email: str
    password_hash: str  # NE JAMAIS retourner
    api_key_hash: str   # NE JAMAIS retourner
    role: str

class UserResponse(BaseModel):
    id: str
    email: str
    role: str
    # password_hash et api_key_hash sont absents par construction

@app.get("/api/v1/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: str):
    user_db = db.get_user(user_id)  # UserInDB
    return UserResponse.model_validate(user_db)
    # FastAPI filtre automatiquement via response_model

Logging sécurisé#

import re

SENSITIVE_PATTERNS = [
    (r'("password"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
    (r'("token"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
    (r'("api_key"\s*:\s*)"[^"]*"', r'\1"[MASQUÉ]"'),
    (r'Authorization:\s*Bearer\s+\S+', 'Authorization: Bearer [MASQUÉ]'),
]

def sanitize_for_log(data: str) -> str:
    for pattern, replacement in SENSITIVE_PATTERNS:
        data = re.sub(pattern, replacement, data, flags=re.IGNORECASE)
    return data

Server-Side Request Forgery (SSRF)#

Le SSRF survient quand une API permet à un attaquant de forcer le serveur à effectuer des requêtes vers des cibles arbitraires — notamment les services internes (metadata AWS, bases de données, services d’administration).

Vecteur d’attaque typique#

POST /api/v1/webhooks
{
  "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials/",
  "event": "order.created"
}

Si le serveur effectue naïvement une requête vers l’URL fournie, l’attaquant récupère les credentials IAM de l’instance EC2.

Mitigation#

import ipaddress, urllib.parse, re
from fastapi import HTTPException

SSRF_BLOCKED_RANGES = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),  # AWS metadata, lien-local
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
]

WEBHOOK_ALLOWED_DOMAINS = re.compile(
    r'^([\w\-]+\.)?example\.com$|^([\w\-]+\.)?trusted-partner\.com$'
)

def validate_webhook_url(url: str) -> str:
    parsed = urllib.parse.urlparse(url)

    if parsed.scheme not in ("https",):  # HTTPS uniquement
        raise HTTPException(status_code=400, detail="Seul HTTPS est accepté")

    host = parsed.hostname
    if not host:
        raise HTTPException(status_code=400, detail="URL invalide")

    # Allowlist de domaines (le plus strict)
    if not WEBHOOK_ALLOWED_DOMAINS.match(host):
        raise HTTPException(status_code=400, detail="Domaine non autorisé")

    # Résolution DNS → vérification de l'IP réelle
    try:
        import socket
        ip = socket.gethostbyname(host)
        ip_obj = ipaddress.ip_address(ip)
        for blocked in SSRF_BLOCKED_RANGES:
            if ip_obj in blocked:
                raise HTTPException(status_code=400, detail="IP interne non autorisée")
    except socket.gaierror:
        raise HTTPException(status_code=400, detail="Hôte non résolvable")

    return url

DNS rebinding

La vérification DNS doit être effectuée au moment de la résolution et au moment de la connexion effective. Un attaquant peut faire pointer un DNS vers une IP externe au moment de la vérification, puis le rebinder vers une IP interne au moment de la requête. Des bibliothèques comme ssrf-filter (Node.js) ou une résolution DNS custom en Python gèrent ce cas.

Checklist de sécurité API#

Une checklist opérationnelle pour les revues de code et les audits :

Authentification et tokens

  • Tous les endpoints sensibles nécessitent une authentification

  • Tokens avec expiration courte (access ≤ 1h)

  • Refresh tokens révocables côté serveur

  • Rotation des secrets de signature prévue

  • jti blacklist pour la révocation de tokens individuels

Autorisation

  • Vérification d’appartenance à chaque accès à un objet (anti-BOLA)

  • Vérification des permissions fonctionnelles (anti-BFLA)

  • Principe de moindre privilège sur les scopes OAuth

  • Séparation des modèles de lecture et d’écriture (anti-mass assignment)

Validation des entrées

  • Schémas Pydantic stricts sur tous les endpoints d’écriture (extra="forbid")

  • Requêtes paramétrées (pas de concaténation de chaînes SQL)

  • Validation et assainissement des URLs (anti-SSRF)

  • Taille maximale des corps de requête limitée

Rate limiting

  • Rate limiting activé sur tous les endpoints publics

  • Limites renforcées sur les endpoints d’authentification (anti-brute force)

  • Headers X-RateLimit-* et Retry-After dans les réponses 429

En-têtes HTTP

  • Strict-Transport-Security avec durée ≥ 1 an

  • X-Content-Type-Options: nosniff

  • CORS configuré avec liste blanche d’origines (pas de * avec credentials)

  • Headers de sécurité ajoutés via middleware centralisé

Données sensibles

  • Mots de passe et tokens absents des réponses API

  • Logging sans données sensibles

  • Secrets en variables d’environnement (jamais dans le code)

  • Données PII minimales dans les tokens JWT

Inventaire

  • Toutes les versions d’API documentées

  • Versions dépréciées avec date de fin de vie annoncée

  • Pas d’endpoints de debug ou d’administration exposés publiquement


Cellules d’analyse et de visualisation#

Token Bucket — simulation et visualisation#

import time
import random
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

class TokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = float(capacity)
        self.last_refill = 0.0

    def consume(self, now: float, tokens: int = 1) -> tuple[bool, float]:
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True, self.tokens
        return False, self.tokens

# Simulation : 100 requêtes sur 30 secondes
capacity = 20
refill_rate = 1.0  # 1 token/seconde
bucket = TokenBucket(capacity, refill_rate)

times = []
token_levels = []
statuses = []

random.seed(42)
t = 0.0
for _ in range(80):
    t += random.uniform(0.1, 0.8)
    allowed, remaining = bucket.consume(t)
    times.append(t)
    token_levels.append(remaining)
    statuses.append(allowed)

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(11, 7), sharex=True,
                                gridspec_kw={"height_ratios": [3, 1]})

# Niveau de tokens
ax1.plot(times, token_levels, color="#4C72B0", lw=2, label="Tokens restants")
ax1.axhline(y=capacity, color="#aaaaaa", linestyle="--", linewidth=1,
            label=f"Capacité max ({capacity})")
ax1.fill_between(times, token_levels, alpha=0.15, color="#4C72B0")
ax1.set_ylabel("Tokens disponibles", fontsize=10)
ax1.set_ylim(-1, capacity + 3)
ax1.legend(fontsize=9)
ax1.set_title(f"Simulation Token Bucket — capacité={capacity}, refill={refill_rate} token/s",
              fontsize=12, fontweight="bold")

# Statut des requêtes
for i, (t_val, ok) in enumerate(zip(times, statuses)):
    color = "#55A868" if ok else "#C44E52"
    ax2.axvline(x=t_val, color=color, lw=1.5, alpha=0.7)

ax2.set_yticks([])
ax2.set_xlabel("Temps (secondes)", fontsize=10)
ax2.set_ylabel("Requêtes", fontsize=9)

accepted = sum(statuses)
rejected = len(statuses) - accepted
ax2.set_title(
    f"Statut requêtes : {accepted} acceptées (vert) / {rejected} rejetées (rouge)",
    fontsize=9.5
)

plt.savefig("token_bucket_simulation.png", dpi=120, bbox_inches="tight")
plt.show()

print(f"Résultats de la simulation :")
print(f"  Total requêtes : {len(statuses)}")
print(f"  Acceptées      : {accepted} ({accepted/len(statuses)*100:.1f}%)")
print(f"  Rejetées       : {rejected} ({rejected/len(statuses)*100:.1f}%)")
print(f"  Taux de refill : {refill_rate} token/s → {refill_rate*60:.0f} requêtes/min en régime permanent")
_images/8b383bac20e8f0b56367a63c5dbcc73a5ab86efbe5b5779b6fa5acf17f1d09bf.png
Résultats de la simulation :
  Total requêtes : 80
  Acceptées      : 53 (66.2%)
  Rejetées       : 27 (33.8%)
  Taux de refill : 1.0 token/s → 60 requêtes/min en régime permanent

Sliding Window Rate Limiter#

import collections
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

class SlidingWindowRateLimiter:
    """
    Sliding window counter — fenêtre glissante basée sur les timestamps des requêtes.
    """
    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window = window_seconds
        self._requests: collections.deque = collections.deque()

    def is_allowed(self, now: float) -> tuple[bool, int]:
        # Supprimer les requêtes hors de la fenêtre
        cutoff = now - self.window
        while self._requests and self._requests[0] <= cutoff:
            self._requests.popleft()

        count = len(self._requests)
        if count < self.limit:
            self._requests.append(now)
            return True, self.limit - count - 1
        return False, 0

    @property
    def current_count(self) -> int:
        return len(self._requests)

# Comparaison Fixed Window vs Sliding Window
import random
random.seed(7)

limit = 10
window = 10.0

fixed_window = {"counts": collections.defaultdict(int)}
sliding = SlidingWindowRateLimiter(limit, window)

times_sim = []
fixed_allowed = []
sliding_allowed = []

t = 0.0
for _ in range(120):
    t += random.uniform(0.05, 0.4)
    times_sim.append(t)

    # Fixed window
    bucket_key = int(t // window)
    can_fixed = fixed_window["counts"][bucket_key] < limit
    if can_fixed:
        fixed_window["counts"][bucket_key] += 1
    fixed_allowed.append(can_fixed)

    # Sliding window
    can_sliding, _ = sliding.is_allowed(t)
    sliding_allowed.append(can_sliding)

fig, axes = plt.subplots(2, 1, figsize=(12, 6), sharex=True)

for ax, allowed, title, color in zip(
    axes,
    [fixed_allowed, sliding_allowed],
    ["Fixed Window Counter", "Sliding Window Counter"],
    ["#4C72B0", "#DD8452"]
):
    for t_val, ok in zip(times_sim, allowed):
        ax.axvline(x=t_val, color=("#55A868" if ok else "#C44E52"), lw=1.2, alpha=0.6)

    # Frontières des fenêtres fixes
    for boundary in range(0, int(max(times_sim)) + 1, int(window)):
        ax.axvline(x=boundary, color="#888888", lw=1.5, linestyle="--", alpha=0.5)

    accepted = sum(allowed)
    ax.set_title(f"{title}{accepted}/{len(allowed)} requêtes acceptées", fontsize=10.5)
    ax.set_yticks([])

axes[-1].set_xlabel("Temps (secondes)", fontsize=10)
fig.suptitle(
    f"Comparaison Fixed Window vs Sliding Window (limite={limit} req / {window:.0f}s)",
    fontsize=12, fontweight="bold"
)

plt.savefig("sliding_window_comparison.png", dpi=120, bbox_inches="tight")
plt.show()

# Analyser l'effet burst en début de fenêtre
print("Analyse : burst en fin/début de fenêtre fixe")
print("  Avec Fixed Window : un attaquant peut envoyer 10 req à t=9.9s")
print("  puis 10 req à t=10.1s → 20 req effectives en 0.2s")
print("  Sliding Window empêche ce pattern.")
_images/6a47efc888cdd2d6eadef6e9d2921332418f48eee5ba0a7aade219a7e1e1dfd6.png
Analyse : burst en fin/début de fenêtre fixe
  Avec Fixed Window : un attaquant peut envoyer 10 req à t=9.9s
  puis 10 req à t=10.1s → 20 req effectives en 0.2s
  Sliding Window empêche ce pattern.

OWASP API Top 10 — Bubble Chart#

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
import seaborn as sns

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)

categories = [
    "API1\nBOLA/IDOR",
    "API2\nBroken Auth",
    "API3\nProp. Auth",
    "API4\nRes. Consumption",
    "API5\nFunc. Auth",
    "API6\nBiz. Flow",
    "API7\nSSRF",
    "API8\nMisconfiguration",
    "API9\nInventaire",
    "API10\nAPIs tierces",
]

# Impact (1-10), Fréquence (1-10), Facilité d'exploitation (taille bulle)
impact    = [9, 8, 7, 6, 8, 7, 8, 6, 5, 6]
frequence = [9, 7, 7, 8, 6, 6, 5, 8, 7, 5]
exploitabilite = [9, 7, 6, 8, 7, 7, 6, 7, 7, 5]

palette = sns.color_palette("muted", 10)

fig, ax = plt.subplots(figsize=(11, 8))

for i, (cat, imp, freq, expl) in enumerate(
        zip(categories, impact, frequence, exploitabilite)):
    size = expl * 80
    scatter = ax.scatter(freq, imp, s=size * 12, color=palette[i],
                         alpha=0.72, edgecolors="white", linewidth=1.5)
    # Numéro OWASP
    num = cat.split("\n")[0]
    ax.text(freq, imp + 0.35, num, ha="center", va="bottom",
            fontsize=8, fontweight="bold", color=palette[i])
    # Nom court
    short = cat.split("\n")[1]
    ax.text(freq, imp - 0.35, short, ha="center", va="top",
            fontsize=7.5, color="#555555")

# Quadrants
ax.axhline(y=7, color="#cccccc", linestyle="--", linewidth=1)
ax.axvline(x=7, color="#cccccc", linestyle="--", linewidth=1)
ax.text(3.0, 9.5, "Fréquent faible impact", fontsize=8, color="#aaaaaa", ha="center")
ax.text(8.5, 9.5, "ZONE CRITIQUE", fontsize=9, color="#C44E52",
        ha="center", fontweight="bold",
        bbox=dict(boxstyle="round,pad=0.2", facecolor="#FFEBEE", edgecolor="#C44E52", alpha=0.7))
ax.text(3.0, 5.5, "Rare faible impact", fontsize=8, color="#aaaaaa", ha="center")
ax.text(8.5, 5.5, "Impact élevé rare", fontsize=8, color="#888888", ha="center")

ax.set_xlim(3, 11)
ax.set_ylim(4, 10.5)
ax.set_xlabel("Fréquence (1=rare → 10=systématique)", fontsize=10)
ax.set_ylabel("Impact (1=faible → 10=critique)", fontsize=10)
ax.set_title("OWASP API Security Top 10 — 2023\nImpact × Fréquence (taille = facilité d'exploitation)",
             fontsize=12, fontweight="bold", pad=10)

# Légende taille
for expl_val, label in [(5, "Faible"), (7, "Moyen"), (9, "Élevé")]:
    ax.scatter([], [], s=expl_val * 120, color="#aaaaaa", alpha=0.6,
               label=f"Exploitabilité : {label}")
ax.legend(title="Facilité d'exploitation", loc="lower right", fontsize=8.5)

plt.savefig("owasp_api_top10.png", dpi=120, bbox_inches="tight")
plt.show()
_images/4e30019bbcd728d577ba1a71687366d6496e10ad559dc6b89f4160a30ed7dac1.png

JTI Blacklist — simulation de révocation#

import time
import hashlib
import random
import json
from datetime import datetime, timezone

class JTIBlacklist:
    """
    Blacklist de JWT IDs avec TTL automatique.
    En production : Redis avec EXPIRE.
    """
    def __init__(self):
        self._store: dict[str, float] = {}
        self._revoke_log: list[dict] = []

    def revoke(self, jti: str, exp: float, reason: str = "manuel") -> None:
        self._store[jti] = exp
        self._revoke_log.append({
            "jti": jti[:12] + "...",
            "exp": datetime.fromtimestamp(exp).isoformat(),
            "reason": reason,
            "revoked_at": datetime.now().isoformat(),
        })

    def is_revoked(self, jti: str) -> bool:
        if jti not in self._store:
            return False
        if self._store[jti] < time.time():
            del self._store[jti]  # Auto-nettoyage
            return False  # Expiré naturellement — plus dans la blacklist utile
        return True

    def cleanup(self) -> int:
        now = time.time()
        expired = [j for j, exp in self._store.items() if exp < now]
        for j in expired:
            del self._store[j]
        return len(expired)

    @property
    def size(self) -> int:
        return len(self._store)

def generate_jti() -> str:
    return hashlib.sha256(str(random.random()).encode()).hexdigest()[:24]

# Simulation
bl = JTIBlacklist()
now = time.time()

# Génération de tokens simulés
tokens = []
for i in range(20):
    jti = generate_jti()
    exp = now + random.choice([300, 900, 3600, -100])  # certains déjà expirés
    tokens.append({"jti": jti, "exp": exp, "user": f"user_{i:02d}"})

print("=== Simulation de révocation JTI ===\n")

# Révoquer quelques tokens
revoke_indices = [2, 5, 8, 12, 15]
for idx in revoke_indices:
    t = tokens[idx]
    bl.revoke(t["jti"], t["exp"], reason="logout_all_devices")
    print(f"  Révoqué : {t['jti'][:16]}... (user={t['user']})")

print(f"\n  Blacklist size : {bl.size} tokens\n")

# Vérification de tous les tokens
print("=== Vérification des tokens ===")
for i, t in enumerate(tokens[:10]):
    revoked = bl.is_revoked(t["jti"])
    expired = t["exp"] < now
    status = "RÉVOQUÉ" if revoked else ("EXPIRÉ" if expired else "VALIDE")
    print(f"  Token {i:02d} ({t['user']}) : {status}")

# Nettoyage
nettoyés = bl.cleanup()
print(f"\n  Nettoyage : {nettoyés} tokens expirés retirés de la blacklist")
print(f"  Blacklist size après nettoyage : {bl.size}")

print("\n=== Journal de révocations ===")
for entry in bl._revoke_log:
    print(f"  {entry['jti']} | {entry['reason']} | {entry['revoked_at']}")
=== Simulation de révocation JTI ===

  Révoqué : 96cb0010ddcfc8d1... (user=user_02)
  Révoqué : f99c4be1555fb3ae... (user=user_05)
  Révoqué : 249db6c299e3020c... (user=user_08)
  Révoqué : 1e19f9d7f5f9f51c... (user=user_12)
  Révoqué : b0bf7847168a998c... (user=user_15)

  Blacklist size : 5 tokens

=== Vérification des tokens ===
  Token 00 (user_00) : EXPIRÉ
  Token 01 (user_01) : EXPIRÉ
  Token 02 (user_02) : RÉVOQUÉ
  Token 03 (user_03) : VALIDE
  Token 04 (user_04) : VALIDE
  Token 05 (user_05) : RÉVOQUÉ
  Token 06 (user_06) : VALIDE
  Token 07 (user_07) : VALIDE
  Token 08 (user_08) : RÉVOQUÉ
  Token 09 (user_09) : EXPIRÉ

  Nettoyage : 0 tokens expirés retirés de la blacklist
  Blacklist size après nettoyage : 5

=== Journal de révocations ===
  96cb0010ddcf... | logout_all_devices | 2026-03-26T10:46:33.296383
  f99c4be1555f... | logout_all_devices | 2026-03-26T10:46:33.296410
  249db6c299e3... | logout_all_devices | 2026-03-26T10:46:33.296428
  1e19f9d7f5f9... | logout_all_devices | 2026-03-26T10:46:33.296442
  b0bf7847168a... | logout_all_devices | 2026-03-26T10:46:33.296457

Résumé#

Ce chapitre a couvert les principaux vecteurs de vulnérabilité des APIs modernes et leurs mitigations :

BOLA/IDOR — la vulnérabilité la plus fréquente. La seule protection efficace est la vérification systématique d’appartenance en base de données, pas au niveau de l’application cliente. UUIDs pour réduire l’énumération, 404 plutôt que 403 pour ne pas confirmer l’existence.

Injection — parameterized queries sans exception, validation Pydantic stricte en entrée, jamais shell=True dans les appels système.

Rate limiting — token bucket pour les limites souples avec tolérance aux bursts, sliding window pour les limites strictes. Les headers X-RateLimit-* et Retry-After sont un contrat avec les clients.

Révocation de tokens — la blacklist jti avec TTL automatique est la solution minimale. En production : Redis avec EXPIRE pour la TTL native, et tâche planifiée de nettoyage.

Mass assignmentextra="forbid" dans Pydantic et des modèles de requête dédiés (jamais le modèle de base de données directement) éliminent le problème par construction.

SSRF — la liste blanche de domaines autorisés est plus sûre que la liste noire. La vérification DNS doit être faite au moment de la connexion pour résister au DNS rebinding.

Données sensibles — des modèles de réponse distincts du modèle de stockage et un middleware de sanitisation des logs sont les deux mécanismes complémentaires.