12 — Patterns architecturaux#

Les patterns vus jusqu’ici opèrent à l’échelle d’une classe ou d’un module. Les patterns architecturaux opèrent à une échelle supérieure : ils organisent des couches entières, garantissent la cohérence des données, ou protègent le système contre les défaillances externes. Ils répondent à des problèmes que les patterns de conception ne peuvent pas adresser seuls.

Repository — abstraction de la couche de données#

Problème résolu#

Le code métier ne devrait pas connaître le mécanisme de persistance : SQL, NoSQL, API REST, fichier. Le Repository fournit une interface de collection pour accéder aux entités du domaine, indépendante du stockage sous-jacent.

Interface générique#

from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, List

T = TypeVar("T")
ID = TypeVar("ID")

class Repository(ABC, Generic[T, ID]):
    @abstractmethod
    def get(self, entity_id: ID) -> Optional[T]:
        ...

    @abstractmethod
    def list_all(self) -> List[T]:
        ...

    @abstractmethod
    def save(self, entity: T) -> T:
        ...

    @abstractmethod
    def delete(self, entity_id: ID) -> bool:
        ...

Quand l’utiliser#

  • Quand on veut tester le code métier sans base de données réelle (injection de l’implémentation in-memory)

  • Quand on anticipe un changement de base de données

  • Quand le code métier accumule des SELECT, INSERT, UPDATE inline


Unit of Work — transaction logique#

Problème résolu#

Dans une opération métier, plusieurs repositories peuvent être modifiés. La cohérence exige que toutes les modifications soient commitées ensemble ou annulées ensemble. Le Unit of Work coordonne cette transaction logique.

from abc import ABC, abstractmethod

class UnitOfWork(ABC):
    @abstractmethod
    def __enter__(self) -> "UnitOfWork":
        ...

    @abstractmethod
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        ...

    @abstractmethod
    def commit(self) -> None:
        ...

    @abstractmethod
    def rollback(self) -> None:
        ...

La règle est simple : si une exception survient dans le bloc with, le __exit__ appelle rollback(). Si tout se passe bien, le code appelle explicitement commit() avant de sortir du bloc.


CQRS implémenté — séparation Command/Query#

Problème résolu#

Dans une même application, les opérations de lecture et d’écriture ont des exigences radicalement différentes. Les lectures veulent des données dénormalisées, rapides à charger. Les écritures veulent des invariants métier stricts. CQRS (Command Query Responsibility Segregation) sépare ces deux modèles.

Architecture#

Client
  ├── Commande (écriture) → CommandBus → CommandHandler → WriteModel → Store
  └── Requête  (lecture)  → QueryBus  → QueryHandler  → ReadModel  → Store
from abc import ABC, abstractmethod
from typing import Any, Dict, Type

class Command(ABC):
    pass

class Query(ABC):
    pass

class CommandHandler(ABC):
    @abstractmethod
    def handle(self, command: Command) -> None:
        ...

class QueryHandler(ABC):
    @abstractmethod
    def handle(self, query: Query) -> Any:
        ...

class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler) -> None:
        self._handlers[command_type] = handler

    def dispatch(self, command: Command) -> None:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"Aucun handler pour {type(command).__name__}")
        handler.handle(command)

class QueryBus:
    def __init__(self):
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler) -> None:
        self._handlers[query_type] = handler

    def ask(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"Aucun handler pour {type(query).__name__}")
        return handler.handle(query)

Outbox Pattern — cohérence transactionnelle des événements#

Problème résolu#

Publier un événement sur un broker (Kafka, RabbitMQ) et persister des données en base sont deux opérations distinctes. Sans précaution, elles peuvent diverger : la base est mise à jour mais l’événement n’est pas publié (crash réseau), ou l’inverse.

L’Outbox Pattern résout ce problème en stockant les événements dans la même transaction que les données, puis en les publiant de manière asynchrone.

Flux#

1. Transaction DB :
   ├── INSERT INTO orders (...)       ← données métier
   └── INSERT INTO outbox (event)     ← événement à publier

2. Processus asynchrone (Outbox Relay) :
   ├── SELECT * FROM outbox WHERE published = false
   ├── Publier sur le broker
   └── UPDATE outbox SET published = true

Ce pattern garantit at-least-once delivery : si le relay échoue après publication mais avant le UPDATE, il republiera au redémarrage. Les consommateurs doivent donc être idempotents.

from dataclasses import dataclass, field
from typing import List, Optional
import json
import time
import uuid

@dataclass
class OutboxEntry:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    event_type: str = ""
    payload: str = ""       # JSON sérialisé
    created_at: float = field(default_factory=time.time)
    published: bool = False
    published_at: Optional[float] = None

class OutboxStore:
    """Simulation d'une table outbox en mémoire."""
    def __init__(self):
        self._entries: List[OutboxEntry] = []

    def append(self, event_type: str, payload: dict) -> OutboxEntry:
        entry = OutboxEntry(event_type=event_type, payload=json.dumps(payload))
        self._entries.append(entry)
        return entry

    def pending(self) -> List[OutboxEntry]:
        return [e for e in self._entries if not e.published]

    def mark_published(self, entry_id: str) -> None:
        for e in self._entries:
            if e.id == entry_id:
                e.published = True
                e.published_at = time.time()

    def stats(self) -> dict:
        total = len(self._entries)
        published = sum(1 for e in self._entries if e.published)
        return {"total": total, "published": published, "pending": total - published}

Circuit Breaker — protection contre les défaillances externes#

Problème résolu#

Appeler un service externe (API tierce, microservice) qui est en panne peut provoquer une cascade de défaillances : les requêtes s’accumulent, les threads se bloquent, la latence explose. Le Circuit Breaker interrompt les appels vers un service défaillant et laisse le temps de récupération.

États#

FERMÉ (CLOSED)          : appels passent normalement
  │ trop d'erreurs
  ▼
OUVERT (OPEN)           : appels rejetés immédiatement (fail fast)
  │ après timeout de récupération
  ▼
DEMI-OUVERT (HALF-OPEN) : quelques appels tests
  │ si OK → FERMÉ       │ si ERREUR → OUVERT

Strangler Fig — migration progressive#

Problème résolu#

Réécrire un système legacy en une seule fois est risqué. Le Strangler Fig permet une migration incrémentale : un nouveau système prend en charge les fonctionnalités une par une, pendant que l’ancien continue de tourner.

Principe#

Client
  └── Façade (router)
        ├── Si nouvelle fonctionnalité → Nouveau système
        └── Si ancienne fonctionnalité → Système legacy

À mesure que de nouvelles fonctionnalités sont portées, la façade redirige de plus en plus vers le nouveau système. Quand tout est migré, le legacy est éteint et la façade supprimée.

from enum import Enum, auto
from typing import Dict, Callable, Any

class SystemVersion(Enum):
    LEGACY = auto()
    NEW = auto()

class StranglerFig:
    def __init__(self):
        self._routes: Dict[str, SystemVersion] = {}
        self._legacy_handler: Callable = None
        self._new_handler: Callable = None

    def set_legacy(self, handler: Callable) -> None:
        self._legacy_handler = handler

    def set_new(self, handler: Callable) -> None:
        self._new_handler = handler

    def migrate(self, feature: str) -> None:
        """Migre une fonctionnalité vers le nouveau système."""
        self._routes[feature] = SystemVersion.NEW
        print(f"[STRANGLER] Fonctionnalité '{feature}' migrée vers le nouveau système")

    def handle(self, feature: str, *args, **kwargs) -> Any:
        version = self._routes.get(feature, SystemVersion.LEGACY)
        if version == SystemVersion.NEW:
            return self._new_handler(feature, *args, **kwargs)
        return self._legacy_handler(feature, *args, **kwargs)

    def migration_progress(self) -> dict:
        total = len(self._routes)
        migrated = sum(1 for v in self._routes.values() if v == SystemVersion.NEW)
        return {
            "migrated": migrated,
            "total": total,
            "percent": (migrated / total * 100) if total > 0 else 0
        }

Démonstrations exécutables#

Démo 1 — Circuit Breaker complet avec simulation de pannes#

import time
import random
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
from enum import Enum, auto
from typing import Callable, Optional, Any, List, Tuple
from dataclasses import dataclass, field

class CircuitState(Enum):
    CLOSED = "FERMÉ"
    OPEN = "OUVERT"
    HALF_OPEN = "DEMI-OUVERT"

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5       # Nb d'erreurs avant ouverture
    success_threshold: int = 2       # Nb de succès pour refermer
    timeout_seconds: float = 3.0     # Durée d'état OPEN
    half_open_max_calls: int = 3     # Appels max en HALF_OPEN

class CircuitBreaker:
    def __init__(self, name: str, config: CircuitBreakerConfig = None):
        self.name = name
        self._config = config or CircuitBreakerConfig()
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._history: List[Tuple[float, str, str]] = []  # (time, state, result)

    @property
    def state(self) -> CircuitState:
        if self._state == CircuitState.OPEN:
            elapsed = time.time() - self._last_failure_time
            if elapsed >= self._config.timeout_seconds:
                self._transition_to(CircuitState.HALF_OPEN)
        return self._state

    def _transition_to(self, new_state: CircuitState) -> None:
        print(f"  [CB:{self.name}] {self._state.value}{new_state.value}")
        self._state = new_state
        if new_state == CircuitState.HALF_OPEN:
            self._half_open_calls = 0
            self._success_count = 0

    def call(self, fn: Callable, *args, **kwargs) -> Any:
        current_state = self.state

        if current_state == CircuitState.OPEN:
            self._history.append((time.time(), "OPEN", "REJECTED"))
            raise RuntimeError(f"Circuit {self.name} OUVERT — appel rejeté")

        if current_state == CircuitState.HALF_OPEN:
            if self._half_open_calls >= self._config.half_open_max_calls:
                self._history.append((time.time(), "HALF_OPEN", "REJECTED"))
                raise RuntimeError(f"Circuit {self.name} DEMI-OUVERT — quota atteint")
            self._half_open_calls += 1

        try:
            result = fn(*args, **kwargs)
            self._on_success()
            self._history.append((time.time(), current_state.name, "SUCCESS"))
            return result
        except Exception as e:
            self._on_failure()
            self._history.append((time.time(), current_state.name, "FAILURE"))
            raise

    def _on_success(self) -> None:
        self._failure_count = 0
        if self._state == CircuitState.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self._config.success_threshold:
                self._transition_to(CircuitState.CLOSED)

    def _on_failure(self) -> None:
        self._failure_count += 1
        self._last_failure_time = time.time()
        if self._state == CircuitState.HALF_OPEN:
            self._transition_to(CircuitState.OPEN)
        elif self._failure_count >= self._config.failure_threshold:
            self._transition_to(CircuitState.OPEN)


# Simulation d'un service externe instable
def unstable_service(fail_rate: float = 0.0) -> str:
    if random.random() < fail_rate:
        raise ConnectionError("Service externe indisponible")
    return "OK"

config = CircuitBreakerConfig(
    failure_threshold=3,
    success_threshold=2,
    timeout_seconds=0.5,
)
cb = CircuitBreaker("payment-api", config)

results = []
states = []
random.seed(42)

# Phase 1 : service stable (10 appels)
print("=== Phase 1 : service stable ===")
for i in range(10):
    try:
        cb.call(unstable_service, fail_rate=0.0)
        results.append(1)
    except:
        results.append(0)
    states.append(cb._state.name)

# Phase 2 : service dégradé (10 appels, 70% d'erreurs)
print("\n=== Phase 2 : service dégradé ===")
for i in range(10):
    try:
        cb.call(unstable_service, fail_rate=0.7)
        results.append(1)
    except:
        results.append(0)
    states.append(cb._state.name)

# Attente que le timeout expire
time.sleep(0.6)

# Phase 3 : service rétabli — circuit doit tester puis fermer
print("\n=== Phase 3 : service rétabli ===")
for i in range(10):
    try:
        cb.call(unstable_service, fail_rate=0.0)
        results.append(1)
    except:
        results.append(0)
    states.append(cb._state.name)

# Visualisation
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 7), sharex=True)

x = range(len(results))
colors_r = ["#55A868" if r == 1 else "#C44E52" for r in results]
ax1.bar(x, results, color=colors_r, width=0.7)
ax1.set_ylabel("Résultat")
ax1.set_yticks([0, 1])
ax1.set_yticklabels(["Échec / Rejeté", "Succès"])
ax1.set_title("Simulation Circuit Breaker — résultats appel par appel", fontsize=12, fontweight="bold")

state_map = {"CLOSED": 0, "OPEN": 2, "HALF_OPEN": 1}
state_colors = {"CLOSED": "#55A868", "OPEN": "#C44E52", "HALF_OPEN": "#DD8452"}
state_vals = [state_map[s] for s in states]
state_cols = [state_colors[s] for s in states]
ax2.bar(x, [1] * len(states), color=state_cols, width=0.7, alpha=0.8)
ax2.set_yticks([])
ax2.set_xlabel("Numéro d'appel")
ax2.set_title("État du circuit par appel", fontsize=11)

legend_elements = [
    mpatches.Patch(facecolor="#55A868", label="FERMÉ"),
    mpatches.Patch(facecolor="#DD8452", label="DEMI-OUVERT"),
    mpatches.Patch(facecolor="#C44E52", label="OUVERT"),
]
ax2.legend(handles=legend_elements, loc="upper right", fontsize=9)

ax1.axvline(9.5, color="#555", linestyle="--", alpha=0.5, label="Dégradation")
ax1.axvline(19.5, color="#555", linestyle=":", alpha=0.5, label="Rétablissement")
ax1.legend(fontsize=9)

plt.savefig("circuit_breaker.png", dpi=120, bbox_inches="tight")
plt.show()
print(f"\nHistorique: {len(cb._history)} appels enregistrés")
=== Phase 1 : service stable ===

=== Phase 2 : service dégradé ===
  [CB:payment-api] FERMÉ → OUVERT
=== Phase 3 : service rétabli ===
  [CB:payment-api] OUVERT → DEMI-OUVERT
  [CB:payment-api] DEMI-OUVERT → FERMÉ
_images/e6e746850fad0dd0e2101cb9252e5b33c3fde6fcdb84bb5238eb70020bb786a4.png
Historique: 30 appels enregistrés

Démo 2 — Repository pattern avec deux adapters#

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, List, Dict
import uuid

@dataclass
class User:
    name: str
    email: str
    role: str = "user"
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, role={self.role})"


class UserRepository(ABC):
    @abstractmethod
    def get(self, user_id: str) -> Optional[User]:
        ...

    @abstractmethod
    def list_all(self) -> List[User]:
        ...

    @abstractmethod
    def save(self, user: User) -> User:
        ...

    @abstractmethod
    def delete(self, user_id: str) -> bool:
        ...

    @abstractmethod
    def find_by_email(self, email: str) -> Optional[User]:
        ...


class InMemoryUserRepository(UserRepository):
    """Implémentation en mémoire — idéale pour les tests."""
    def __init__(self):
        self._store: Dict[str, User] = {}

    def get(self, user_id: str) -> Optional[User]:
        return self._store.get(user_id)

    def list_all(self) -> List[User]:
        return list(self._store.values())

    def save(self, user: User) -> User:
        self._store[user.id] = user
        return user

    def delete(self, user_id: str) -> bool:
        if user_id in self._store:
            del self._store[user_id]
            return True
        return False

    def find_by_email(self, email: str) -> Optional[User]:
        for user in self._store.values():
            if user.email == email:
                return user
        return None


class SimulatedSQLiteRepository(UserRepository):
    """Simule une couche SQLite — mêmes opérations, stockage différent."""
    def __init__(self):
        # En production, ce serait une vraie connexion SQLite
        self._rows: Dict[str, dict] = {}  # Simule les rows SQL
        self._sequence = 0

    def _to_user(self, row: dict) -> User:
        return User(id=row["id"], name=row["name"], email=row["email"], role=row["role"])

    def _to_row(self, user: User) -> dict:
        return {"id": user.id, "name": user.name, "email": user.email, "role": user.role}

    def get(self, user_id: str) -> Optional[User]:
        # Simule: SELECT * FROM users WHERE id = ?
        row = self._rows.get(user_id)
        return self._to_user(row) if row else None

    def list_all(self) -> List[User]:
        # Simule: SELECT * FROM users
        return [self._to_user(row) for row in self._rows.values()]

    def save(self, user: User) -> User:
        # Simule: INSERT OR REPLACE INTO users (...)
        self._rows[user.id] = self._to_row(user)
        return user

    def delete(self, user_id: str) -> bool:
        # Simule: DELETE FROM users WHERE id = ?
        if user_id in self._rows:
            del self._rows[user_id]
            return True
        return False

    def find_by_email(self, email: str) -> Optional[User]:
        # Simule: SELECT * FROM users WHERE email = ?
        for row in self._rows.values():
            if row["email"] == email:
                return self._to_user(row)
        return None


# Code métier — identique quel que soit le repository
class UserService:
    def __init__(self, repo: UserRepository):
        self._repo = repo

    def register(self, name: str, email: str, role: str = "user") -> User:
        existing = self._repo.find_by_email(email)
        if existing:
            raise ValueError(f"Email déjà utilisé: {email}")
        user = User(name=name, email=email, role=role)
        return self._repo.save(user)

    def promote_to_admin(self, user_id: str) -> User:
        user = self._repo.get(user_id)
        if not user:
            raise ValueError(f"Utilisateur {user_id} introuvable")
        user.role = "admin"
        return self._repo.save(user)

    def list_users(self) -> List[User]:
        return self._repo.list_all()


# Test avec les deux implémentations
def demo_repository(repo: UserRepository, label: str):
    print(f"\n=== {label} ===")
    service = UserService(repo)

    alice = service.register("Alice Martin", "alice@example.com", "admin")
    bob = service.register("Bob Dupont", "bob@example.com")
    claire = service.register("Claire Lefèvre", "claire@example.com")

    print(f"Utilisateurs créés: {len(service.list_users())}")

    claire_promoted = service.promote_to_admin(claire.id)
    print(f"Rôle de Claire après promotion: {claire_promoted.role}")

    found = repo.find_by_email("bob@example.com")
    print(f"Recherche par email: {found}")

    repo.delete(bob.id)
    print(f"Après suppression de Bob: {len(service.list_users())} utilisateurs")

    try:
        service.register("Alice bis", "alice@example.com")
    except ValueError as e:
        print(f"Doublon détecté: {e}")

demo_repository(InMemoryUserRepository(), "Repository In-Memory")
demo_repository(SimulatedSQLiteRepository(), "Repository SQLite simulé")
print("\n✓ Le code métier (UserService) est identique pour les deux implémentations.")
=== Repository In-Memory ===
Utilisateurs créés: 3
Rôle de Claire après promotion: admin
Recherche par email: User(id=c2dcd76b, name=Bob Dupont, role=user)
Après suppression de Bob: 2 utilisateurs
Doublon détecté: Email déjà utilisé: alice@example.com

=== Repository SQLite simulé ===
Utilisateurs créés: 3
Rôle de Claire après promotion: admin
Recherche par email: User(id=985572bb, name=Bob Dupont, role=user)
Après suppression de Bob: 2 utilisateurs
Doublon détecté: Email déjà utilisé: alice@example.com

✓ Le code métier (UserService) est identique pour les deux implémentations.

Démo 3 — Simulation CQRS : blog avec write/read models séparés#

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Type
import uuid
import time

# ── Domaine (écriture) ──────────────────────────────────────────

@dataclass
class Article:
    title: str
    content: str
    author_id: str
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    published: bool = False
    created_at: float = field(default_factory=time.time)
    tags: List[str] = field(default_factory=list)

# ── Commandes ───────────────────────────────────────────────────

class Command(ABC):
    pass

@dataclass
class CreateArticle(Command):
    title: str
    content: str
    author_id: str
    tags: List[str] = field(default_factory=list)

@dataclass
class PublishArticle(Command):
    article_id: str

@dataclass
class AddTag(Command):
    article_id: str
    tag: str

# ── Requêtes ────────────────────────────────────────────────────

class Query(ABC):
    pass

@dataclass
class GetArticle(Query):
    article_id: str

@dataclass
class ListPublished(Query):
    author_id: Optional[str] = None

@dataclass
class SearchByTag(Query):
    tag: str

# ── Stores séparés ──────────────────────────────────────────────

class WriteStore:
    """Modèle d'écriture normalisé."""
    def __init__(self):
        self._articles: Dict[str, Article] = {}
        self._events: List[dict] = []

    def save(self, article: Article) -> None:
        self._articles[article.id] = article
        self._events.append({"type": "article_saved", "id": article.id, "at": time.time()})

    def get(self, article_id: str) -> Optional[Article]:
        return self._articles.get(article_id)


class ReadStore:
    """Modèle de lecture dénormalisé pour les requêtes rapides."""
    def __init__(self):
        self._published: Dict[str, dict] = {}  # Vue dénormalisée

    def upsert(self, article: Article, author_name: str = "") -> None:
        self._published[article.id] = {
            "id": article.id,
            "title": article.title,
            "author": author_name or article.author_id,
            "tags": article.tags,
            "published": article.published,
            "preview": article.content[:100] + "..." if len(article.content) > 100 else article.content,
        }

    def get(self, article_id: str) -> Optional[dict]:
        return self._published.get(article_id)

    def list_published(self, author_id: Optional[str] = None) -> List[dict]:
        items = [v for v in self._published.values() if v["published"]]
        if author_id:
            items = [v for v in items if v.get("author_id") == author_id]
        return items

    def search_by_tag(self, tag: str) -> List[dict]:
        return [v for v in self._published.values() if tag in v.get("tags", [])]


# ── Handlers ────────────────────────────────────────────────────

class CreateArticleHandler:
    def __init__(self, write_store: WriteStore, read_store: ReadStore):
        self._write = write_store
        self._read = read_store

    def handle(self, cmd: CreateArticle) -> Article:
        article = Article(
            title=cmd.title, content=cmd.content,
            author_id=cmd.author_id, tags=cmd.tags
        )
        self._write.save(article)
        self._read.upsert(article)
        print(f"  [CMD] CreateArticle → id={article.id}, titre='{article.title}'")
        return article


class PublishArticleHandler:
    def __init__(self, write_store: WriteStore, read_store: ReadStore):
        self._write = write_store
        self._read = read_store

    def handle(self, cmd: PublishArticle) -> None:
        article = self._write.get(cmd.article_id)
        if not article:
            raise ValueError(f"Article {cmd.article_id} introuvable")
        article.published = True
        self._write.save(article)
        self._read.upsert(article)
        print(f"  [CMD] PublishArticle → id={article.id} publié")


class GetArticleHandler:
    def __init__(self, read_store: ReadStore):
        self._read = read_store

    def handle(self, query: GetArticle) -> Optional[dict]:
        result = self._read.get(query.article_id)
        print(f"  [QRY] GetArticle({query.article_id}) → {result['title'] if result else 'None'}")
        return result


class ListPublishedHandler:
    def __init__(self, read_store: ReadStore):
        self._read = read_store

    def handle(self, query: ListPublished) -> List[dict]:
        result = self._read.list_published(query.author_id)
        print(f"  [QRY] ListPublished → {len(result)} article(s)")
        return result


class SearchByTagHandler:
    def __init__(self, read_store: ReadStore):
        self._read = read_store

    def handle(self, query: SearchByTag) -> List[dict]:
        result = self._read.search_by_tag(query.tag)
        print(f"  [QRY] SearchByTag('{query.tag}') → {len(result)} résultat(s)")
        return result


# ── Démo ────────────────────────────────────────────────────────

write_store = WriteStore()
read_store = ReadStore()

create_handler = CreateArticleHandler(write_store, read_store)
publish_handler = PublishArticleHandler(write_store, read_store)
get_handler = GetArticleHandler(read_store)
list_handler = ListPublishedHandler(read_store)
search_handler = SearchByTagHandler(read_store)

print("=== Commandes (écriture) ===")
a1 = create_handler.handle(CreateArticle(
    "Introduction à CQRS", "CQRS sépare les modèles de lecture et d'écriture...",
    "author_1", tags=["architecture", "patterns"]
))
a2 = create_handler.handle(CreateArticle(
    "Le pattern Repository", "Le Repository abstrait la couche de persistance...",
    "author_1", tags=["patterns", "database"]
))
a3 = create_handler.handle(CreateArticle(
    "Circuit Breaker en pratique", "Protéger les appels externes avec un circuit breaker...",
    "author_2", tags=["résilience", "patterns"]
))

publish_handler.handle(PublishArticle(a1.id))
publish_handler.handle(PublishArticle(a3.id))
# a2 reste en brouillon

print("\n=== Requêtes (lecture) ===")
get_handler.handle(GetArticle(a1.id))
list_handler.handle(ListPublished())
search_handler.handle(SearchByTag("patterns"))
search_handler.handle(SearchByTag("résilience"))
=== Commandes (écriture) ===
  [CMD] CreateArticle → id=49849469, titre='Introduction à CQRS'
  [CMD] CreateArticle → id=81dc2576, titre='Le pattern Repository'
  [CMD] CreateArticle → id=bd097938, titre='Circuit Breaker en pratique'
  [CMD] PublishArticle → id=49849469 publié
  [CMD] PublishArticle → id=bd097938 publié

=== Requêtes (lecture) ===
  [QRY] GetArticle(49849469) → Introduction à CQRS
  [QRY] ListPublished → 2 article(s)
  [QRY] SearchByTag('patterns') → 3 résultat(s)
  [QRY] SearchByTag('résilience') → 1 résultat(s)
[{'id': 'bd097938',
  'title': 'Circuit Breaker en pratique',
  'author': 'author_2',
  'tags': ['résilience', 'patterns'],
  'published': True,
  'preview': 'Protéger les appels externes avec un circuit breaker...'}]

Démo 4 — Diagramme Outbox Pattern (flux de messages)#

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.patheffects as pe
import seaborn as sns
import numpy as np

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)

fig, ax = plt.subplots(figsize=(13, 7))
ax.set_xlim(0, 13)
ax.set_ylim(0, 7)
ax.axis("off")
ax.set_facecolor("#F8F9FA")

# Acteurs verticaux
actors = [
    (1.5, "Application\n(Service métier)", "#4C72B0"),
    (5.0, "Base de données\n(Transaction)", "#55A868"),
    (8.5, "Outbox Relay\n(Processus async)", "#DD8452"),
    (12.0, "Message Broker\n(Kafka / RabbitMQ)", "#C44E52"),
]

for x, label, color in actors:
    box = mpatches.FancyBboxPatch((x - 0.9, 5.8), 1.8, 0.9,
        boxstyle="round,pad=0.08", fc=color, ec="none", alpha=0.9)
    ax.add_patch(box)
    ax.text(x, 6.25, label, ha="center", va="center",
            fontsize=8.5, fontweight="bold", color="white")
    ax.plot([x, x], [5.8, 0.2], color=color, linestyle="--", linewidth=1.2, alpha=0.5)

# Messages (flèches avec étiquettes)
messages = [
    (1.5, 5.0, 5.0, "① INSERT INTO orders\n   (données métier)", "#4C72B0", 4.8),
    (1.5, 5.0, 5.0, "② INSERT INTO outbox\n   (event JSON)", "#4C72B0", 3.8),
    (5.0, 8.5, 5.0, "③ SELECT * FROM outbox\n   WHERE published=false", "#55A868", 2.8),
    (8.5, 12.0, 5.0, "④ Publish(event)", "#DD8452", 1.8),
    (8.5, 5.0, 12.0, "⑤ UPDATE outbox\n   SET published=true", "#DD8452", 0.9),
]

for x_start, x_end, x_from, label, color, y in messages:
    dx = x_end - x_start
    ax.annotate("", xy=(x_end, y), xytext=(x_start, y),
                arrowprops=dict(arrowstyle="->", color=color, lw=1.8))
    mx = (x_start + x_end) / 2
    offset = 0.18 if dx > 0 else -0.18
    ax.text(mx, y + 0.22, label, ha="center", va="bottom",
            fontsize=8, color=color, fontweight="bold",
            bbox=dict(facecolor="white", edgecolor=color, alpha=0.85, boxstyle="round,pad=0.2"))

# Transaction box
tx_box = mpatches.FancyBboxPatch((0.4, 3.5), 5.8, 1.8,
    boxstyle="round,pad=0.1", fc="none", ec="#55A868", linewidth=2, linestyle="--", alpha=0.7)
ax.add_patch(tx_box)
ax.text(3.3, 5.4, "Transaction atomique", ha="center", va="center",
        fontsize=8.5, color="#55A868", style="italic", fontweight="bold")

ax.set_title("Outbox Pattern — garantie de livraison des événements\n"
             "avec cohérence transactionnelle",
             fontsize=13, fontweight="bold", pad=12, color="#2C3E50")
plt.savefig("outbox_pattern.png", dpi=120, bbox_inches="tight")
plt.show()
_images/650d98f5b8ce583c04f72717f25b03104b0300d3924987cdc87f15bebdc86f37.png

Résumé#

Les patterns architecturaux de ce chapitre opèrent sur la cohérence des données et la résilience du système — deux propriétés impossibles à obtenir uniquement avec des patterns de conception.

  • Repository découple le code métier du mécanisme de persistance. Le bénéfice immédiat est la testabilité (injection d’un in-memory) ; le bénéfice long terme est la capacité à changer de base de données sans réécriture.

  • Unit of Work coordonne les transactions logiques multi-repositories. Sans lui, deux repositories modifiés dans la même opération peuvent diverger silencieusement.

  • CQRS répond à la tension entre modèle d’écriture (normalisé, avec invariants) et modèle de lecture (dénormalisé, optimisé). Il ne faut l’adopter que quand cette tension est réelle — pas par principe.

  • Outbox Pattern résout le problème « écrire en base ET publier un événement de manière atomique ». C’est un pattern fondamental dans tout système qui combine persistance et messaging.

  • Circuit Breaker protège le système contre les cascades de défaillances. Ses trois états (fermé/ouvert/demi-ouvert) implémentent une politique de récupération progressive.

  • Strangler Fig est la stratégie de migration la moins risquée pour remplacer un système legacy : la façade est le point de contrôle ; on migre une fonctionnalité à la fois.

À retenir

Le Repository et le Unit of Work fonctionnent ensemble : le Repository encapsule les opérations CRUD sur une entité, le Unit of Work orchestre la transaction globale qui peut impliquer plusieurs repositories. Les séparer sans les coordonner conduit à des incohérences subtiles.

Piège courant

CQRS est souvent sur-appliqué. Dans la plupart des applications, un seul modèle suffit. N’introduire CQRS que quand les besoins de lecture et d’écriture sont genuinement contradictoires : volumes différents, modèles de données incompatibles, équipes séparées.

Bonne pratique

Le Circuit Breaker doit être observable : exposer ses métriques (nb d’erreurs, état courant, temps depuis le dernier changement d’état) dans le système de monitoring. Un circuit resté OUVERT trop longtemps est le signe d’un problème non résolu en amont.