gRPC et Protocol Buffers#
gRPC est un framework RPC (Remote Procedure Call) open-source développé par Google, publié en 2016. Il repose sur deux technologies fondamentales : Protocol Buffers comme langage de description d’interface (IDL) et format de sérialisation, et HTTP/2 comme protocole de transport. gRPC est conçu pour les communications microservices à haute performance, les APIs mobiles et les systèmes distribués où la latence et l’efficacité réseau sont critiques.
Ce chapitre couvre Protocol Buffers en profondeur (IDL, encodage binaire, types), les quatre modes de streaming de gRPC, son fonctionnement sur HTTP/2, et fournit un exemple complet illustratif d’un service de calcul.
Protocol Buffers : le langage de description d’interface#
Définition d’un schéma .proto#
Protocol Buffers (Protobuf) est à la fois un IDL (Interface Definition Language) pour décrire des structures de données et un format de sérialisation binaire ultra-compact. Contrairement à JSON ou XML, Protobuf nécessite un schéma préalable partagé entre l’émetteur et le récepteur.
Un fichier .proto définit des messages (structures de données) et des services (interfaces RPC) :
syntax = "proto3";
package calculatrice;
option go_package = "./pb";
// Types scalaires disponibles :
// double, float, int32, int64, uint32, uint64, sint32, sint64
// fixed32, fixed64, sfixed32, sfixed64, bool, string, bytes
// Énumération
enum TypeOperation {
ADDITION = 0; // Valeur par défaut obligatoirement 0
SOUSTRACTION = 1;
MULTIPLICATION = 2;
DIVISION = 3;
}
// Message simple
message NombreEntier {
int64 valeur = 1;
}
// Message avec champs de différents types
message Requete {
double operande_a = 1;
double operande_b = 2;
TypeOperation operation = 3;
string id_requete = 4; // Identifiant traçabilité
map<string, string> metadata = 5; // Métadonnées optionnelles
}
// Message imbriqué
message Resultat {
double valeur = 1;
bool succes = 2;
string message_erreur = 3; // Vide si succès
int64 duree_ms = 4; // Durée de calcul
// Message imbriqué
Statistiques stats = 5;
message Statistiques {
int32 n_calculs_serveur = 1;
double charge_cpu = 2;
}
}
// Flux de nombres pour le streaming
message FluxNombre {
double valeur = 1;
int64 sequence = 2;
int64 timestamp_ms = 3;
}
// Résultat cumulatif du streaming
message ResultatCumulatif {
double somme = 1;
double moyenne = 2;
double min = 3;
double max = 4;
int32 n_valeurs = 5;
}
// Champ oneof : une seule valeur présente
message ReponseFlexible {
oneof contenu {
Resultat resultat = 1;
string erreur = 2;
bytes donnees_binaires = 3;
}
}
// Champ repeated (liste)
message LotRequetes {
repeated Requete requetes = 1;
string id_lot = 2;
}
Types scalaires et leurs encodages#
Protobuf distingue plusieurs types numériques qui utilisent des encodages différents selon l’usage :
Encodage varint et wire types#
L’encodage varint (variable-length integer) est la clé de la compacité de Protobuf. Chaque octet utilise 7 bits pour les données et 1 bit (le bit de poids fort) pour indiquer si l’octet suivant fait partie du même nombre.
Encodage varint Protobuf :
==================================================
1 → 0x01 (1 octet(s)) — Valeur minimale varint
127 → 0x7F (1 octet(s)) — Max sur 1 octet (0x7F)
128 → 0x80 0x01 (2 octet(s)) — Min sur 2 octets (0x80)
300 → 0xAC 0x02 (2 octet(s)) — Exemple classique RFC
16383 → 0xFF 0x7F (2 octet(s)) — Max sur 2 octets
2097152 → 0x80 0x80 0x80 0x01 (4 octet(s)) — Valeur sur 3 octets
Encodage ZigZag pour entiers signés (sint32) :
==================================================
0 → 0x00 (1 octet(s))
-1 → 0x01 (1 octet(s))
1 → 0x02 (1 octet(s))
-2 → 0x03 (1 octet(s))
2 → 0x04 (1 octet(s))
-128 → 0xFF 0x01 (2 octet(s))
128 → 0x80 0x02 (2 octet(s))
Message Requete : {'operande_a': 3.14159265, 'operande_b': 2.71828182, 'operation': 'MULTIPLICATION', 'id_requete': 'req-abc123'}
Taille Protobuf : 32 octets
Contenu : 09 F1 D4 C8 53 FB 21 09 40 11 DD B0 F1 89 0A BF 05 40 18 02 22 0A 72 65 71 2D 61 62 63 31 32 33
Taille JSON : 104 octets
Contenu : {"operande_a":3.14159265,"operande_b":2.71828182,"operation":"MULTIPLICATION","id_requete":"req-abc123"}
Taille XML : 161 octets
Rapport Protobuf/JSON : 3.2x plus compact
Rapport Protobuf/XML : 5.0x plus compact
gRPC : les quatre types de RPC#
gRPC définit quatre modes de communication, offrant une flexibilité maximale selon les besoins :
Unary RPC (Requête-Réponse simple)#
Le client envoie une requête, le serveur répond une fois. C’est le modèle classique requête-réponse, équivalent à un appel de fonction.
service Calculatrice {
rpc Calculer(Requete) returns (Resultat);
}
Server Streaming RPC#
Le client envoie une requête, le serveur renvoie un flux de messages. Utile pour des résultats progressifs ou des flux de données.
service Calculatrice {
rpc SuiteNombres(Requete) returns (stream FluxNombre);
}
Client Streaming RPC#
Le client envoie un flux de messages, le serveur répond une seule fois (typiquement après avoir reçu tous les messages). Utile pour l’agrégation ou le chargement par lot.
service Calculatrice {
rpc AggregerNombres(stream FluxNombre) returns (ResultatCumulatif);
}
Bidirectional Streaming RPC#
Les deux parties envoient et reçoivent des flux de messages simultanément et indépendamment. Utile pour les protocoles de type chat, les jeux temps réel, ou les pipelines de traitement.
service Calculatrice {
rpc TraitementBidirectionnel(stream FluxNombre) returns (stream Resultat);
}
gRPC sur HTTP/2#
gRPC utilise HTTP/2 pour toutes ses communications. Cette dépendance n’est pas accidentelle : HTTP/2 fournit exactement les primitives dont gRPC a besoin.
Mapping gRPC → HTTP/2#
Chaque appel gRPC correspond à un stream HTTP/2 :
Un appel unary = 1 stream HTTP/2 (1 DATA frame requête + 1 DATA frame réponse)
Un streaming = 1 stream HTTP/2 avec N DATA frames
En-têtes HTTP/2 pour gRPC :
:method: POST
:path: /calculatrice.Calculatrice/Calculer
:scheme: https
:authority: api.example.com
content-type: application/grpc
grpc-encoding: gzip
grpc-accept-encoding: gzip, identity
te: trailers
Format d’un message gRPC (Length-Prefixed Message) :
Compressed-Flag (1 octet) | Message-Length (4 octets, big-endian) | Message (Protobuf)
Trailers HTTP/2 (fin d’appel) :
grpc-status: 0 (0 = OK)
grpc-message: "" (vide si succès)
Code complet : service Calculator#
Les blocs suivants sont illustratifs (non exécutables directement) et nécessitent grpcio et grpcio-tools installés (pip install grpcio grpcio-tools).
Fichier .proto complet#
// fichier: calculator.proto
syntax = "proto3";
package calculator;
option python_package = "calculator_pb2";
// Énumération des opérations
enum Operation {
ADDITION = 0;
SOUSTRACTION = 1;
MULTIPLICATION = 2;
DIVISION = 3;
PUISSANCE = 4;
}
// Message de requête unary
message CalculRequest {
double a = 1;
double b = 2;
Operation op = 3;
}
// Message de réponse
message CalculResponse {
double result = 1;
bool ok = 2;
string error = 3;
}
// Pour le streaming : séquence de nombres
message NumberStream {
double value = 1;
int64 sequence = 2;
}
// Résultat statistique (client streaming)
message StatsResponse {
double sum = 1;
double mean = 2;
double min = 3;
double max = 4;
int32 count = 5;
double variance = 6;
}
// Service avec les 4 types de RPC
service Calculator {
// 1. Unary : simple calcul
rpc Compute(CalculRequest) returns (CalculResponse);
// 2. Server streaming : suite mathématique (Fibonacci, suite de l'opération)
rpc ComputeSequence(CalculRequest) returns (stream NumberStream);
// 3. Client streaming : le client envoie des nombres, le serveur calcule les stats
rpc ComputeStats(stream NumberStream) returns (StatsResponse);
// 4. Bidirectionnel : le client envoie des requêtes, le serveur répond au fil de l'eau
rpc BidirectionalCompute(stream CalculRequest) returns (stream CalculResponse);
}
Génération du code Python#
# Installation
pip install grpcio grpcio-tools
# Génération des stubs Python
python -m grpc_tools.protoc \
--proto_path=. \
--python_out=. \
--grpc_python_out=. \
calculator.proto
# Génère deux fichiers :
# calculator_pb2.py — classes de messages
# calculator_pb2_grpc.py — stubs client et serveur
Serveur gRPC complet#
# fichier: server.py
import grpc
import math
import time
import logging
from concurrent import futures
import calculator_pb2
import calculator_pb2_grpc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
"""Implémentation du service Calculator."""
def Compute(self, request, context):
"""1. Unary RPC : calcul simple."""
logger.info(f"Compute: {request.a} op={request.op} {request.b}")
try:
if request.op == calculator_pb2.ADDITION:
result = request.a + request.b
elif request.op == calculator_pb2.SOUSTRACTION:
result = request.a - request.b
elif request.op == calculator_pb2.MULTIPLICATION:
result = request.a * request.b
elif request.op == calculator_pb2.DIVISION:
if request.b == 0:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details("Division par zéro")
return calculator_pb2.CalculResponse(ok=False, error="Division par zéro")
result = request.a / request.b
elif request.op == calculator_pb2.PUISSANCE:
result = request.a ** request.b
else:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
return calculator_pb2.CalculResponse(ok=False, error="Opération inconnue")
return calculator_pb2.CalculResponse(result=result, ok=True)
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(str(e))
return calculator_pb2.CalculResponse(ok=False, error=str(e))
def ComputeSequence(self, request, context):
"""2. Server Streaming : génère une séquence de résultats."""
logger.info(f"ComputeSequence: base={request.a}, pas={request.b}, n={int(request.b)}")
valeur = request.a
n = max(1, int(abs(request.b)))
for i in range(n):
if context.is_active():
if request.op == calculator_pb2.ADDITION:
valeur += request.a
elif request.op == calculator_pb2.MULTIPLICATION:
valeur *= request.a
elif request.op == calculator_pb2.PUISSANCE:
valeur = request.a ** (i + 1)
yield calculator_pb2.NumberStream(value=valeur, sequence=i + 1)
time.sleep(0.1) # Simulation latence traitement
def ComputeStats(self, request_iterator, context):
"""3. Client Streaming : calcule les statistiques des nombres reçus."""
valeurs = []
for nombre in request_iterator:
valeurs.append(nombre.value)
logger.info(f"Reçu valeur #{nombre.sequence}: {nombre.value}")
if not valeurs:
context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
context.set_details("Aucune valeur reçue")
return calculator_pb2.StatsResponse()
n = len(valeurs)
somme = sum(valeurs)
moyenne = somme / n
variance = sum((x - moyenne) ** 2 for x in valeurs) / n
return calculator_pb2.StatsResponse(
sum=somme,
mean=moyenne,
min=min(valeurs),
max=max(valeurs),
count=n,
variance=variance,
)
def BidirectionalCompute(self, request_iterator, context):
"""4. Bidirectionnel : traitement au fil de l'eau."""
for request in request_iterator:
logger.info(f"BidiCompute: {request.a} op {request.b}")
try:
if request.op == calculator_pb2.ADDITION:
result = request.a + request.b
elif request.op == calculator_pb2.MULTIPLICATION:
result = request.a * request.b
else:
result = request.a + request.b
yield calculator_pb2.CalculResponse(result=result, ok=True)
except Exception as e:
yield calculator_pb2.CalculResponse(ok=False, error=str(e))
def intercepteur_logging(continuation, handler_call_details):
"""Intercepteur gRPC pour le logging des appels."""
methode = handler_call_details.method
debut = time.time()
logger.info(f"[gRPC] → {methode}")
reponse = continuation(handler_call_details)
duree = (time.time() - debut) * 1000
logger.info(f"[gRPC] ← {methode} ({duree:.1f}ms)")
return reponse
def demarrer_serveur(port: int = 50051):
"""Démarre le serveur gRPC avec intercepteurs et TLS."""
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
interceptors=[grpc.ServerInterceptor()], # Intercepteur simplifié
options=[
("grpc.max_send_message_length", 10 * 1024 * 1024), # 10 MB
("grpc.max_receive_message_length", 10 * 1024 * 1024),
("grpc.keepalive_time_ms", 10000),
("grpc.keepalive_timeout_ms", 5000),
]
)
calculator_pb2_grpc.add_CalculatorServicer_to_server(
CalculatorServicer(), server
)
# Écoute non chiffrée (dev) — en production, utiliser credentials TLS
server.add_insecure_port(f"[::]:{port}")
server.start()
logger.info(f"Serveur gRPC démarré sur le port {port}")
try:
server.wait_for_termination()
except KeyboardInterrupt:
server.stop(grace=5)
if __name__ == "__main__":
demarrer_serveur()
Client gRPC avec les 4 types de streaming#
# fichier: client.py
import grpc
import time
import logging
import calculator_pb2
import calculator_pb2_grpc
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def creer_canal(adresse: str = "localhost:50051") -> grpc.Channel:
"""Crée un canal gRPC avec options de connexion."""
return grpc.insecure_channel(
adresse,
options=[
("grpc.keepalive_time_ms", 10000),
("grpc.keepalive_timeout_ms", 5000),
("grpc.keepalive_permit_without_calls", True),
("grpc.http2.max_pings_without_data", 0),
]
)
def demo_unary(stub):
"""1. Unary RPC : calcul simple."""
print("\n=== 1. Unary RPC ===")
requete = calculator_pb2.CalculRequest(a=3.14159, b=2.71828, op=calculator_pb2.MULTIPLICATION)
try:
reponse = stub.Compute(requete, timeout=5.0)
if reponse.ok:
print(f"π × e = {reponse.result:.6f}")
else:
print(f"Erreur : {reponse.error}")
except grpc.RpcError as e:
print(f"Erreur gRPC : {e.code()} — {e.details()}")
def demo_server_streaming(stub):
"""2. Server Streaming : suite géométrique."""
print("\n=== 2. Server Streaming RPC ===")
requete = calculator_pb2.CalculRequest(a=2.0, b=10.0, op=calculator_pb2.PUISSANCE)
try:
for nombre in stub.ComputeSequence(requete, timeout=30.0):
print(f" 2^{nombre.sequence} = {nombre.value:.0f}")
except grpc.RpcError as e:
print(f"Erreur gRPC : {e.code()} — {e.details()}")
def demo_client_streaming(stub):
"""3. Client Streaming : envoi d'une série de mesures."""
print("\n=== 3. Client Streaming RPC ===")
def generateur_mesures():
import random
for i in range(20):
valeur = random.gauss(50, 10)
print(f" Envoi mesure #{i+1}: {valeur:.2f}")
yield calculator_pb2.NumberStream(value=valeur, sequence=i + 1)
time.sleep(0.05)
try:
stats = stub.ComputeStats(generateur_mesures(), timeout=30.0)
print(f"\nStatistiques reçues du serveur :")
print(f" N = {stats.count}")
print(f" Somme = {stats.sum:.2f}")
print(f" Moyenne = {stats.mean:.2f}")
print(f" Min/Max = {stats.min:.2f} / {stats.max:.2f}")
print(f" Écart-type = {stats.variance**0.5:.2f}")
except grpc.RpcError as e:
print(f"Erreur gRPC : {e.code()} — {e.details()}")
def demo_bidirectionnel(stub):
"""4. Bidirectionnel Streaming : pipeline de calculs."""
print("\n=== 4. Bidirectionnel Streaming RPC ===")
operations = [
(10.0, 5.0, calculator_pb2.ADDITION),
(10.0, 5.0, calculator_pb2.SOUSTRACTION),
(3.0, 4.0, calculator_pb2.MULTIPLICATION),
(15.0, 3.0, calculator_pb2.DIVISION),
(2.0, 8.0, calculator_pb2.PUISSANCE),
]
noms_op = {0: "+", 1: "-", 2: "×", 3: "÷", 4: "^"}
def generateur_requetes():
for a, b, op in operations:
print(f" Envoi : {a} {noms_op[op]} {b}")
yield calculator_pb2.CalculRequest(a=a, b=b, op=op)
time.sleep(0.1)
try:
for i, reponse in enumerate(stub.BidirectionalCompute(generateur_requetes())):
a, b, op = operations[i]
if reponse.ok:
print(f" Reçu : {a} {noms_op[op]} {b} = {reponse.result}")
else:
print(f" Erreur : {reponse.error}")
except grpc.RpcError as e:
print(f"Erreur gRPC : {e.code()} — {e.details()}")
def main():
with creer_canal("localhost:50051") as canal:
stub = calculator_pb2_grpc.CalculatorStub(canal)
demo_unary(stub)
demo_server_streaming(stub)
demo_client_streaming(stub)
demo_bidirectionnel(stub)
if __name__ == "__main__":
main()
Intercepteurs gRPC#
# Intercepteur d'authentification JWT
class AuthInterceptor(grpc.ServerInterceptor):
"""Intercepteur vérifiant le token JWT dans les métadonnées."""
def __init__(self, cle_secrete: str):
self.cle_secrete = cle_secrete
def intercept_service(self, continuation, handler_call_details):
# Récupérer les métadonnées (en-têtes gRPC)
metadata = dict(handler_call_details.invocation_metadata)
token = metadata.get("authorization", "").replace("Bearer ", "")
if not token:
def abort(request, context):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token manquant")
return grpc.unary_unary_rpc_method_handler(abort)
# Vérification simplifiée (utiliser PyJWT en production)
if not self._verifier_token(token):
def abort(request, context):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Token invalide")
return grpc.unary_unary_rpc_method_handler(abort)
return continuation(handler_call_details)
def _verifier_token(self, token: str) -> bool:
# En production : jwt.decode(token, self.cle_secrete, algorithms=["HS256"])
return len(token) > 10
# Intercepteur de retry côté client
class RetryInterceptor(grpc.UnaryUnaryClientInterceptor):
"""Intercepteur client avec retry automatique sur erreurs transitoires."""
def __init__(self, max_retries: int = 3, delai_initial: float = 0.1):
self.max_retries = max_retries
self.delai_initial = delai_initial
def intercept_unary_unary(self, continuation, client_call_details, request):
derniere_erreur = None
for tentative in range(self.max_retries):
try:
return continuation(client_call_details, request)
except grpc.RpcError as e:
if e.code() in (grpc.StatusCode.UNAVAILABLE, grpc.StatusCode.DEADLINE_EXCEEDED):
derniere_erreur = e
delai = self.delai_initial * (2 ** tentative)
logger.warning(f"Retry {tentative+1}/{self.max_retries} dans {delai:.1f}s")
time.sleep(delai)
else:
raise
raise derniere_erreur
Comparaison REST vs gRPC#
Codes de statut gRPC#
gRPC définit ses propres codes de statut (distincts des codes HTTP), transmis dans les trailers HTTP/2 :
Résumé#
Protocol Buffers et gRPC forment un duo puissant pour les communications microservices. Protobuf garantit un schéma strict, une sérialisation compacte (2 à 10 fois plus petite que JSON) et une génération de code multilangage. gRPC exploite HTTP/2 pour le multiplexage, le streaming bidirectionnel et les faibles latences.
Les points clés :
Encodage varint : les petits entiers tiennent en 1-2 octets dans Protobuf
Wire types : 6 types d’encodage selon la nature des données (varint, 32-bit, 64-bit, length-delimited)
4 types de RPC : unary, server streaming, client streaming, bidirectionnel — tous sur un seul stream HTTP/2
Intercepteurs : pattern middleware pour le logging, l’auth, le retry
gRPC vs REST : gRPC excelle pour les communications internes à haute fréquence ; REST reste le standard pour les APIs publiques