12 — Patterns architecturaux#
Les patterns vus jusqu’ici opèrent à l’échelle d’une classe ou d’un module. Les patterns architecturaux opèrent à une échelle supérieure : ils organisent des couches entières, garantissent la cohérence des données, ou protègent le système contre les défaillances externes. Ils répondent à des problèmes que les patterns de conception ne peuvent pas adresser seuls.
Repository — abstraction de la couche de données#
Problème résolu#
Le code métier ne devrait pas connaître le mécanisme de persistance : SQL, NoSQL, API REST, fichier. Le Repository fournit une interface de collection pour accéder aux entités du domaine, indépendante du stockage sous-jacent.
Interface générique#
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, List
T = TypeVar("T")
ID = TypeVar("ID")
class Repository(ABC, Generic[T, ID]):
@abstractmethod
def get(self, entity_id: ID) -> Optional[T]:
...
@abstractmethod
def list_all(self) -> List[T]:
...
@abstractmethod
def save(self, entity: T) -> T:
...
@abstractmethod
def delete(self, entity_id: ID) -> bool:
...
Quand l’utiliser#
Quand on veut tester le code métier sans base de données réelle (injection de l’implémentation in-memory)
Quand on anticipe un changement de base de données
Quand le code métier accumule des
SELECT,INSERT,UPDATEinline
Unit of Work — transaction logique#
Problème résolu#
Dans une opération métier, plusieurs repositories peuvent être modifiés. La cohérence exige que toutes les modifications soient commitées ensemble ou annulées ensemble. Le Unit of Work coordonne cette transaction logique.
from abc import ABC, abstractmethod
class UnitOfWork(ABC):
@abstractmethod
def __enter__(self) -> "UnitOfWork":
...
@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
...
@abstractmethod
def commit(self) -> None:
...
@abstractmethod
def rollback(self) -> None:
...
La règle est simple : si une exception survient dans le bloc with, le __exit__ appelle rollback(). Si tout se passe bien, le code appelle explicitement commit() avant de sortir du bloc.
CQRS implémenté — séparation Command/Query#
Problème résolu#
Dans une même application, les opérations de lecture et d’écriture ont des exigences radicalement différentes. Les lectures veulent des données dénormalisées, rapides à charger. Les écritures veulent des invariants métier stricts. CQRS (Command Query Responsibility Segregation) sépare ces deux modèles.
Architecture#
Client
├── Commande (écriture) → CommandBus → CommandHandler → WriteModel → Store
└── Requête (lecture) → QueryBus → QueryHandler → ReadModel → Store
from abc import ABC, abstractmethod
from typing import Any, Dict, Type
class Command(ABC):
pass
class Query(ABC):
pass
class CommandHandler(ABC):
@abstractmethod
def handle(self, command: Command) -> None:
...
class QueryHandler(ABC):
@abstractmethod
def handle(self, query: Query) -> Any:
...
class CommandBus:
def __init__(self):
self._handlers: Dict[Type[Command], CommandHandler] = {}
def register(self, command_type: Type[Command], handler: CommandHandler) -> None:
self._handlers[command_type] = handler
def dispatch(self, command: Command) -> None:
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"Aucun handler pour {type(command).__name__}")
handler.handle(command)
class QueryBus:
def __init__(self):
self._handlers: Dict[Type[Query], QueryHandler] = {}
def register(self, query_type: Type[Query], handler: QueryHandler) -> None:
self._handlers[query_type] = handler
def ask(self, query: Query) -> Any:
handler = self._handlers.get(type(query))
if not handler:
raise ValueError(f"Aucun handler pour {type(query).__name__}")
return handler.handle(query)
Outbox Pattern — cohérence transactionnelle des événements#
Problème résolu#
Publier un événement sur un broker (Kafka, RabbitMQ) et persister des données en base sont deux opérations distinctes. Sans précaution, elles peuvent diverger : la base est mise à jour mais l’événement n’est pas publié (crash réseau), ou l’inverse.
L’Outbox Pattern résout ce problème en stockant les événements dans la même transaction que les données, puis en les publiant de manière asynchrone.
Flux#
1. Transaction DB :
├── INSERT INTO orders (...) ← données métier
└── INSERT INTO outbox (event) ← événement à publier
2. Processus asynchrone (Outbox Relay) :
├── SELECT * FROM outbox WHERE published = false
├── Publier sur le broker
└── UPDATE outbox SET published = true
Ce pattern garantit at-least-once delivery : si le relay échoue après publication mais avant le UPDATE, il republiera au redémarrage. Les consommateurs doivent donc être idempotents.
from dataclasses import dataclass, field
from typing import List, Optional
import json
import time
import uuid
@dataclass
class OutboxEntry:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = ""
payload: str = "" # JSON sérialisé
created_at: float = field(default_factory=time.time)
published: bool = False
published_at: Optional[float] = None
class OutboxStore:
"""Simulation d'une table outbox en mémoire."""
def __init__(self):
self._entries: List[OutboxEntry] = []
def append(self, event_type: str, payload: dict) -> OutboxEntry:
entry = OutboxEntry(event_type=event_type, payload=json.dumps(payload))
self._entries.append(entry)
return entry
def pending(self) -> List[OutboxEntry]:
return [e for e in self._entries if not e.published]
def mark_published(self, entry_id: str) -> None:
for e in self._entries:
if e.id == entry_id:
e.published = True
e.published_at = time.time()
def stats(self) -> dict:
total = len(self._entries)
published = sum(1 for e in self._entries if e.published)
return {"total": total, "published": published, "pending": total - published}
Circuit Breaker — protection contre les défaillances externes#
Problème résolu#
Appeler un service externe (API tierce, microservice) qui est en panne peut provoquer une cascade de défaillances : les requêtes s’accumulent, les threads se bloquent, la latence explose. Le Circuit Breaker interrompt les appels vers un service défaillant et laisse le temps de récupération.
États#
FERMÉ (CLOSED) : appels passent normalement
│ trop d'erreurs
▼
OUVERT (OPEN) : appels rejetés immédiatement (fail fast)
│ après timeout de récupération
▼
DEMI-OUVERT (HALF-OPEN) : quelques appels tests
│ si OK → FERMÉ │ si ERREUR → OUVERT
Strangler Fig — migration progressive#
Problème résolu#
Réécrire un système legacy en une seule fois est risqué. Le Strangler Fig permet une migration incrémentale : un nouveau système prend en charge les fonctionnalités une par une, pendant que l’ancien continue de tourner.
Principe#
Client
└── Façade (router)
├── Si nouvelle fonctionnalité → Nouveau système
└── Si ancienne fonctionnalité → Système legacy
À mesure que de nouvelles fonctionnalités sont portées, la façade redirige de plus en plus vers le nouveau système. Quand tout est migré, le legacy est éteint et la façade supprimée.
from enum import Enum, auto
from typing import Dict, Callable, Any
class SystemVersion(Enum):
LEGACY = auto()
NEW = auto()
class StranglerFig:
def __init__(self):
self._routes: Dict[str, SystemVersion] = {}
self._legacy_handler: Callable = None
self._new_handler: Callable = None
def set_legacy(self, handler: Callable) -> None:
self._legacy_handler = handler
def set_new(self, handler: Callable) -> None:
self._new_handler = handler
def migrate(self, feature: str) -> None:
"""Migre une fonctionnalité vers le nouveau système."""
self._routes[feature] = SystemVersion.NEW
print(f"[STRANGLER] Fonctionnalité '{feature}' migrée vers le nouveau système")
def handle(self, feature: str, *args, **kwargs) -> Any:
version = self._routes.get(feature, SystemVersion.LEGACY)
if version == SystemVersion.NEW:
return self._new_handler(feature, *args, **kwargs)
return self._legacy_handler(feature, *args, **kwargs)
def migration_progress(self) -> dict:
total = len(self._routes)
migrated = sum(1 for v in self._routes.values() if v == SystemVersion.NEW)
return {
"migrated": migrated,
"total": total,
"percent": (migrated / total * 100) if total > 0 else 0
}
Démonstrations exécutables#
Démo 1 — Circuit Breaker complet avec simulation de pannes#
import time
import random
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
from enum import Enum, auto
from typing import Callable, Optional, Any, List, Tuple
from dataclasses import dataclass, field
class CircuitState(Enum):
CLOSED = "FERMÉ"
OPEN = "OUVERT"
HALF_OPEN = "DEMI-OUVERT"
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Nb d'erreurs avant ouverture
success_threshold: int = 2 # Nb de succès pour refermer
timeout_seconds: float = 3.0 # Durée d'état OPEN
half_open_max_calls: int = 3 # Appels max en HALF_OPEN
class CircuitBreaker:
def __init__(self, name: str, config: CircuitBreakerConfig = None):
self.name = name
self._config = config or CircuitBreakerConfig()
self._state = CircuitState.CLOSED
self._failure_count = 0
self._success_count = 0
self._last_failure_time: Optional[float] = None
self._half_open_calls = 0
self._history: List[Tuple[float, str, str]] = [] # (time, state, result)
@property
def state(self) -> CircuitState:
if self._state == CircuitState.OPEN:
elapsed = time.time() - self._last_failure_time
if elapsed >= self._config.timeout_seconds:
self._transition_to(CircuitState.HALF_OPEN)
return self._state
def _transition_to(self, new_state: CircuitState) -> None:
print(f" [CB:{self.name}] {self._state.value} → {new_state.value}")
self._state = new_state
if new_state == CircuitState.HALF_OPEN:
self._half_open_calls = 0
self._success_count = 0
def call(self, fn: Callable, *args, **kwargs) -> Any:
current_state = self.state
if current_state == CircuitState.OPEN:
self._history.append((time.time(), "OPEN", "REJECTED"))
raise RuntimeError(f"Circuit {self.name} OUVERT — appel rejeté")
if current_state == CircuitState.HALF_OPEN:
if self._half_open_calls >= self._config.half_open_max_calls:
self._history.append((time.time(), "HALF_OPEN", "REJECTED"))
raise RuntimeError(f"Circuit {self.name} DEMI-OUVERT — quota atteint")
self._half_open_calls += 1
try:
result = fn(*args, **kwargs)
self._on_success()
self._history.append((time.time(), current_state.name, "SUCCESS"))
return result
except Exception as e:
self._on_failure()
self._history.append((time.time(), current_state.name, "FAILURE"))
raise
def _on_success(self) -> None:
self._failure_count = 0
if self._state == CircuitState.HALF_OPEN:
self._success_count += 1
if self._success_count >= self._config.success_threshold:
self._transition_to(CircuitState.CLOSED)
def _on_failure(self) -> None:
self._failure_count += 1
self._last_failure_time = time.time()
if self._state == CircuitState.HALF_OPEN:
self._transition_to(CircuitState.OPEN)
elif self._failure_count >= self._config.failure_threshold:
self._transition_to(CircuitState.OPEN)
# Simulation d'un service externe instable
def unstable_service(fail_rate: float = 0.0) -> str:
if random.random() < fail_rate:
raise ConnectionError("Service externe indisponible")
return "OK"
config = CircuitBreakerConfig(
failure_threshold=3,
success_threshold=2,
timeout_seconds=0.5,
)
cb = CircuitBreaker("payment-api", config)
results = []
states = []
random.seed(42)
# Phase 1 : service stable (10 appels)
print("=== Phase 1 : service stable ===")
for i in range(10):
try:
cb.call(unstable_service, fail_rate=0.0)
results.append(1)
except:
results.append(0)
states.append(cb._state.name)
# Phase 2 : service dégradé (10 appels, 70% d'erreurs)
print("\n=== Phase 2 : service dégradé ===")
for i in range(10):
try:
cb.call(unstable_service, fail_rate=0.7)
results.append(1)
except:
results.append(0)
states.append(cb._state.name)
# Attente que le timeout expire
time.sleep(0.6)
# Phase 3 : service rétabli — circuit doit tester puis fermer
print("\n=== Phase 3 : service rétabli ===")
for i in range(10):
try:
cb.call(unstable_service, fail_rate=0.0)
results.append(1)
except:
results.append(0)
states.append(cb._state.name)
# Visualisation
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
x = range(len(results))
colors_r = ["#55A868" if r == 1 else "#C44E52" for r in results]
ax1.bar(x, results, color=colors_r, width=0.7)
ax1.set_ylabel("Résultat")
ax1.set_yticks([0, 1])
ax1.set_yticklabels(["Échec / Rejeté", "Succès"])
ax1.set_title("Simulation Circuit Breaker — résultats appel par appel", fontsize=12, fontweight="bold")
state_map = {"CLOSED": 0, "OPEN": 2, "HALF_OPEN": 1}
state_colors = {"CLOSED": "#55A868", "OPEN": "#C44E52", "HALF_OPEN": "#DD8452"}
state_vals = [state_map[s] for s in states]
state_cols = [state_colors[s] for s in states]
ax2.bar(x, [1] * len(states), color=state_cols, width=0.7, alpha=0.8)
ax2.set_yticks([])
ax2.set_xlabel("Numéro d'appel")
ax2.set_title("État du circuit par appel", fontsize=11)
legend_elements = [
mpatches.Patch(facecolor="#55A868", label="FERMÉ"),
mpatches.Patch(facecolor="#DD8452", label="DEMI-OUVERT"),
mpatches.Patch(facecolor="#C44E52", label="OUVERT"),
]
ax2.legend(handles=legend_elements, loc="upper right", fontsize=9)
ax1.axvline(9.5, color="#555", linestyle="--", alpha=0.5, label="Dégradation")
ax1.axvline(19.5, color="#555", linestyle=":", alpha=0.5, label="Rétablissement")
ax1.legend(fontsize=9)
plt.savefig("circuit_breaker.png", dpi=120, bbox_inches="tight")
plt.show()
print(f"\nHistorique: {len(cb._history)} appels enregistrés")
=== Phase 1 : service stable ===
=== Phase 2 : service dégradé ===
[CB:payment-api] FERMÉ → OUVERT
=== Phase 3 : service rétabli ===
[CB:payment-api] OUVERT → DEMI-OUVERT
[CB:payment-api] DEMI-OUVERT → FERMÉ
Historique: 30 appels enregistrés
Démo 2 — Repository pattern avec deux adapters#
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Dict
import uuid
@dataclass
class User:
name: str
email: str
role: str = "user"
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
def __repr__(self):
return f"User(id={self.id}, name={self.name}, role={self.role})"
class UserRepository(ABC):
@abstractmethod
def get(self, user_id: str) -> Optional[User]:
...
@abstractmethod
def list_all(self) -> List[User]:
...
@abstractmethod
def save(self, user: User) -> User:
...
@abstractmethod
def delete(self, user_id: str) -> bool:
...
@abstractmethod
def find_by_email(self, email: str) -> Optional[User]:
...
class InMemoryUserRepository(UserRepository):
"""Implémentation en mémoire — idéale pour les tests."""
def __init__(self):
self._store: Dict[str, User] = {}
def get(self, user_id: str) -> Optional[User]:
return self._store.get(user_id)
def list_all(self) -> List[User]:
return list(self._store.values())
def save(self, user: User) -> User:
self._store[user.id] = user
return user
def delete(self, user_id: str) -> bool:
if user_id in self._store:
del self._store[user_id]
return True
return False
def find_by_email(self, email: str) -> Optional[User]:
for user in self._store.values():
if user.email == email:
return user
return None
class SimulatedSQLiteRepository(UserRepository):
"""Simule une couche SQLite — mêmes opérations, stockage différent."""
def __init__(self):
# En production, ce serait une vraie connexion SQLite
self._rows: Dict[str, dict] = {} # Simule les rows SQL
self._sequence = 0
def _to_user(self, row: dict) -> User:
return User(id=row["id"], name=row["name"], email=row["email"], role=row["role"])
def _to_row(self, user: User) -> dict:
return {"id": user.id, "name": user.name, "email": user.email, "role": user.role}
def get(self, user_id: str) -> Optional[User]:
# Simule: SELECT * FROM users WHERE id = ?
row = self._rows.get(user_id)
return self._to_user(row) if row else None
def list_all(self) -> List[User]:
# Simule: SELECT * FROM users
return [self._to_user(row) for row in self._rows.values()]
def save(self, user: User) -> User:
# Simule: INSERT OR REPLACE INTO users (...)
self._rows[user.id] = self._to_row(user)
return user
def delete(self, user_id: str) -> bool:
# Simule: DELETE FROM users WHERE id = ?
if user_id in self._rows:
del self._rows[user_id]
return True
return False
def find_by_email(self, email: str) -> Optional[User]:
# Simule: SELECT * FROM users WHERE email = ?
for row in self._rows.values():
if row["email"] == email:
return self._to_user(row)
return None
# Code métier — identique quel que soit le repository
class UserService:
def __init__(self, repo: UserRepository):
self._repo = repo
def register(self, name: str, email: str, role: str = "user") -> User:
existing = self._repo.find_by_email(email)
if existing:
raise ValueError(f"Email déjà utilisé: {email}")
user = User(name=name, email=email, role=role)
return self._repo.save(user)
def promote_to_admin(self, user_id: str) -> User:
user = self._repo.get(user_id)
if not user:
raise ValueError(f"Utilisateur {user_id} introuvable")
user.role = "admin"
return self._repo.save(user)
def list_users(self) -> List[User]:
return self._repo.list_all()
# Test avec les deux implémentations
def demo_repository(repo: UserRepository, label: str):
print(f"\n=== {label} ===")
service = UserService(repo)
alice = service.register("Alice Martin", "alice@example.com", "admin")
bob = service.register("Bob Dupont", "bob@example.com")
claire = service.register("Claire Lefèvre", "claire@example.com")
print(f"Utilisateurs créés: {len(service.list_users())}")
claire_promoted = service.promote_to_admin(claire.id)
print(f"Rôle de Claire après promotion: {claire_promoted.role}")
found = repo.find_by_email("bob@example.com")
print(f"Recherche par email: {found}")
repo.delete(bob.id)
print(f"Après suppression de Bob: {len(service.list_users())} utilisateurs")
try:
service.register("Alice bis", "alice@example.com")
except ValueError as e:
print(f"Doublon détecté: {e}")
demo_repository(InMemoryUserRepository(), "Repository In-Memory")
demo_repository(SimulatedSQLiteRepository(), "Repository SQLite simulé")
print("\n✓ Le code métier (UserService) est identique pour les deux implémentations.")
=== Repository In-Memory ===
Utilisateurs créés: 3
Rôle de Claire après promotion: admin
Recherche par email: User(id=c2dcd76b, name=Bob Dupont, role=user)
Après suppression de Bob: 2 utilisateurs
Doublon détecté: Email déjà utilisé: alice@example.com
=== Repository SQLite simulé ===
Utilisateurs créés: 3
Rôle de Claire après promotion: admin
Recherche par email: User(id=985572bb, name=Bob Dupont, role=user)
Après suppression de Bob: 2 utilisateurs
Doublon détecté: Email déjà utilisé: alice@example.com
✓ Le code métier (UserService) est identique pour les deux implémentations.
Démo 3 — Simulation CQRS : blog avec write/read models séparés#
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Type
import uuid
import time
# ── Domaine (écriture) ──────────────────────────────────────────
@dataclass
class Article:
title: str
content: str
author_id: str
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
published: bool = False
created_at: float = field(default_factory=time.time)
tags: List[str] = field(default_factory=list)
# ── Commandes ───────────────────────────────────────────────────
class Command(ABC):
pass
@dataclass
class CreateArticle(Command):
title: str
content: str
author_id: str
tags: List[str] = field(default_factory=list)
@dataclass
class PublishArticle(Command):
article_id: str
@dataclass
class AddTag(Command):
article_id: str
tag: str
# ── Requêtes ────────────────────────────────────────────────────
class Query(ABC):
pass
@dataclass
class GetArticle(Query):
article_id: str
@dataclass
class ListPublished(Query):
author_id: Optional[str] = None
@dataclass
class SearchByTag(Query):
tag: str
# ── Stores séparés ──────────────────────────────────────────────
class WriteStore:
"""Modèle d'écriture normalisé."""
def __init__(self):
self._articles: Dict[str, Article] = {}
self._events: List[dict] = []
def save(self, article: Article) -> None:
self._articles[article.id] = article
self._events.append({"type": "article_saved", "id": article.id, "at": time.time()})
def get(self, article_id: str) -> Optional[Article]:
return self._articles.get(article_id)
class ReadStore:
"""Modèle de lecture dénormalisé pour les requêtes rapides."""
def __init__(self):
self._published: Dict[str, dict] = {} # Vue dénormalisée
def upsert(self, article: Article, author_name: str = "") -> None:
self._published[article.id] = {
"id": article.id,
"title": article.title,
"author": author_name or article.author_id,
"tags": article.tags,
"published": article.published,
"preview": article.content[:100] + "..." if len(article.content) > 100 else article.content,
}
def get(self, article_id: str) -> Optional[dict]:
return self._published.get(article_id)
def list_published(self, author_id: Optional[str] = None) -> List[dict]:
items = [v for v in self._published.values() if v["published"]]
if author_id:
items = [v for v in items if v.get("author_id") == author_id]
return items
def search_by_tag(self, tag: str) -> List[dict]:
return [v for v in self._published.values() if tag in v.get("tags", [])]
# ── Handlers ────────────────────────────────────────────────────
class CreateArticleHandler:
def __init__(self, write_store: WriteStore, read_store: ReadStore):
self._write = write_store
self._read = read_store
def handle(self, cmd: CreateArticle) -> Article:
article = Article(
title=cmd.title, content=cmd.content,
author_id=cmd.author_id, tags=cmd.tags
)
self._write.save(article)
self._read.upsert(article)
print(f" [CMD] CreateArticle → id={article.id}, titre='{article.title}'")
return article
class PublishArticleHandler:
def __init__(self, write_store: WriteStore, read_store: ReadStore):
self._write = write_store
self._read = read_store
def handle(self, cmd: PublishArticle) -> None:
article = self._write.get(cmd.article_id)
if not article:
raise ValueError(f"Article {cmd.article_id} introuvable")
article.published = True
self._write.save(article)
self._read.upsert(article)
print(f" [CMD] PublishArticle → id={article.id} publié")
class GetArticleHandler:
def __init__(self, read_store: ReadStore):
self._read = read_store
def handle(self, query: GetArticle) -> Optional[dict]:
result = self._read.get(query.article_id)
print(f" [QRY] GetArticle({query.article_id}) → {result['title'] if result else 'None'}")
return result
class ListPublishedHandler:
def __init__(self, read_store: ReadStore):
self._read = read_store
def handle(self, query: ListPublished) -> List[dict]:
result = self._read.list_published(query.author_id)
print(f" [QRY] ListPublished → {len(result)} article(s)")
return result
class SearchByTagHandler:
def __init__(self, read_store: ReadStore):
self._read = read_store
def handle(self, query: SearchByTag) -> List[dict]:
result = self._read.search_by_tag(query.tag)
print(f" [QRY] SearchByTag('{query.tag}') → {len(result)} résultat(s)")
return result
# ── Démo ────────────────────────────────────────────────────────
write_store = WriteStore()
read_store = ReadStore()
create_handler = CreateArticleHandler(write_store, read_store)
publish_handler = PublishArticleHandler(write_store, read_store)
get_handler = GetArticleHandler(read_store)
list_handler = ListPublishedHandler(read_store)
search_handler = SearchByTagHandler(read_store)
print("=== Commandes (écriture) ===")
a1 = create_handler.handle(CreateArticle(
"Introduction à CQRS", "CQRS sépare les modèles de lecture et d'écriture...",
"author_1", tags=["architecture", "patterns"]
))
a2 = create_handler.handle(CreateArticle(
"Le pattern Repository", "Le Repository abstrait la couche de persistance...",
"author_1", tags=["patterns", "database"]
))
a3 = create_handler.handle(CreateArticle(
"Circuit Breaker en pratique", "Protéger les appels externes avec un circuit breaker...",
"author_2", tags=["résilience", "patterns"]
))
publish_handler.handle(PublishArticle(a1.id))
publish_handler.handle(PublishArticle(a3.id))
# a2 reste en brouillon
print("\n=== Requêtes (lecture) ===")
get_handler.handle(GetArticle(a1.id))
list_handler.handle(ListPublished())
search_handler.handle(SearchByTag("patterns"))
search_handler.handle(SearchByTag("résilience"))
=== Commandes (écriture) ===
[CMD] CreateArticle → id=49849469, titre='Introduction à CQRS'
[CMD] CreateArticle → id=81dc2576, titre='Le pattern Repository'
[CMD] CreateArticle → id=bd097938, titre='Circuit Breaker en pratique'
[CMD] PublishArticle → id=49849469 publié
[CMD] PublishArticle → id=bd097938 publié
=== Requêtes (lecture) ===
[QRY] GetArticle(49849469) → Introduction à CQRS
[QRY] ListPublished → 2 article(s)
[QRY] SearchByTag('patterns') → 3 résultat(s)
[QRY] SearchByTag('résilience') → 1 résultat(s)
[{'id': 'bd097938',
'title': 'Circuit Breaker en pratique',
'author': 'author_2',
'tags': ['résilience', 'patterns'],
'published': True,
'preview': 'Protéger les appels externes avec un circuit breaker...'}]
Démo 4 — Diagramme Outbox Pattern (flux de messages)#
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.patheffects as pe
import seaborn as sns
import numpy as np
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)
fig, ax = plt.subplots(figsize=(13, 7))
ax.set_xlim(0, 13)
ax.set_ylim(0, 7)
ax.axis("off")
ax.set_facecolor("#F8F9FA")
# Acteurs verticaux
actors = [
(1.5, "Application\n(Service métier)", "#4C72B0"),
(5.0, "Base de données\n(Transaction)", "#55A868"),
(8.5, "Outbox Relay\n(Processus async)", "#DD8452"),
(12.0, "Message Broker\n(Kafka / RabbitMQ)", "#C44E52"),
]
for x, label, color in actors:
box = mpatches.FancyBboxPatch((x - 0.9, 5.8), 1.8, 0.9,
boxstyle="round,pad=0.08", fc=color, ec="none", alpha=0.9)
ax.add_patch(box)
ax.text(x, 6.25, label, ha="center", va="center",
fontsize=8.5, fontweight="bold", color="white")
ax.plot([x, x], [5.8, 0.2], color=color, linestyle="--", linewidth=1.2, alpha=0.5)
# Messages (flèches avec étiquettes)
messages = [
(1.5, 5.0, 5.0, "① INSERT INTO orders\n (données métier)", "#4C72B0", 4.8),
(1.5, 5.0, 5.0, "② INSERT INTO outbox\n (event JSON)", "#4C72B0", 3.8),
(5.0, 8.5, 5.0, "③ SELECT * FROM outbox\n WHERE published=false", "#55A868", 2.8),
(8.5, 12.0, 5.0, "④ Publish(event)", "#DD8452", 1.8),
(8.5, 5.0, 12.0, "⑤ UPDATE outbox\n SET published=true", "#DD8452", 0.9),
]
for x_start, x_end, x_from, label, color, y in messages:
dx = x_end - x_start
ax.annotate("", xy=(x_end, y), xytext=(x_start, y),
arrowprops=dict(arrowstyle="->", color=color, lw=1.8))
mx = (x_start + x_end) / 2
offset = 0.18 if dx > 0 else -0.18
ax.text(mx, y + 0.22, label, ha="center", va="bottom",
fontsize=8, color=color, fontweight="bold",
bbox=dict(facecolor="white", edgecolor=color, alpha=0.85, boxstyle="round,pad=0.2"))
# Transaction box
tx_box = mpatches.FancyBboxPatch((0.4, 3.5), 5.8, 1.8,
boxstyle="round,pad=0.1", fc="none", ec="#55A868", linewidth=2, linestyle="--", alpha=0.7)
ax.add_patch(tx_box)
ax.text(3.3, 5.4, "Transaction atomique", ha="center", va="center",
fontsize=8.5, color="#55A868", style="italic", fontweight="bold")
ax.set_title("Outbox Pattern — garantie de livraison des événements\n"
"avec cohérence transactionnelle",
fontsize=13, fontweight="bold", pad=12, color="#2C3E50")
plt.savefig("outbox_pattern.png", dpi=120, bbox_inches="tight")
plt.show()
Résumé#
Les patterns architecturaux de ce chapitre opèrent sur la cohérence des données et la résilience du système — deux propriétés impossibles à obtenir uniquement avec des patterns de conception.
Repository découple le code métier du mécanisme de persistance. Le bénéfice immédiat est la testabilité (injection d’un in-memory) ; le bénéfice long terme est la capacité à changer de base de données sans réécriture.
Unit of Work coordonne les transactions logiques multi-repositories. Sans lui, deux repositories modifiés dans la même opération peuvent diverger silencieusement.
CQRS répond à la tension entre modèle d’écriture (normalisé, avec invariants) et modèle de lecture (dénormalisé, optimisé). Il ne faut l’adopter que quand cette tension est réelle — pas par principe.
Outbox Pattern résout le problème « écrire en base ET publier un événement de manière atomique ». C’est un pattern fondamental dans tout système qui combine persistance et messaging.
Circuit Breaker protège le système contre les cascades de défaillances. Ses trois états (fermé/ouvert/demi-ouvert) implémentent une politique de récupération progressive.
Strangler Fig est la stratégie de migration la moins risquée pour remplacer un système legacy : la façade est le point de contrôle ; on migre une fonctionnalité à la fois.
À retenir
Le Repository et le Unit of Work fonctionnent ensemble : le Repository encapsule les opérations CRUD sur une entité, le Unit of Work orchestre la transaction globale qui peut impliquer plusieurs repositories. Les séparer sans les coordonner conduit à des incohérences subtiles.
Piège courant
CQRS est souvent sur-appliqué. Dans la plupart des applications, un seul modèle suffit. N’introduire CQRS que quand les besoins de lecture et d’écriture sont genuinement contradictoires : volumes différents, modèles de données incompatibles, équipes séparées.
Bonne pratique
Le Circuit Breaker doit être observable : exposer ses métriques (nb d’erreurs, état courant, temps depuis le dernier changement d’état) dans le système de monitoring. Un circuit resté OUVERT trop longtemps est le signe d’un problème non résolu en amont.