gRPC et Protocol Buffers#

gRPC est un framework RPC (Remote Procedure Call) open-source développé par Google, publié en 2016. Il repose sur deux technologies fondamentales : Protocol Buffers comme langage de description d’interface (IDL) et format de sérialisation, et HTTP/2 comme protocole de transport. gRPC est conçu pour les communications microservices à haute performance, les APIs mobiles et les systèmes distribués où la latence et l’efficacité réseau sont critiques.

Ce chapitre couvre Protocol Buffers en profondeur (IDL, encodage binaire, types), les quatre modes de streaming de gRPC, son fonctionnement sur HTTP/2, et fournit un exemple complet illustratif d’un service de calcul.

Hide code cell source

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import pandas as pd
import struct
import json

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
np.random.seed(42)

Protocol Buffers : le langage de description d’interface#

Définition d’un schéma .proto#

Protocol Buffers (Protobuf) est à la fois un IDL (Interface Definition Language) pour décrire des structures de données et un format de sérialisation binaire ultra-compact. Contrairement à JSON ou XML, Protobuf nécessite un schéma préalable partagé entre l’émetteur et le récepteur.

Un fichier .proto définit des messages (structures de données) et des services (interfaces RPC) :

syntax = "proto3";

package calculatrice;

option go_package = "./pb";

// Types scalaires disponibles :
// double, float, int32, int64, uint32, uint64, sint32, sint64
// fixed32, fixed64, sfixed32, sfixed64, bool, string, bytes

// Énumération
enum TypeOperation {
  ADDITION       = 0;  // Valeur par défaut obligatoirement 0
  SOUSTRACTION   = 1;
  MULTIPLICATION = 2;
  DIVISION       = 3;
}

// Message simple
message NombreEntier {
  int64 valeur = 1;
}

// Message avec champs de différents types
message Requete {
  double operande_a = 1;
  double operande_b = 2;
  TypeOperation operation = 3;
  string id_requete = 4;        // Identifiant traçabilité
  map<string, string> metadata = 5; // Métadonnées optionnelles
}

// Message imbriqué
message Resultat {
  double valeur = 1;
  bool succes = 2;
  string message_erreur = 3;   // Vide si succès
  int64 duree_ms = 4;           // Durée de calcul

  // Message imbriqué
  Statistiques stats = 5;

  message Statistiques {
    int32 n_calculs_serveur = 1;
    double charge_cpu = 2;
  }
}

// Flux de nombres pour le streaming
message FluxNombre {
  double valeur = 1;
  int64 sequence = 2;
  int64 timestamp_ms = 3;
}

// Résultat cumulatif du streaming
message ResultatCumulatif {
  double somme = 1;
  double moyenne = 2;
  double min = 3;
  double max = 4;
  int32 n_valeurs = 5;
}

// Champ oneof : une seule valeur présente
message ReponseFlexible {
  oneof contenu {
    Resultat resultat = 1;
    string erreur = 2;
    bytes donnees_binaires = 3;
  }
}

// Champ repeated (liste)
message LotRequetes {
  repeated Requete requetes = 1;
  string id_lot = 2;
}

Types scalaires et leurs encodages#

Protobuf distingue plusieurs types numériques qui utilisent des encodages différents selon l’usage :

Hide code cell source

# Tableau des types scalaires Protobuf et leurs encodages
fig, ax = plt.subplots(figsize=(13, 6))
ax.axis("off")

colonnes = ["Type Protobuf", "Wire Type", "Encodage", "Taille (octets)", "Cas d'usage recommandé"]
lignes = [
    ["int32", "0 (varint)", "Varint signé (zigzag non)", "1-5", "Entiers positifs, IDs"],
    ["int64", "0 (varint)", "Varint signé (zigzag non)", "1-10", "Grands entiers positifs"],
    ["sint32", "0 (varint)", "ZigZag → Varint", "1-5", "Entiers négatifs fréquents"],
    ["sint64", "0 (varint)", "ZigZag → Varint", "1-10", "Grands entiers négatifs"],
    ["uint32", "0 (varint)", "Varint non signé", "1-5", "Compteurs, tailles"],
    ["bool", "0 (varint)", "0 ou 1", "1", "Booléens"],
    ["enum", "0 (varint)", "Varint (valeur int)", "1-5", "Énumérations"],
    ["fixed32", "5 (32-bit)", "Little-endian 32 bits", "4", "Valeurs > 2²⁸ (efficace)"],
    ["fixed64", "1 (64-bit)", "Little-endian 64 bits", "8", "Timestamps, hash 64 bits"],
    ["float", "5 (32-bit)", "IEEE 754 32 bits", "4", "Flottants 32 bits"],
    ["double", "1 (64-bit)", "IEEE 754 64 bits", "8", "Flottants 64 bits"],
    ["string", "2 (LEN)", "Varint(len) + UTF-8", "2 + len", "Texte UTF-8"],
    ["bytes", "2 (LEN)", "Varint(len) + données", "2 + len", "Données binaires"],
    ["message", "2 (LEN)", "Varint(len) + encodé", "2 + taille", "Messages imbriqués"],
]

couleurs = []
for i in range(len(lignes)):
    if lignes[i][1].startswith("0"):
        couleurs.append(["#E3F2FD"] * 5)
    elif lignes[i][1].startswith("1") or lignes[i][1].startswith("5"):
        couleurs.append(["#E8F5E9"] * 5)
    else:
        couleurs.append(["#FFF8E1"] * 5)

table = ax.table(
    cellText=lignes,
    colLabels=colonnes,
    cellLoc="center",
    loc="center",
    cellColours=couleurs,
)
table.auto_set_font_size(False)
table.set_fontsize(8)
table.scale(1, 1.6)

for j in range(5):
    table[0, j].set_facecolor("#37474F")
    table[0, j].set_text_props(color="white", fontweight="bold")

ax.set_title("Types scalaires Protocol Buffers et leurs wire types", fontsize=12, fontweight="bold", pad=15)
plt.tight_layout()
plt.show()
_images/eeac1d9390041d35c0cadf3d9606f083bbd929532e3fcbc79916dfb67a6673c0.png

Encodage varint et wire types#

L’encodage varint (variable-length integer) est la clé de la compacité de Protobuf. Chaque octet utilise 7 bits pour les données et 1 bit (le bit de poids fort) pour indiquer si l’octet suivant fait partie du même nombre.

Hide code cell source

def encoder_varint(n: int) -> bytes:
    """Encode un entier non signé en varint Protobuf."""
    if n < 0:
        n += (1 << 64)  # Traitement des négatifs (two's complement sur 64 bits)
    result = []
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            result.append(bits | 0x80)  # Bit continuation = 1
        else:
            result.append(bits)
            break
    return bytes(result)

def encoder_zigzag_varint(n: int) -> bytes:
    """Encode un entier signé avec ZigZag puis varint."""
    zigzag = (n << 1) ^ (n >> 31)
    return encoder_varint(zigzag)

def tag_field(field_number: int, wire_type: int) -> bytes:
    """Encode le tag d'un champ Protobuf."""
    return encoder_varint((field_number << 3) | wire_type)

# Démonstration encodage
print("Encodage varint Protobuf :")
print("=" * 50)
exemples = [
    (1, "Valeur minimale varint"),
    (127, "Max sur 1 octet (0x7F)"),
    (128, "Min sur 2 octets (0x80)"),
    (300, "Exemple classique RFC"),
    (16383, "Max sur 2 octets"),
    (2097152, "Valeur sur 3 octets"),
]

for val, desc in exemples:
    encoded = encoder_varint(val)
    hex_str = " ".join(f"0x{b:02X}" for b in encoded)
    bits_str = " ".join(f"{b:08b}" for b in encoded)
    print(f"{val:>10}{hex_str:<25} ({len(encoded)} octet(s)) — {desc}")

print("\nEncodage ZigZag pour entiers signés (sint32) :")
print("=" * 50)
for val in [0, -1, 1, -2, 2, -128, 128]:
    encoded = encoder_zigzag_varint(val)
    hex_str = " ".join(f"0x{b:02X}" for b in encoded)
    print(f"{val:>6}{hex_str:<20} ({len(encoded)} octet(s))")
Encodage varint Protobuf :
==================================================
         1 → 0x01                      (1 octet(s)) — Valeur minimale varint
       127 → 0x7F                      (1 octet(s)) — Max sur 1 octet (0x7F)
       128 → 0x80 0x01                 (2 octet(s)) — Min sur 2 octets (0x80)
       300 → 0xAC 0x02                 (2 octet(s)) — Exemple classique RFC
     16383 → 0xFF 0x7F                 (2 octet(s)) — Max sur 2 octets
   2097152 → 0x80 0x80 0x80 0x01       (4 octet(s)) — Valeur sur 3 octets

Encodage ZigZag pour entiers signés (sint32) :
==================================================
     0 → 0x00                 (1 octet(s))
    -1 → 0x01                 (1 octet(s))
     1 → 0x02                 (1 octet(s))
    -2 → 0x03                 (1 octet(s))
     2 → 0x04                 (1 octet(s))
  -128 → 0xFF 0x01            (2 octet(s))
   128 → 0x80 0x02            (2 octet(s))

Hide code cell source

# Simulation encodage d'un message Protobuf complet
def simuler_encodage_protobuf(operande_a: float, operande_b: float,
                               operation: int, id_req: str) -> bytes:
    """
    Simule l'encodage Protobuf du message Requete :
      message Requete {
        double operande_a = 1;   wire type 1 (64-bit)
        double operande_b = 2;   wire type 1 (64-bit)
        TypeOperation = 3;       wire type 0 (varint)
        string id_requete = 4;   wire type 2 (length-delimited)
      }
    """
    result = bytearray()

    # Champ 1 : operande_a (double, wire type 1 = 64-bit)
    result.extend(tag_field(1, 1))
    result.extend(struct.pack("<d", operande_a))

    # Champ 2 : operande_b (double, wire type 1)
    result.extend(tag_field(2, 1))
    result.extend(struct.pack("<d", operande_b))

    # Champ 3 : operation (enum/int32, wire type 0 = varint)
    result.extend(tag_field(3, 0))
    result.extend(encoder_varint(operation))

    # Champ 4 : id_requete (string, wire type 2 = length-delimited)
    id_bytes = id_req.encode("utf-8")
    result.extend(tag_field(4, 2))
    result.extend(encoder_varint(len(id_bytes)))
    result.extend(id_bytes)

    return bytes(result)

# Encodage du même message en JSON pour comparaison
message_dict = {
    "operande_a": 3.14159265,
    "operande_b": 2.71828182,
    "operation": "MULTIPLICATION",
    "id_requete": "req-abc123"
}

# Protobuf binaire simulé
proto_bytes = simuler_encodage_protobuf(3.14159265, 2.71828182, 2, "req-abc123")
# JSON équivalent
json_bytes = json.dumps(message_dict, separators=(",", ":")).encode("utf-8")
# XML équivalent (simulé)
xml_str = (f'<Requete><operande_a>3.14159265</operande_a>'
           f'<operande_b>2.71828182</operande_b>'
           f'<operation>MULTIPLICATION</operation>'
           f'<id_requete>req-abc123</id_requete></Requete>')
xml_bytes = xml_str.encode("utf-8")

print(f"Message Requete : {message_dict}")
print()
print(f"Taille Protobuf : {len(proto_bytes)} octets")
print(f"Contenu : {' '.join(f'{b:02X}' for b in proto_bytes)}")
print()
print(f"Taille JSON     : {len(json_bytes)} octets")
print(f"Contenu : {json_bytes.decode()}")
print()
print(f"Taille XML      : {len(xml_bytes)} octets")
print()
print(f"Rapport Protobuf/JSON : {len(json_bytes)/len(proto_bytes):.1f}x plus compact")
print(f"Rapport Protobuf/XML  : {len(xml_bytes)/len(proto_bytes):.1f}x plus compact")
Message Requete : {'operande_a': 3.14159265, 'operande_b': 2.71828182, 'operation': 'MULTIPLICATION', 'id_requete': 'req-abc123'}

Taille Protobuf : 32 octets
Contenu : 09 F1 D4 C8 53 FB 21 09 40 11 DD B0 F1 89 0A BF 05 40 18 02 22 0A 72 65 71 2D 61 62 63 31 32 33

Taille JSON     : 104 octets
Contenu : {"operande_a":3.14159265,"operande_b":2.71828182,"operation":"MULTIPLICATION","id_requete":"req-abc123"}

Taille XML      : 161 octets

Rapport Protobuf/JSON : 3.2x plus compact
Rapport Protobuf/XML  : 5.0x plus compact

Hide code cell source

# Visualisation comparative : tailles de sérialisation
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Test sur différents types de messages
messages_test = [
    {
        "nom": "Message\nsimple\n(4 champs)",
        "protobuf": len(proto_bytes),
        "json": len(json_bytes),
        "xml": len(xml_bytes),
    },
    {
        "nom": "Liste\n100 entiers",
        "protobuf": 200,    # ~2 octets par int32 en moyenne
        "json": 400,        # ["1","2",...] ~4 chars par nombre
        "xml": 1500,        # <item>1</item> * 100
    },
    {
        "nom": "Message\ncomplexe\n(20 champs)",
        "protobuf": 120,
        "json": 480,
        "xml": 850,
    },
    {
        "nom": "Flux temps\nréel 1000\nmesures",
        "protobuf": 12000,   # ~12 octets : tag(1) + double(8) + seq(1-3)
        "json": 45000,       # {"v":1.23,"s":1,"ts":1715000000}
        "xml": 90000,        # <m><v>...</v><s>...</s></m>
    },
]

noms = [m["nom"] for m in messages_test]
proto_sizes = [m["protobuf"] for m in messages_test]
json_sizes = [m["json"] for m in messages_test]
xml_sizes = [m["xml"] for m in messages_test]

x = np.arange(len(noms))
width = 0.25

ax1 = axes[0]
bars1 = ax1.bar(x - width, proto_sizes, width, label="Protocol Buffers", color="#4CAF50", alpha=0.85)
bars2 = ax1.bar(x, json_sizes, width, label="JSON", color="#2196F3", alpha=0.85)
bars3 = ax1.bar(x + width, xml_sizes, width, label="XML", color="#F44336", alpha=0.85)

ax1.set_xticks(x)
ax1.set_xticklabels(noms, fontsize=8.5)
ax1.set_ylabel("Taille (octets)", fontsize=11)
ax1.set_title("Comparaison de taille de sérialisation\nProtobuf vs JSON vs XML", fontsize=11)
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3, axis="y")

for bars in [bars1, bars2, bars3]:
    for bar in bars:
        h = bar.get_height()
        ax1.text(bar.get_x() + bar.get_width()/2, h + max(proto_sizes)*0.01,
                 f"{h}", ha="center", va="bottom", fontsize=7)

# Graphique 2 : ratio de compression
ax2 = axes[1]
ratios_json = [j/p for j, p in zip(json_sizes, proto_sizes)]
ratios_xml = [x/p for x, p in zip(xml_sizes, proto_sizes)]

x_pos = np.arange(len(noms))
ax2.bar(x_pos - 0.2, ratios_json, 0.4, label="JSON / Protobuf", color="#2196F3", alpha=0.85)
ax2.bar(x_pos + 0.2, ratios_xml, 0.4, label="XML / Protobuf", color="#F44336", alpha=0.85)
ax2.axhline(y=1, color="gray", linestyle="--", linewidth=1.5, label="Référence Protobuf (x1)")
ax2.set_xticks(x_pos)
ax2.set_xticklabels(noms, fontsize=8.5)
ax2.set_ylabel("Ratio de taille (× Protobuf)", fontsize=11)
ax2.set_title("Facteur de surcoût par rapport à Protobuf\n(plus haut = plus lourd)", fontsize=11)
ax2.legend(fontsize=10)
ax2.grid(True, alpha=0.3, axis="y")

for i, (rj, rx) in enumerate(zip(ratios_json, ratios_xml)):
    ax2.text(i - 0.2, rj + 0.05, f"{rj:.1f}×", ha="center", fontsize=9, color="#1565C0")
    ax2.text(i + 0.2, rx + 0.05, f"{rx:.1f}×", ha="center", fontsize=9, color="#C62828")

plt.tight_layout()
plt.show()
_images/cf12d721563c42ba5e9b493759eba8e8483e5465fce4c2775a7269159df2c1b9.png

gRPC : les quatre types de RPC#

gRPC définit quatre modes de communication, offrant une flexibilité maximale selon les besoins :

Unary RPC (Requête-Réponse simple)#

Le client envoie une requête, le serveur répond une fois. C’est le modèle classique requête-réponse, équivalent à un appel de fonction.

service Calculatrice {
  rpc Calculer(Requete) returns (Resultat);
}

Server Streaming RPC#

Le client envoie une requête, le serveur renvoie un flux de messages. Utile pour des résultats progressifs ou des flux de données.

service Calculatrice {
  rpc SuiteNombres(Requete) returns (stream FluxNombre);
}

Client Streaming RPC#

Le client envoie un flux de messages, le serveur répond une seule fois (typiquement après avoir reçu tous les messages). Utile pour l’agrégation ou le chargement par lot.

service Calculatrice {
  rpc AggregerNombres(stream FluxNombre) returns (ResultatCumulatif);
}

Bidirectional Streaming RPC#

Les deux parties envoient et reçoivent des flux de messages simultanément et indépendamment. Utile pour les protocoles de type chat, les jeux temps réel, ou les pipelines de traitement.

service Calculatrice {
  rpc TraitementBidirectionnel(stream FluxNombre) returns (stream Resultat);
}

Hide code cell source

# Visualisation des 4 types de streaming gRPC
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle("Les 4 types de RPC gRPC", fontsize=14, fontweight="bold")

def dessiner_rpc(ax, titre, messages, sous_titre=""):
    ax.set_xlim(0, 10)
    ax.set_ylim(-0.5, len(messages) + 0.5)
    ax.set_title(titre, fontsize=12, fontweight="bold")
    ax.axis("off")
    if sous_titre:
        ax.text(5, len(messages) + 0.3, sous_titre, ha="center", fontsize=8.5,
                color="#607D8B", style="italic")

    ax.axvline(x=2, color="#2196F3", linewidth=2.5, ymin=0.05, ymax=0.95)
    ax.axvline(x=8, color="#F44336", linewidth=2.5, ymin=0.05, ymax=0.95)
    ax.text(2, len(messages) + 0.0, "Client", ha="center", fontsize=10,
            fontweight="bold", color="#2196F3")
    ax.text(8, len(messages) + 0.0, "Serveur", ha="center", fontsize=10,
            fontweight="bold", color="#F44336")

    for i, (direction, label, color) in enumerate(messages):
        y = len(messages) - 1 - i
        if direction == "→":
            x1, x2 = 2.2, 7.8
        else:
            x1, x2 = 7.8, 2.2
        ax.annotate("", xy=(x2, y), xytext=(x1, y),
                    arrowprops=dict(arrowstyle="->", color=color, lw=2.0))
        ax.text(5, y + 0.18, label, ha="center", fontsize=8.0, color=color)

# 1. Unary
msgs_unary = [
    ("→", "Calculer(Requete{a=3.14, b=2.71, op=MUL})", "#2196F3"),
    ("←", "Resultat{valeur=8.54, succes=true}", "#4CAF50"),
]
dessiner_rpc(axes[0][0], "1. Unary RPC\n(Requête-Réponse)", msgs_unary,
             "Un envoi, une réponse — modèle REST classique")

# 2. Server streaming
msgs_server = [
    ("→", "SuiteNombres(Requete{n=5})", "#2196F3"),
    ("←", "FluxNombre{valeur=1.0, seq=1}", "#FF9800"),
    ("←", "FluxNombre{valeur=2.0, seq=2}", "#FF9800"),
    ("←", "FluxNombre{valeur=3.0, seq=3}", "#FF9800"),
    ("←", "FluxNombre{valeur=4.0, seq=4}", "#FF9800"),
    ("←", "FluxNombre{valeur=5.0, seq=5}", "#FF9800"),
    ("←", "EOF (fin du flux serveur)", "#9E9E9E"),
]
dessiner_rpc(axes[0][1], "2. Server Streaming RPC\n(1 requête, flux de réponses)", msgs_server,
             "Le serveur envoie plusieurs messages — dashboards, export")

# 3. Client streaming
msgs_client = [
    ("→", "FluxNombre{valeur=10.0, seq=1}", "#9C27B0"),
    ("→", "FluxNombre{valeur=20.0, seq=2}", "#9C27B0"),
    ("→", "FluxNombre{valeur=30.0, seq=3}", "#9C27B0"),
    ("→", "FluxNombre{valeur=40.0, seq=4}", "#9C27B0"),
    ("→", "EOF (fin du flux client)", "#9E9E9E"),
    ("←", "ResultatCumulatif{somme=100, moy=25}", "#4CAF50"),
]
dessiner_rpc(axes[1][0], "3. Client Streaming RPC\n(flux de requêtes, 1 réponse)", msgs_client,
             "Le client envoie plusieurs messages — upload, agrégation")

# 4. Bidirectionnel
msgs_bidi = [
    ("→", "FluxNombre{valeur=5.0, seq=1}", "#E91E63"),
    ("→", "FluxNombre{valeur=10.0, seq=2}", "#E91E63"),
    ("←", "Resultat{valeur=5.0 (résultat seq=1)}", "#FF9800"),
    ("→", "FluxNombre{valeur=15.0, seq=3}", "#E91E63"),
    ("←", "Resultat{valeur=50.0 (résultat seq=2)}", "#FF9800"),
    ("←", "Resultat{valeur=150.0 (résultat seq=3)}", "#FF9800"),
    ("→", "EOF", "#9E9E9E"),
    ("←", "EOF", "#9E9E9E"),
]
dessiner_rpc(axes[1][1], "4. Bidirectional Streaming RPC\n(flux simultanés dans les deux sens)", msgs_bidi,
             "Échange indépendant — chat temps réel, jeux, pipelines")

plt.tight_layout()
plt.show()
_images/69d0b45a02ea4285a2d044537ffef6494f2d05448118506ca1480ecda521c270.png

gRPC sur HTTP/2#

gRPC utilise HTTP/2 pour toutes ses communications. Cette dépendance n’est pas accidentelle : HTTP/2 fournit exactement les primitives dont gRPC a besoin.

Mapping gRPC → HTTP/2#

Chaque appel gRPC correspond à un stream HTTP/2 :

  • Un appel unary = 1 stream HTTP/2 (1 DATA frame requête + 1 DATA frame réponse)

  • Un streaming = 1 stream HTTP/2 avec N DATA frames

En-têtes HTTP/2 pour gRPC :

:method: POST
:path: /calculatrice.Calculatrice/Calculer
:scheme: https
:authority: api.example.com
content-type: application/grpc
grpc-encoding: gzip
grpc-accept-encoding: gzip, identity
te: trailers

Format d’un message gRPC (Length-Prefixed Message) :

Compressed-Flag (1 octet) | Message-Length (4 octets, big-endian) | Message (Protobuf)

Trailers HTTP/2 (fin d’appel) :

grpc-status: 0          (0 = OK)
grpc-message: ""        (vide si succès)

Hide code cell source

# Visualisation du format de frame gRPC sur HTTP/2
fig, ax = plt.subplots(figsize=(13, 4))
ax.set_xlim(0, 14)
ax.set_ylim(-0.5, 4)
ax.axis("off")
ax.set_title("Format d'un message gRPC sur HTTP/2 (Length-Prefixed Message)", fontsize=12, fontweight="bold")

# Frame HTTP/2 DATA
champs_http2 = [
    (0, 3, "Length\n(24 bits)", "#37474F", "Taille de la\nDATA frame"),
    (3, 1, "Type\n0x0", "#2196F3", "DATA frame"),
    (4, 1, "Flags", "#607D8B", "END_STREAM\nsi fin"),
    (5, 4, "Stream ID\n(31 bits)", "#9C27B0", "ID du stream\ngRPC"),
    (9, 1, "Comp.\nFlag", "#FF9800", "0=non compressé\n1=compressé"),
    (10, 4, "Message\nLength (32b)", "#F44336", "Taille du message\nProtobuf"),
]

y_frame = 2.8
for x, w, label, color, tooltip in champs_http2:
    rect = mpatches.FancyBboxPatch((x * 0.93, y_frame - 0.5), w * 0.93 - 0.05, 1.0,
                                    boxstyle="round,pad=0.05",
                                    facecolor=color, edgecolor="white",
                                    alpha=0.85, linewidth=2, zorder=5)
    ax.add_patch(rect)
    ax.text(x * 0.93 + w * 0.93 / 2, y_frame, label, ha="center", va="center",
            fontsize=7.5, color="white", fontweight="bold", zorder=6)
    ax.text(x * 0.93 + w * 0.93 / 2, y_frame - 0.9, tooltip, ha="center", va="top",
            fontsize=6.5, color=color, multialignment="center")

# Message Protobuf
rect_proto = mpatches.FancyBboxPatch((9.3 * 0.93, y_frame - 0.5),
                                      4.5, 1.0,
                                      boxstyle="round,pad=0.05",
                                      facecolor="#4CAF50", edgecolor="white",
                                      alpha=0.85, linewidth=2, zorder=5)
ax.add_patch(rect_proto)
ax.text(9.3 * 0.93 + 2.25, y_frame, "Message\nProtobuf sérialisé\n(payload gRPC)",
        ha="center", va="center", fontsize=8, color="white", fontweight="bold", zorder=6)

# Étiquettes
ax.annotate("", xy=(4.5 * 0.93 + 2.25, y_frame - 0.5), xytext=(4.5 * 0.93 + 2.25, 0.7),
            arrowprops=dict(arrowstyle="<->", color="#607D8B", lw=1.5))
ax.text(4.5 * 0.93 + 2.25 + 0.1, 1.1, "En-tête gRPC\n(5 octets)", fontsize=8, color="#607D8B")

ax.annotate("", xy=(0, y_frame - 0.5), xytext=(0, 0.7),
            arrowprops=dict(arrowstyle="<->", color="#2196F3", lw=1.5))
ax.text(0.1, 1.1, "En-tête HTTP/2\n(9 octets)", fontsize=8, color="#2196F3")

ax.text(7, -0.3,
        "Chaque appel gRPC = 1 stream HTTP/2  |  "
        "gRPC-status dans les TRAILERS HTTP/2  |  "
        "Multiplexage : N appels sur 1 connexion TCP",
        ha="center", fontsize=8.5, color="#37474F", style="italic")

plt.tight_layout()
plt.show()
_images/28bbd07d1d8a2d53b0e84593585e4f6da92fc587df654181f3293f8316b9f0b4.png

Code complet : service Calculator#

Les blocs suivants sont illustratifs (non exécutables directement) et nécessitent grpcio et grpcio-tools installés (pip install grpcio grpcio-tools).

Fichier .proto complet#

// fichier: calculator.proto
syntax = "proto3";

package calculator;

option python_package = "calculator_pb2";

// Énumération des opérations
enum Operation {
  ADDITION       = 0;
  SOUSTRACTION   = 1;
  MULTIPLICATION = 2;
  DIVISION       = 3;
  PUISSANCE      = 4;
}

// Message de requête unary
message CalculRequest {
  double a = 1;
  double b = 2;
  Operation op = 3;
}

// Message de réponse
message CalculResponse {
  double result = 1;
  bool ok = 2;
  string error = 3;
}

// Pour le streaming : séquence de nombres
message NumberStream {
  double value = 1;
  int64 sequence = 2;
}

// Résultat statistique (client streaming)
message StatsResponse {
  double sum = 1;
  double mean = 2;
  double min = 3;
  double max = 4;
  int32 count = 5;
  double variance = 6;
}

// Service avec les 4 types de RPC
service Calculator {
  // 1. Unary : simple calcul
  rpc Compute(CalculRequest) returns (CalculResponse);

  // 2. Server streaming : suite mathématique (Fibonacci, suite de l'opération)
  rpc ComputeSequence(CalculRequest) returns (stream NumberStream);

  // 3. Client streaming : le client envoie des nombres, le serveur calcule les stats
  rpc ComputeStats(stream NumberStream) returns (StatsResponse);

  // 4. Bidirectionnel : le client envoie des requêtes, le serveur répond au fil de l'eau
  rpc BidirectionalCompute(stream CalculRequest) returns (stream CalculResponse);
}

Génération du code Python#

# Installation
pip install grpcio grpcio-tools

# Génération des stubs Python
python -m grpc_tools.protoc \
    --proto_path=. \
    --python_out=. \
    --grpc_python_out=. \
    calculator.proto

# Génère deux fichiers :
#   calculator_pb2.py      — classes de messages
#   calculator_pb2_grpc.py — stubs client et serveur

Serveur gRPC complet#

# fichier: server.py
import grpc
import math
import time
import logging
from concurrent import futures

import calculator_pb2
import calculator_pb2_grpc

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
    """Implémentation du service Calculator."""

    def Compute(self, request, context):
        """1. Unary RPC : calcul simple."""
        logger.info(f"Compute: {request.a} op={request.op} {request.b}")
        try:
            if request.op == calculator_pb2.ADDITION:
                result = request.a + request.b
            elif request.op == calculator_pb2.SOUSTRACTION:
                result = request.a - request.b
            elif request.op == calculator_pb2.MULTIPLICATION:
                result = request.a * request.b
            elif request.op == calculator_pb2.DIVISION:
                if request.b == 0:
                    context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                    context.set_details("Division par zéro")
                    return calculator_pb2.CalculResponse(ok=False, error="Division par zéro")
                result = request.a / request.b
            elif request.op == calculator_pb2.PUISSANCE:
                result = request.a ** request.b
            else:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                return calculator_pb2.CalculResponse(ok=False, error="Opération inconnue")

            return calculator_pb2.CalculResponse(result=result, ok=True)

        except Exception as e:
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return calculator_pb2.CalculResponse(ok=False, error=str(e))

    def ComputeSequence(self, request, context):
        """2. Server Streaming : génère une séquence de résultats."""
        logger.info(f"ComputeSequence: base={request.a}, pas={request.b}, n={int(request.b)}")
        valeur = request.a
        n = max(1, int(abs(request.b)))

        for i in range(n):
            if context.is_active():
                if request.op == calculator_pb2.ADDITION:
                    valeur += request.a
                elif request.op == calculator_pb2.MULTIPLICATION:
                    valeur *= request.a
                elif request.op == calculator_pb2.PUISSANCE:
                    valeur = request.a ** (i + 1)

                yield calculator_pb2.NumberStream(value=valeur, sequence=i + 1)
                time.sleep(0.1)  # Simulation latence traitement

    def ComputeStats(self, request_iterator, context):
        """3. Client Streaming : calcule les statistiques des nombres reçus."""
        valeurs = []
        for nombre in request_iterator:
            valeurs.append(nombre.value)
            logger.info(f"Reçu valeur #{nombre.sequence}: {nombre.value}")

        if not valeurs:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Aucune valeur reçue")
            return calculator_pb2.StatsResponse()

        n = len(valeurs)
        somme = sum(valeurs)
        moyenne = somme / n
        variance = sum((x - moyenne) ** 2 for x in valeurs) / n

        return calculator_pb2.StatsResponse(
            sum=somme,
            mean=moyenne,
            min=min(valeurs),
            max=max(valeurs),
            count=n,
            variance=variance,
        )

    def BidirectionalCompute(self, request_iterator, context):
        """4. Bidirectionnel : traitement au fil de l'eau."""
        for request in request_iterator:
            logger.info(f"BidiCompute: {request.a} op {request.b}")
            try:
                if request.op == calculator_pb2.ADDITION:
                    result = request.a + request.b
                elif request.op == calculator_pb2.MULTIPLICATION:
                    result = request.a * request.b
                else:
                    result = request.a + request.b

                yield calculator_pb2.CalculResponse(result=result, ok=True)
            except Exception as e:
                yield calculator_pb2.CalculResponse(ok=False, error=str(e))


def intercepteur_logging(continuation, handler_call_details):
    """Intercepteur gRPC pour le logging des appels."""
    methode = handler_call_details.method
    debut = time.time()
    logger.info(f"[gRPC] → {methode}")
    reponse = continuation(handler_call_details)
    duree = (time.time() - debut) * 1000
    logger.info(f"[gRPC] ← {methode} ({duree:.1f}ms)")
    return reponse


def demarrer_serveur(port: int = 50051):
    """Démarre le serveur gRPC avec intercepteurs et TLS."""
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        interceptors=[grpc.ServerInterceptor()],  # Intercepteur simplifié
        options=[
            ("grpc.max_send_message_length", 10 * 1024 * 1024),   # 10 MB
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
            ("grpc.keepalive_time_ms", 10000),
            ("grpc.keepalive_timeout_ms", 5000),
        ]
    )

    calculator_pb2_grpc.add_CalculatorServicer_to_server(
        CalculatorServicer(), server
    )

    # Écoute non chiffrée (dev) — en production, utiliser credentials TLS
    server.add_insecure_port(f"[::]:{port}")
    server.start()
    logger.info(f"Serveur gRPC démarré sur le port {port}")

    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(grace=5)


if __name__ == "__main__":
    demarrer_serveur()

Client gRPC avec les 4 types de streaming#

# fichier: client.py
import grpc
import time
import logging

import calculator_pb2
import calculator_pb2_grpc

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def creer_canal(adresse: str = "localhost:50051") -> grpc.Channel:
    """Crée un canal gRPC avec options de connexion."""
    return grpc.insecure_channel(
        adresse,
        options=[
            ("grpc.keepalive_time_ms", 10000),
            ("grpc.keepalive_timeout_ms", 5000),
            ("grpc.keepalive_permit_without_calls", True),
            ("grpc.http2.max_pings_without_data", 0),
        ]
    )


def demo_unary(stub):
    """1. Unary RPC : calcul simple."""
    print("\n=== 1. Unary RPC ===")
    requete = calculator_pb2.CalculRequest(a=3.14159, b=2.71828, op=calculator_pb2.MULTIPLICATION)
    try:
        reponse = stub.Compute(requete, timeout=5.0)
        if reponse.ok:
            print(f"π × e = {reponse.result:.6f}")
        else:
            print(f"Erreur : {reponse.error}")
    except grpc.RpcError as e:
        print(f"Erreur gRPC : {e.code()}{e.details()}")


def demo_server_streaming(stub):
    """2. Server Streaming : suite géométrique."""
    print("\n=== 2. Server Streaming RPC ===")
    requete = calculator_pb2.CalculRequest(a=2.0, b=10.0, op=calculator_pb2.PUISSANCE)
    try:
        for nombre in stub.ComputeSequence(requete, timeout=30.0):
            print(f"  2^{nombre.sequence} = {nombre.value:.0f}")
    except grpc.RpcError as e:
        print(f"Erreur gRPC : {e.code()}{e.details()}")


def demo_client_streaming(stub):
    """3. Client Streaming : envoi d'une série de mesures."""
    print("\n=== 3. Client Streaming RPC ===")

    def generateur_mesures():
        import random
        for i in range(20):
            valeur = random.gauss(50, 10)
            print(f"  Envoi mesure #{i+1}: {valeur:.2f}")
            yield calculator_pb2.NumberStream(value=valeur, sequence=i + 1)
            time.sleep(0.05)

    try:
        stats = stub.ComputeStats(generateur_mesures(), timeout=30.0)
        print(f"\nStatistiques reçues du serveur :")
        print(f"  N = {stats.count}")
        print(f"  Somme = {stats.sum:.2f}")
        print(f"  Moyenne = {stats.mean:.2f}")
        print(f"  Min/Max = {stats.min:.2f} / {stats.max:.2f}")
        print(f"  Écart-type = {stats.variance**0.5:.2f}")
    except grpc.RpcError as e:
        print(f"Erreur gRPC : {e.code()}{e.details()}")


def demo_bidirectionnel(stub):
    """4. Bidirectionnel Streaming : pipeline de calculs."""
    print("\n=== 4. Bidirectionnel Streaming RPC ===")

    operations = [
        (10.0, 5.0, calculator_pb2.ADDITION),
        (10.0, 5.0, calculator_pb2.SOUSTRACTION),
        (3.0, 4.0, calculator_pb2.MULTIPLICATION),
        (15.0, 3.0, calculator_pb2.DIVISION),
        (2.0, 8.0, calculator_pb2.PUISSANCE),
    ]
    noms_op = {0: "+", 1: "-", 2: "×", 3: "÷", 4: "^"}

    def generateur_requetes():
        for a, b, op in operations:
            print(f"  Envoi : {a} {noms_op[op]} {b}")
            yield calculator_pb2.CalculRequest(a=a, b=b, op=op)
            time.sleep(0.1)

    try:
        for i, reponse in enumerate(stub.BidirectionalCompute(generateur_requetes())):
            a, b, op = operations[i]
            if reponse.ok:
                print(f"  Reçu  : {a} {noms_op[op]} {b} = {reponse.result}")
            else:
                print(f"  Erreur : {reponse.error}")
    except grpc.RpcError as e:
        print(f"Erreur gRPC : {e.code()}{e.details()}")


def main():
    with creer_canal("localhost:50051") as canal:
        stub = calculator_pb2_grpc.CalculatorStub(canal)

        demo_unary(stub)
        demo_server_streaming(stub)
        demo_client_streaming(stub)
        demo_bidirectionnel(stub)


if __name__ == "__main__":
    main()

Intercepteurs gRPC#

# Intercepteur d'authentification JWT
class AuthInterceptor(grpc.ServerInterceptor):
    """Intercepteur vérifiant le token JWT dans les métadonnées."""

    def __init__(self, cle_secrete: str):
        self.cle_secrete = cle_secrete

    def intercept_service(self, continuation, handler_call_details):
        # Récupérer les métadonnées (en-têtes gRPC)
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get("authorization", "").replace("Bearer ", "")

        if not token:
            def abort(request, context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token manquant")
            return grpc.unary_unary_rpc_method_handler(abort)

        # Vérification simplifiée (utiliser PyJWT en production)
        if not self._verifier_token(token):
            def abort(request, context):
                context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token invalide")
            return grpc.unary_unary_rpc_method_handler(abort)

        return continuation(handler_call_details)

    def _verifier_token(self, token: str) -> bool:
        # En production : jwt.decode(token, self.cle_secrete, algorithms=["HS256"])
        return len(token) > 10


# Intercepteur de retry côté client
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
    """Intercepteur client avec retry automatique sur erreurs transitoires."""

    def __init__(self, max_retries: int = 3, delai_initial: float = 0.1):
        self.max_retries = max_retries
        self.delai_initial = delai_initial

    def intercept_unary_unary(self, continuation, client_call_details, request):
        derniere_erreur = None
        for tentative in range(self.max_retries):
            try:
                return continuation(client_call_details, request)
            except grpc.RpcError as e:
                if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
                    derniere_erreur = e
                    delai = self.delai_initial * (2 ** tentative)
                    logger.warning(f"Retry {tentative+1}/{self.max_retries} dans {delai:.1f}s")
                    time.sleep(delai)
                else:
                    raise
        raise derniere_erreur

Comparaison REST vs gRPC#

Hide code cell source

# Comparaison REST vs gRPC
fig, ax = plt.subplots(figsize=(14, 7))
ax.axis("off")

colonnes = ["Critère", "REST / JSON", "gRPC / Protobuf"]
lignes = [
    ["Protocole transport", "HTTP/1.1 ou HTTP/2", "HTTP/2 (obligatoire)"],
    ["Format des données", "JSON (texte lisible)", "Protocol Buffers (binaire)"],
    ["Taille des messages", "Élevée (verbeux)", "Très faible (2-10× plus compact)"],
    ["Performance sérialisation", "Lente (parsing JSON)", "Très rapide (binaire)"],
    ["Typage des données", "Dynamique (duck typing)", "Strict (schéma .proto)"],
    ["Contrat d'interface", "OpenAPI (optionnel)", ".proto (obligatoire)"],
    ["Génération de code client", "Swagger codegen", "grpc_tools.protoc (tous langages)"],
    ["Streaming", "Limité (SSE, WebSocket séparé)", "4 modes natifs (unary, 3 streaming)"],
    ["Support navigateur", "Universel (fetch, XHR)", "Limité (grpc-web, proxy requis)"],
    ["Débogage humain", "Facile (JSON lisible, curl)", "Difficile (binaire, outils dédiés)"],
    ["Cas d'usage typique", "APIs publiques, mobile, web", "Microservices internes, mobile backend"],
    ["Latence", "Moyenne", "Très faible"],
    ["Maturité / adoption", "Très large (universel)", "Large (Google, Cloudflare, Netflix)"],
    ["Versioning", "URL, header, paramètre", "Compatibilité champs Protobuf (additive)"],
]

couleurs = []
for i, ligne in enumerate(lignes):
    if i % 2 == 0:
        couleurs.append(["#ECEFF1", "#FFF8E1", "#E8F5E9"])
    else:
        couleurs.append(["#F5F5F5", "#FFFDE7", "#F1F8E9"])

table = ax.table(
    cellText=lignes,
    colLabels=colonnes,
    cellLoc="center",
    loc="center",
    cellColours=couleurs,
)
table.auto_set_font_size(False)
table.set_fontsize(8.5)
table.scale(1, 1.65)

col_colors = ["#37474F", "#1565C0", "#2E7D32"]
for j, color in enumerate(col_colors):
    table[0, j].set_facecolor(color)
    table[0, j].set_text_props(color="white", fontweight="bold")

ax.set_title("Comparaison REST/JSON vs gRPC/Protocol Buffers", fontsize=13, fontweight="bold", pad=20)
plt.tight_layout()
plt.show()
_images/0130c2085c569def3fe2c73678d9898577ca0838d934f1bedf9cda8388acd016.png

Hide code cell source

# Benchmark simulé : latence et débit REST vs gRPC
fig, axes = plt.subplots(1, 2, figsize=(13, 5))

# Latence selon le nombre d'appels simultanés
concurrence = [1, 5, 10, 50, 100, 500, 1000]
latence_rest = [5, 6, 8, 15, 25, 80, 200]      # ms, incluant parsing JSON
latence_grpc = [1.5, 2, 2.5, 4, 7, 20, 45]    # ms, binaire + HTTP/2

ax1 = axes[0]
ax1.semilogy(concurrence, latence_rest, "o-", color="#F44336", linewidth=2.5,
             markersize=7, label="REST/JSON (HTTP/1.1)")
ax1.semilogy(concurrence, latence_grpc, "s-", color="#4CAF50", linewidth=2.5,
             markersize=7, label="gRPC/Protobuf (HTTP/2)")
ax1.set_xlabel("Requêtes simultanées", fontsize=11)
ax1.set_ylabel("Latence P99 (ms, échelle log)", fontsize=11)
ax1.set_title("Latence REST vs gRPC\n(simulation benchmark interne)", fontsize=11)
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3, which="both")
ax1.set_xlim(1, 1000)

# Débit (requêtes/seconde) selon la taille du message
tailles_kb = [0.1, 0.5, 1, 5, 10, 50, 100]
debit_rest = [8000, 5000, 3000, 800, 400, 90, 45]
debit_grpc = [25000, 18000, 12000, 3500, 1800, 400, 200]

ax2 = axes[1]
ax2.loglog(tailles_kb, debit_rest, "o-", color="#F44336", linewidth=2.5,
           markersize=7, label="REST/JSON")
ax2.loglog(tailles_kb, debit_grpc, "s-", color="#4CAF50", linewidth=2.5,
           markersize=7, label="gRPC/Protobuf")
ax2.set_xlabel("Taille du message (Ko)", fontsize=11)
ax2.set_ylabel("Débit (req/s, échelle log)", fontsize=11)
ax2.set_title("Débit selon la taille du message\n(simulation, serveur 8 cœurs)", fontsize=11)
ax2.legend(fontsize=10)
ax2.grid(True, alpha=0.3, which="both")

plt.tight_layout()
plt.show()
_images/97dc898ba4f9b9d6605ba1276ddadc2ad0b4cc9fb5c41ed4ee101236d38e0e04.png

Codes de statut gRPC#

gRPC définit ses propres codes de statut (distincts des codes HTTP), transmis dans les trailers HTTP/2 :

Hide code cell source

fig, ax = plt.subplots(figsize=(12, 5))
ax.axis("off")

colonnes = ["Code", "Nom", "Description", "Équivalent HTTP"]
lignes = [
    ["0", "OK", "Succès", "200"],
    ["1", "CANCELLED", "Annulé par le client", "—"],
    ["2", "UNKNOWN", "Erreur inconnue", "500"],
    ["3", "INVALID_ARGUMENT", "Argument invalide", "400"],
    ["4", "DEADLINE_EXCEEDED", "Timeout", "408 / 504"],
    ["5", "NOT_FOUND", "Ressource non trouvée", "404"],
    ["6", "ALREADY_EXISTS", "Ressource déjà existante", "409"],
    ["7", "PERMISSION_DENIED", "Permission refusée", "403"],
    ["8", "RESOURCE_EXHAUSTED", "Quota dépassé, rate limit", "429"],
    ["9", "FAILED_PRECONDITION", "Prérequis non satisfait", "412"],
    ["10", "ABORTED", "Opération avortée (conflit)", "409"],
    ["12", "UNIMPLEMENTED", "Méthode non implémentée", "501"],
    ["13", "INTERNAL", "Erreur interne serveur", "500"],
    ["14", "UNAVAILABLE", "Service indisponible (retry)", "503"],
    ["16", "UNAUTHENTICATED", "Authentification requise", "401"],
]

couleurs = []
for i, ligne in enumerate(lignes):
    code = int(ligne[0])
    if code == 0:
        couleurs.append(["#E8F5E9"] * 4)
    elif code in (3, 5, 6, 7, 8, 9, 10, 12, 16):
        couleurs.append(["#FFF8E1"] * 4)
    else:
        couleurs.append(["#FFEBEE"] * 4)

table = ax.table(
    cellText=lignes, colLabels=colonnes,
    cellLoc="center", loc="center",
    cellColours=couleurs,
)
table.auto_set_font_size(False)
table.set_fontsize(8.5)
table.scale(1, 1.6)

for j in range(4):
    table[0, j].set_facecolor("#37474F")
    table[0, j].set_text_props(color="white", fontweight="bold")

ax.set_title("Codes de statut gRPC (RFC 9114)", fontsize=12, fontweight="bold", pad=15)
plt.tight_layout()
plt.show()
_images/67fd39df61aee4d15c01a8c1fbe2aa8ea243d80dfd348a6fa5d2df98eb9f5425.png

Résumé#

Protocol Buffers et gRPC forment un duo puissant pour les communications microservices. Protobuf garantit un schéma strict, une sérialisation compacte (2 à 10 fois plus petite que JSON) et une génération de code multilangage. gRPC exploite HTTP/2 pour le multiplexage, le streaming bidirectionnel et les faibles latences.

Les points clés :

  • Encodage varint : les petits entiers tiennent en 1-2 octets dans Protobuf

  • Wire types : 6 types d’encodage selon la nature des données (varint, 32-bit, 64-bit, length-delimited)

  • 4 types de RPC : unary, server streaming, client streaming, bidirectionnel — tous sur un seul stream HTTP/2

  • Intercepteurs : pattern middleware pour le logging, l’auth, le retry

  • gRPC vs REST : gRPC excelle pour les communications internes à haute fréquence ; REST reste le standard pour les APIs publiques