Analyse réseau avec Scapy#
L’analyse réseau consiste à capturer, décoder et interpréter les paquets qui transitent sur un réseau. Elle est indispensable pour le débogage d’applications, l’audit de sécurité, la compréhension des protocoles et la détection d’anomalies. Dans ce chapitre, nous explorons la forge et la dissection de paquets avec Scapy, le parsing manuel avec struct, et les outils de capture comme Wireshark et tcpdump.
Introduction à Scapy#
Scapy est une bibliothèque Python de manipulation de paquets réseau. Elle permet de :
Forger des paquets arbitraires à n’importe quelle couche du modèle OSI.
Dissect (décoder) des paquets reçus ou lus depuis un fichier.
Envoyer des paquets (nécessite des privilèges root pour les sockets raw).
Lire des fichiers
.pcapproduits par Wireshark ou tcpdump.
Installation
pip install scapy
Pour l’envoi de paquets bruts, Scapy nécessite des droits root (ou CAP_NET_RAW). Dans ce chapitre, nous nous concentrons sur la forge illustrative et la dissection, qui ne requièrent pas de captures live.
### Structure en couches
Scapy modélise chaque protocole comme une classe Python. Les couches s'assemblent par l'opérateur `/` :
Ether() / IP() / TCP() / Raw(load=b »GET / HTTP/1.1\r\n\r\n »)
```{code-cell} python
# Illustration de la structure en couches sans envoi réel
# (Scapy n'est pas nécessairement installé — on montre la logique)
couches = [
("Couche 2 — Liaison", "Ether(dst='ff:ff:ff:ff:ff:ff', src='aa:bb:cc:dd:ee:ff', type=0x0800)"),
("Couche 3 — Réseau", "IP(src='192.168.1.10', dst='93.184.216.34', ttl=64)"),
("Couche 4 — Transport", "TCP(sport=54321, dport=80, flags='S', seq=1000)"),
("Couche 7 — Application","Raw(load=b'GET / HTTP/1.1\\r\\nHost: example.com\\r\\n\\r\\n')"),
]
print("Construction d'un paquet HTTP avec Scapy :")
print("=" * 60)
for nom, code in couches:
print(f"\n{nom}")
print(f" {code}")
print("\n─" * 30)
print("Assemblage final :")
print(" paquet = Ether() / IP(dst='93.184.216.34') / TCP(dport=80, flags='S') / Raw(...)")
Anatomie d’un paquet Ethernet/IP/TCP#
Avant de parser des paquets, il faut comprendre la structure précise de chaque en-tête.
fig, ax = plt.subplots(figsize=(12, 9))
ax.set_xlim(0, 32)
ax.set_ylim(0, 28)
ax.axis('off')
ax.set_title("Anatomie d'un paquet Ethernet / IP / TCP", fontsize=14, fontweight='bold', pad=15)
def draw_field(ax, x, y, w, h, label, sublabel, color):
rect = mpatches.FancyBboxPatch((x, y), w, h,
boxstyle="round,pad=0.05",
facecolor=color, edgecolor='white', linewidth=1.5)
ax.add_patch(rect)
ax.text(x + w/2, y + h/2 + 0.15, label, ha='center', va='center',
fontsize=8, fontweight='bold', color='white')
if sublabel:
ax.text(x + w/2, y + h/2 - 0.35, sublabel, ha='center', va='center',
fontsize=7, color='#eeeeee', style='italic')
# En-tête Ethernet (14 octets)
y_eth = 24
ax.text(0.2, y_eth + 1.3, "En-tête Ethernet (14 octets)", fontsize=10, fontweight='bold', color='#4575b4')
eth_fields = [
(0, 6, "MAC Destination", "6 octets", '#4575b4'),
(6, 6, "MAC Source", "6 octets", '#74add1'),
(12, 2, "EtherType", "2 oct.", '#abd9e9'),
]
for x, w, label, sub, col in eth_fields:
draw_field(ax, x, y_eth, w, 1.2, label, sub, col)
# En-tête IP (20 octets minimum)
y_ip = 20
ax.text(0.2, y_ip + 1.3, "En-tête IP (20 octets min.)", fontsize=10, fontweight='bold', color='#1a9850')
ip_fields = [
(0, 1, "Ver", "4b", '#1a9850'),
(1, 1, "IHL", "4b", '#31a354'),
(2, 1, "DSCP/ECN", "1o", '#74c476'),
(3, 2, "Total Length", "2 oct.", '#a1d99b'),
(5, 2, "Identification", "2 oct.", '#1a9850'),
(7, 1.5, "Flags/Frag", "3o", '#31a354'),
(8.5, 1, "TTL", "1o", '#74c476'),
(9.5, 1, "Protocol", "1o", '#a1d99b'),
(10.5, 2, "Checksum", "2 oct.", '#1a9850'),
(12.5, 4, "IP Source", "4 octets", '#31a354'),
(16.5, 4, "IP Destination", "4 octets", '#74c476'),
]
for x, w, label, sub, col in ip_fields:
draw_field(ax, x, y_ip, w, 1.2, label, sub, col)
# En-tête TCP (20 octets minimum)
y_tcp = 16
ax.text(0.2, y_tcp + 1.3, "En-tête TCP (20 octets min.)", fontsize=10, fontweight='bold', color='#d73027')
tcp_fields = [
(0, 4, "Port Source", "2 octets", '#d73027'),
(4, 4, "Port Destination", "2 octets", '#f46d43'),
(8, 8, "Numéro de séquence", "4 octets", '#d73027'),
(16, 8, "Numéro d'acquittement", "4 octets", '#f46d43'),
(24, 2, "DO", "4b", '#fdae61'),
(26, 3, "Flags", "9b", '#d73027'),
(29, 3, "Window", "2o", '#f46d43'),
]
for x, w, label, sub, col in tcp_fields:
draw_field(ax, x, y_tcp, w, 1.2, label, sub, col)
# Données
y_data = 12
ax.text(0.2, y_data + 1.3, "Données (payload)", fontsize=10, fontweight='bold', color='#6a3d9a')
draw_field(ax, 0, y_data, 32, 1.2, "Payload applicatif (variable)", "HTTP, DNS, TLS…", '#6a3d9a')
# Légende des tailles
for y_pos, texte in [(25.8, "Ethernet : 14o"), (21.8, "IP : 20o+"), (17.8, "TCP : 20o+")]:
ax.text(22, y_pos, texte, fontsize=8.5, color='#555555', style='italic')
plt.tight_layout()
plt.savefig('_static/paquet_anatomie.png', dpi=100, bbox_inches='tight')
plt.show()
Parser un paquet avec struct#
La bibliothèque struct de la stdlib Python permet de décoder des séquences d’octets brutes selon un format précis. C’est exactement ce que fait Scapy en interne.
import struct
import socket
def parse_ethernet(données: bytes) -> dict:
"""Décode un en-tête Ethernet (14 octets)."""
if len(données) < 14:
raise ValueError("Données trop courtes pour un en-tête Ethernet")
mac_dst_b = données[0:6]
mac_src_b = données[6:12]
ether_type = struct.unpack('!H', données[12:14])[0]
def fmt_mac(b: bytes) -> str:
return ':'.join(f'{x:02x}' for x in b)
return {
'mac_dst': fmt_mac(mac_dst_b),
'mac_src': fmt_mac(mac_src_b),
'ether_type': hex(ether_type),
'protocole': {0x0800: 'IPv4', 0x0806: 'ARP', 0x86DD: 'IPv6'}.get(ether_type, 'Inconnu'),
}
def parse_ip(données: bytes) -> dict:
"""Décode un en-tête IPv4 (minimum 20 octets)."""
if len(données) < 20:
raise ValueError("Données trop courtes pour un en-tête IPv4")
# !BBHHHBBH4s4s — big-endian
# ver_ihl(1) dscp(1) total_len(2) id(2) flags_frag(2) ttl(1) proto(1) cksum(2) src(4) dst(4)
(ver_ihl, dscp, total_len, ident, flags_frag,
ttl, proto, cksum, src_b, dst_b) = struct.unpack('!BBHHHBBH4s4s', données[:20])
version = (ver_ihl >> 4) & 0xF
ihl = (ver_ihl & 0xF) * 4
return {
'version': version,
'ihl': ihl,
'total_len': total_len,
'ttl': ttl,
'proto': proto,
'proto_nom': {1: 'ICMP', 6: 'TCP', 17: 'UDP'}.get(proto, str(proto)),
'src': socket.inet_ntoa(src_b),
'dst': socket.inet_ntoa(dst_b),
}
def parse_tcp(données: bytes) -> dict:
"""Décode un en-tête TCP (minimum 20 octets)."""
if len(données) < 20:
raise ValueError("Données trop courtes pour un en-tête TCP")
sport, dport, seq, ack, do_flags, window = struct.unpack('!HHLLHH', données[:14])
data_offset = ((do_flags >> 12) & 0xF) * 4
flags_val = do_flags & 0x1FF
noms_flags = {0x001: 'FIN', 0x002: 'SYN', 0x004: 'RST',
0x008: 'PSH', 0x010: 'ACK', 0x020: 'URG'}
flags_actifs = [nom for bit, nom in noms_flags.items() if flags_val & bit]
return {
'sport': sport,
'dport': dport,
'seq': seq,
'ack': ack,
'flags': flags_actifs,
'window': window,
'hdr_size': data_offset,
}
# Paquet synthétique (trame Ethernet + IP + TCP + payload HTTP)
# On le construit en assemblant les champs manuellement
def construire_paquet_test() -> bytes:
"""Construit un paquet Ethernet/IP/TCP synthétique pour démonstration."""
# MAC
mac_src = b'\xaa\xbb\xcc\xdd\xee\x01'
mac_dst = b'\xaa\xbb\xcc\xdd\xee\x02'
ether_type = struct.pack('!H', 0x0800)
eth_hdr = mac_dst + mac_src + ether_type
# IP
src_ip = socket.inet_aton('10.0.0.1')
dst_ip = socket.inet_aton('93.184.216.34')
payload_tcp = b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n'
tcp_hdr_size = 20
ip_total = 20 + tcp_hdr_size + len(payload_tcp)
ip_hdr = struct.pack('!BBHHHBBH4s4s',
0x45, # ver=4, ihl=5 (20 octets)
0, # DSCP
ip_total, # total length
0x1234, # identification
0, # flags + fragment offset
64, # TTL
6, # protocol = TCP
0, # checksum (non calculé pour démo)
src_ip,
dst_ip)
# TCP (SYN-ACK simulé)
flags_psh_ack = 0x5018 # data_offset=5, PSH+ACK
tcp_hdr = struct.pack('!HHLLHH',
54321, # sport
80, # dport
1000, # seq
0, # ack
flags_psh_ack,
65535) # window
tcp_hdr += b'\x00' * 6 # checksum + urgent pointer
return eth_hdr + ip_hdr + tcp_hdr + payload_tcp
paquet = construire_paquet_test()
print(f"Paquet synthétique : {len(paquet)} octets\n")
eth = parse_ethernet(paquet)
print("=== En-tête Ethernet ===")
for k, v in eth.items():
print(f" {k:<15} : {v}")
ip = parse_ip(paquet[14:])
print("\n=== En-tête IP ===")
for k, v in ip.items():
print(f" {k:<15} : {v}")
tcp = parse_tcp(paquet[14 + ip['ihl']:])
print("\n=== En-tête TCP ===")
for k, v in tcp.items():
print(f" {k:<15} : {v}")
payload_offset = 14 + ip['ihl'] + tcp['hdr_size']
payload = paquet[payload_offset:]
print(f"\n=== Payload applicatif ({len(payload)} octets) ===")
print(f" {payload.decode('ascii', errors='replace')!r}")
Paquet synthétique : 93 octets
=== En-tête Ethernet ===
mac_dst : aa:bb:cc:dd:ee:02
mac_src : aa:bb:cc:dd:ee:01
ether_type : 0x800
protocole : IPv4
=== En-tête IP ===
version : 4
ihl : 20
total_len : 77
ttl : 64
proto : 6
proto_nom : TCP
src : 10.0.0.1
dst : 93.184.216.34
---------------------------------------------------------------------------
error Traceback (most recent call last)
Cell In[3], line 130
127 for k, v in ip.items():
128 print(f" {k:<15} : {v}")
--> 130 tcp = parse_tcp(paquet[14 + ip['ihl']:])
131 print("\n=== En-tête TCP ===")
132 for k, v in tcp.items():
Cell In[3], line 54, in parse_tcp(données)
51 if len(données) < 20:
52 raise ValueError("Données trop courtes pour un en-tête TCP")
---> 54 sport, dport, seq, ack, do_flags, window = struct.unpack('!HHLLHH', données[:14])
56 data_offset = ((do_flags >> 12) & 0xF) * 4
57 flags_val = do_flags & 0x1FF
error: unpack requires a buffer of 16 bytes
Forge de paquets avec Scapy (illustratif)#
Voici comment Scapy s’utilise concrètement. Ces extraits sont illustratifs — ils supposent que Scapy est installé et que les privilèges nécessaires sont disponibles.
# Code illustratif Scapy — sans exécution réelle de send/sniff
code_scapy = """
from scapy.all import Ether, IP, TCP, UDP, ICMP, DNS, DNSQR, Raw, hexdump
# 1. Construire un paquet TCP SYN
paquet = IP(dst='192.168.1.1') / TCP(dport=80, flags='S')
paquet.show() # affiche tous les champs
hexdump(paquet) # affiche le hexdump annoté
# 2. Paquet ICMP Echo Request (ping)
ping = IP(dst='8.8.8.8') / ICMP()
ping.show2() # show2() calcule les checksums
# 3. Requête DNS forgée
dns_q = IP(dst='8.8.8.8') / UDP(dport=53) / DNS(rd=1, qd=DNSQR(qname='example.com'))
# 4. Inspecter un champ spécifique
print(paquet[TCP].flags)
print(paquet[IP].ttl)
print(paquet[TCP].dport)
# 5. Lire un fichier pcap
from scapy.all import rdpcap
paquets = rdpcap('capture.pcap')
for p in paquets[:5]:
if IP in p:
print(p[IP].src, '->', p[IP].dst)
# 6. Écrire un fichier pcap
from scapy.all import wrpcap
wrpcap('sortie.pcap', [paquet, ping, dns_q])
"""
print("Exemples d'utilisation de Scapy (illustratif) :")
print("=" * 55)
print(code_scapy)
Inspecter les champs d’un paquet Scapy#
Scapy affiche la décomposition couche par couche avec la méthode .show() :
>>> paquet = IP(dst='192.168.1.1') / TCP(dport=443, flags='S')
>>> paquet.show()
###[ IP ]###
version = 4
ihl = None
tos = 0x0
len = None
id = 1
flags =
frag = 0
ttl = 64
proto = tcp
chksum = None
src = 192.168.1.2 ← rempli automatiquement
dst = 192.168.1.1
###[ TCP ]###
sport = ftp_data
dport = https
seq = 0
ack = 0
dataofs = None
reserved = 0
flags = S
window = 8192
chksum = None
urgptr = 0
Format PCAP#
Les fichiers .pcap sont le format standard de capture de paquets, produit par libpcap (utilisé par Wireshark, tcpdump, tshark).
Structure d’un fichier PCAP#
fig, ax = plt.subplots(figsize=(12, 5))
ax.set_xlim(0, 12)
ax.set_ylim(0, 5)
ax.axis('off')
ax.set_title("Structure d'un fichier PCAP", fontsize=13, fontweight='bold', pad=12)
blocs = [
(0, 2.2, "Global Header\n(24 octets)", "#4575b4",
"Magic: 0xa1b2c3d4\nVersion maj/min\nSnaplen, DLT"),
(2.5, 1.8, "Packet Header\n(16 octets)", "#1a9850",
"ts_sec, ts_usec\ncaplen, origlen"),
(4.5, 1.8, "Packet Data\n(caplen octets)", "#d73027",
"Trame brute\n(Ethernet…)"),
(6.5, 1.8, "Packet Header\n(16 octets)", "#1a9850",
"ts_sec, ts_usec\ncaplen, origlen"),
(8.5, 1.8, "Packet Data\n(caplen octets)", "#d73027",
"Trame brute\n(Ethernet…)"),
(10.5, 1.5, "…", "#888888", ""),
]
for x, w, label, col, desc in blocs:
ax.add_patch(mpatches.FancyBboxPatch((x+0.05, 1.5), w-0.1, 2,
boxstyle="round,pad=0.1", facecolor=col, alpha=0.2,
edgecolor=col, linewidth=2))
ax.text(x + w/2, 3.05, label, ha='center', va='center',
fontsize=9, fontweight='bold', color=col)
ax.text(x + w/2, 2.0, desc, ha='center', va='center',
fontsize=7.5, color='#444444', style='italic')
ax.text(6, 1.0,
"Global Header : une seule fois au début du fichier\n"
"Packet Header + Packet Data : répétés pour chaque paquet capturé",
ha='center', va='center', fontsize=9, color='#333333')
plt.tight_layout()
plt.savefig('_static/pcap_structure.png', dpi=100, bbox_inches='tight')
plt.show()
import struct
import io
# Magic numbers PCAP
PCAP_MAGIC_LE = 0xa1b2c3d4 # little-endian timestamps en microsecondes
PCAP_MAGIC_LE_NS = 0xa1b23c4d # little-endian timestamps en nanosecondes
def lire_pcap_header(données: bytes) -> dict:
"""
Lit le Global Header d'un fichier PCAP.
Format : magic(4) version_major(2) version_minor(2)
thiszone(4) sigfigs(4) snaplen(4) network(4)
"""
if len(données) < 24:
raise ValueError("Fichier PCAP trop court")
magic = struct.unpack('<I', données[0:4])[0]
if magic == PCAP_MAGIC_LE:
endian = '<'
elif struct.unpack('>I', données[0:4])[0] == PCAP_MAGIC_LE:
endian = '>'
else:
raise ValueError(f"Magic number inconnu : {hex(magic)}")
maj, minor, tz, sigfigs, snaplen, network = struct.unpack(
endian + 'HHiIII', données[4:24])
dlt_noms = {1: 'Ethernet', 105: 'IEEE 802.11 WiFi', 113: 'Linux cooked',
127: 'IEEE 802.11 Radiotap', 228: 'IPv4 raw'}
return {
'magic': hex(magic),
'endian': endian,
'version': f"{maj}.{minor}",
'snaplen': snaplen,
'network': network,
'dlt_nom': dlt_noms.get(network, f"DLT #{network}"),
}
def lire_paquets_pcap(données: bytes) -> list[dict]:
"""
Lit les paquets d'un fichier PCAP.
Retourne une liste de dicts avec ts, caplen, origlen, données.
"""
hdr = lire_pcap_header(données)
endian = hdr['endian']
offset = 24
paquets = []
while offset + 16 <= len(données):
ts_sec, ts_usec, caplen, origlen = struct.unpack(
endian + 'IIII', données[offset:offset+16])
offset += 16
if offset + caplen > len(données):
break
raw = données[offset:offset+caplen]
paquets.append({
'ts': ts_sec + ts_usec / 1e6,
'caplen': caplen,
'origlen': origlen,
'raw': raw,
})
offset += caplen
return paquets
def créer_pcap_minimal() -> bytes:
"""Crée un fichier PCAP minimal en mémoire avec deux paquets synthétiques."""
# Global header
global_hdr = struct.pack('<IHHiIII',
0xa1b2c3d4, # magic
2, 4, # version 2.4
0, # timezone
0, # sigfigs
65535, # snaplen
1) # DLT_EN10MB (Ethernet)
def paquet_pcap(données: bytes, ts: float) -> bytes:
ts_sec = int(ts)
ts_usec = int((ts - ts_sec) * 1e6)
hdr = struct.pack('<IIII', ts_sec, ts_usec, len(données), len(données))
return hdr + données
# Deux trames Ethernet synthétiques
trame1 = construire_paquet_test()
trame2 = construire_paquet_test()
# On simule un second paquet avec des IPs différentes
trame2 = trame2[:26] + socket.inet_aton('10.0.0.2') + trame2[30:]
return global_hdr + paquet_pcap(trame1, 1_700_000_000.123456) \
+ paquet_pcap(trame2, 1_700_000_000.246912)
# Démonstration
pcap_data = créer_pcap_minimal()
print(f"Fichier PCAP synthétique : {len(pcap_data)} octets")
hdr = lire_pcap_header(pcap_data)
print("\nGlobal Header :")
for k, v in hdr.items():
print(f" {k:<12} : {v}")
paquets = lire_paquets_pcap(pcap_data)
print(f"\n{len(paquets)} paquets lus :")
for i, pkt in enumerate(paquets):
ip = parse_ip(pkt['raw'][14:])
tcp = parse_tcp(pkt['raw'][14 + ip['ihl']:])
print(f" Paquet {i+1} : ts={pkt['ts']:.6f} "
f"{ip['src']}:{tcp['sport']} → {ip['dst']}:{tcp['dport']} "
f"flags={tcp['flags']} {pkt['caplen']}o")
Décodage manuel de protocoles applicatifs#
Décodage d’une requête DNS#
import struct
def parse_dns_question(données: bytes, offset: int) -> tuple[str, int, int, int]:
"""
Décode une question DNS à partir de 'offset'.
Retourne (nom, qtype, qclass, nouvel_offset).
"""
parties = []
while offset < len(données):
longueur = données[offset]
if longueur == 0:
offset += 1
break
# Compression DNS (pointeur)
if longueur & 0xC0 == 0xC0:
ptr = struct.unpack('!H', données[offset:offset+2])[0] & 0x3FFF
partie, _, _, _ = parse_dns_question(données, ptr)
parties.append(partie)
offset += 2
break
offset += 1
parties.append(données[offset:offset+longueur].decode('ascii', errors='replace'))
offset += longueur
nom = '.'.join(parties)
qtype, qclass = struct.unpack('!HH', données[offset:offset+4])
return nom, qtype, qclass, offset + 4
def parse_dns(données: bytes) -> dict:
"""Décode un message DNS (en-tête + questions)."""
if len(données) < 12:
raise ValueError("Message DNS trop court")
tx_id, flags, qdcount, ancount, nscount, arcount = struct.unpack(
'!HHHHHH', données[:12])
qr = (flags >> 15) & 1
opcode = (flags >> 11) & 0xF
aa = (flags >> 10) & 1
tc = (flags >> 9) & 1
rd = (flags >> 8) & 1
ra = (flags >> 7) & 1
rcode = flags & 0xF
questions = []
offset = 12
for _ in range(qdcount):
nom, qtype, qclass, offset = parse_dns_question(données, offset)
questions.append({
'nom': nom,
'type': {1:'A', 28:'AAAA', 5:'CNAME', 15:'MX', 2:'NS', 16:'TXT'}.get(qtype, str(qtype)),
'classe': 'IN' if qclass == 1 else str(qclass),
})
return {
'tx_id': hex(tx_id),
'type': 'Réponse' if qr else 'Requête',
'opcode': opcode,
'rd': bool(rd),
'rcode': rcode,
'questions': questions,
'qdcount': qdcount,
'ancount': ancount,
}
# Forge d'une requête DNS pour example.com de type A
def forge_dns_query(nom: str, tx_id: int = 0xABCD) -> bytes:
"""Forge une requête DNS (question uniquement)."""
# En-tête DNS : ID flags qdcount ancount nscount arcount
flags = 0x0100 # QR=0 (requête), RD=1 (récursion souhaitée)
en_tête = struct.pack('!HHHHHH', tx_id, flags, 1, 0, 0, 0)
# Encodage QNAME : chaque label précédé de sa longueur, terminé par 0x00
qname = b''
for partie in nom.split('.'):
partie_b = partie.encode('ascii')
qname += bytes([len(partie_b)]) + partie_b
qname += b'\x00'
# QTYPE=A (1), QCLASS=IN (1)
question = qname + struct.pack('!HH', 1, 1)
return en_tête + question
requête = forge_dns_query('example.com', tx_id=0x1234)
print(f"Requête DNS forgée : {len(requête)} octets")
print(f"Hexdump : {requête.hex(' ')}")
print()
résultat = parse_dns(requête)
print("Décodage :")
for k, v in résultat.items():
print(f" {k:<12} : {v}")
Décodage d’un TLS ClientHello#
def parse_tls_client_hello(données: bytes) -> dict:
"""
Décode partiellement un message TLS ClientHello.
Format : RecordLayer(5) + Handshake(4) + ClientHello(variable).
"""
résultat = {}
# TLS Record Layer (5 octets)
if len(données) < 5:
return {'erreur': 'Trop court'}
content_type = données[0]
version_maj, version_min = données[1], données[2]
record_len = struct.unpack('!H', données[3:5])[0]
versions_tls = {(3,1): 'TLS 1.0', (3,2): 'TLS 1.1',
(3,3): 'TLS 1.2', (3,4): 'TLS 1.3'}
résultat['record_type'] = {20:'ChangeCipherSpec', 21:'Alert',
22:'Handshake', 23:'ApplicationData'}.get(content_type)
résultat['tls_version'] = versions_tls.get((version_maj, version_min), f"{version_maj}.{version_min}")
résultat['record_length'] = record_len
if content_type != 22 or len(données) < 9:
return résultat
# Handshake header (4 octets)
hs_type = données[5]
hs_len = int.from_bytes(données[6:9], 'big')
résultat['handshake_type'] = {1:'ClientHello', 2:'ServerHello',
11:'Certificate', 12:'ServerKeyExchange',
14:'ServerHelloDone', 16:'ClientKeyExchange'}.get(hs_type)
if hs_type != 1 or len(données) < 43:
return résultat
# ClientHello
offset = 9
ch_ver_maj, ch_ver_min = données[offset], données[offset+1]
résultat['client_version'] = versions_tls.get((ch_ver_maj, ch_ver_min), f"{ch_ver_maj}.{ch_ver_min}")
# Random (32 octets)
résultat['random'] = données[offset+2:offset+34].hex()
offset += 34
# Session ID
sid_len = données[offset]; offset += 1 + sid_len
# Cipher suites
if offset + 2 > len(données):
return résultat
cs_len = struct.unpack('!H', données[offset:offset+2])[0]; offset += 2
cipher_suites = []
for i in range(0, cs_len, 2):
if offset + i + 2 <= len(données):
cs = struct.unpack('!H', données[offset+i:offset+i+2])[0]
cipher_suites.append(hex(cs))
résultat['cipher_suites_count'] = cs_len // 2
résultat['cipher_suites_sample'] = cipher_suites[:5]
return résultat
# ClientHello synthétique (début de paquet TLS 1.2 réaliste)
client_hello = bytes.fromhex(
'160303' # TLS Record : Handshake, TLS 1.2
'00f1' # Record length
'01' # Handshake type : ClientHello
'0000ed' # Handshake length
'0303' # Client version TLS 1.2
+ 'aa' * 32 # Random (32 octets)
+ '00' # Session ID length = 0
'0020' # Cipher suites length = 32 octets (16 suites)
+ 'c02bc02cc02fc030' # ECDHE-RSA-AES avec SHA256/SHA384
+ '009c009d006b0067' # RSA + AES-GCM
+ '00ff' # TLS_EMPTY_RENEGOTIATION_INFO_SCSV
+ '0100' # Remplissage
'01' # Compression methods length
'00' # null compression
)
parsed = parse_tls_client_hello(client_hello)
print("TLS ClientHello décodé :")
print("=" * 40)
for k, v in parsed.items():
print(f" {k:<28} : {v}")
Wireshark et tcpdump#
Wireshark — filtres essentiels#
# Filtres de capture BPF (passés à tcpdump ou à la capture Wireshark)
tcp port 80 or tcp port 443 # HTTP et HTTPS
host 192.168.1.10 # tout le trafic d'un hôte
not arp and not broadcast # exclure l'ARP et le broadcast
udp port 53 # DNS uniquement
# Filtres d'affichage Wireshark (syntaxe différente du BPF)
http.request.method == "POST" # requêtes HTTP POST
tls.handshake.type == 1 # TLS ClientHello
tcp.flags.syn == 1 && tcp.flags.ack == 0 # SYN pur (début de connexion)
dns.qry.name contains "google" # requêtes DNS vers Google
ip.addr == 192.168.1.10 # trafic d'un hôte
frame.len > 1400 # gros paquets (près du MTU)
tcp.analysis.retransmission # retransmissions TCP
# Suivre un flux TCP dans Wireshark
# Clic droit sur un paquet → Follow → TCP Stream
# Affiche la conversation HTTP/SMTP/POP3 reconstituée en clair
# Filtrer une conversation spécifique
tcp.stream eq 0 # premier flux TCP de la capture
tcpdump — exemples de commandes#
# Capturer sur l'interface eth0, 100 paquets, sauvegarder dans un fichier
tcpdump -i eth0 -c 100 -w capture.pcap
# Afficher le trafic HTTP (ports 80 et 8080) avec contenu ASCII
tcpdump -i eth0 -A 'tcp port 80 or tcp port 8080'
# Capturer uniquement les paquets DNS
tcpdump -i eth0 -n udp port 53
# Capturer le trafic entre deux hôtes
tcpdump -i eth0 'host 192.168.1.10 and host 192.168.1.1'
# Afficher les drapeaux TCP (SYN, FIN, RST)
tcpdump -i eth0 'tcp[tcpflags] & (tcp-syn|tcp-fin) != 0'
# Rotation de fichiers : 1 fichier par minute, 10 fichiers max
tcpdump -i eth0 -w capture_%Y%m%d_%H%M%S.pcap -G 60 -W 10
Statistiques de protocoles — visualisation#
# Simulation de statistiques de protocoles dans une capture réseau
np.random.seed(42)
protocoles = ['TCP/HTTP', 'TCP/HTTPS', 'UDP/DNS', 'UDP/QUIC', 'TCP/SSH',
'UDP/NTP', 'ICMP', 'TCP/SMTP', 'Autres']
paquets = [1820, 4350, 892, 1240, 310, 95, 180, 85, 228]
octets_mb = [p * np.random.uniform(0.3, 1.8) / 100 for p in paquets]
fig, axes = plt.subplots(1, 2, figsize=(13, 5))
# Graphe en camembert — nombre de paquets
couleurs_proto = sns.color_palette('muted', len(protocoles))
wedges, texts, autotexts = axes[0].pie(
paquets, labels=protocoles, autopct='%1.1f%%',
colors=couleurs_proto, startangle=90,
pctdistance=0.82, wedgeprops=dict(edgecolor='white', linewidth=1.5))
for text in autotexts:
text.set_fontsize(8)
axes[0].set_title("Répartition par protocole\n(nombre de paquets)", fontsize=12, fontweight='bold')
# Graphe barres — volume en Mo
colors_bar = couleurs_proto
bars = axes[1].barh(protocoles, octets_mb, color=couleurs_proto, edgecolor='white')
axes[1].set_xlabel("Volume capturé (Mo)", fontsize=11)
axes[1].set_title("Volume de données par protocole", fontsize=12, fontweight='bold')
for bar, val in zip(bars, octets_mb):
axes[1].text(val + 0.02, bar.get_y() + bar.get_height()/2,
f"{val:.2f} Mo", va='center', fontsize=8.5)
plt.tight_layout()
plt.savefig('_static/proto_stats.png', dpi=100, bbox_inches='tight')
plt.show()
# Reconstruction d'une timeline de paquets depuis une capture simulée
np.random.seed(7)
n = 300
timestamps = np.sort(np.random.exponential(0.02, n).cumsum())
tailles = np.random.choice([64, 128, 512, 1024, 1460], n,
p=[0.25, 0.20, 0.25, 0.15, 0.15])
protos_sim = np.random.choice(['HTTPS', 'HTTP', 'DNS', 'QUIC', 'SSH'],
n, p=[0.45, 0.15, 0.20, 0.15, 0.05])
colors_map = {'HTTPS': '#4575b4', 'HTTP': '#74add1', 'DNS': '#1a9850',
'QUIC': '#d73027', 'SSH': '#984ea3'}
colors_pts = [colors_map[p] for p in protos_sim]
fig, axes = plt.subplots(2, 1, figsize=(12, 7), sharex=True)
axes[0].scatter(timestamps, tailles, c=colors_pts, alpha=0.6, s=18)
axes[0].set_ylabel("Taille du paquet (octets)", fontsize=11)
axes[0].set_title("Analyse temporelle d'une capture réseau simulée", fontsize=13, fontweight='bold')
for proto, col in colors_map.items():
axes[0].scatter([], [], c=col, label=proto, s=30)
axes[0].legend(loc='upper right', fontsize=9, ncol=5)
# Débit lissé (octets par fenêtre de 0.1s)
fenêtre = 0.1
temps_max = timestamps[-1]
bins = np.arange(0, temps_max + fenêtre, fenêtre)
débit, _ = np.histogram(timestamps, bins=bins, weights=tailles)
t_centres = (bins[:-1] + bins[1:]) / 2
axes[1].fill_between(t_centres, débit / fenêtre / 1000, alpha=0.5, color='#4575b4')
axes[1].plot(t_centres, débit / fenêtre / 1000, color='#2c7bb6', linewidth=1.2)
axes[1].set_xlabel("Temps (secondes)", fontsize=11)
axes[1].set_ylabel("Débit (ko/s)", fontsize=11)
axes[1].set_title("Débit instantané lissé", fontsize=12, fontweight='bold')
plt.tight_layout()
plt.savefig('_static/capture_timeline.png', dpi=100, bbox_inches='tight')
plt.show()
Résumé#
Points clés du chapitre
Scapy permet de forger et de disséquer des paquets à n’importe quelle couche ; la syntaxe
Ether()/IP()/TCP()reflète exactement l’empilement des protocoles.Le module
structde Python permet de décoder des octets bruts selon un format précis :!pour big-endian (réseau),Bpour un octet non signé,Hpour 2 octets,Ipour 4 octets,4spour 4 octets de chaîne.Le format PCAP est simple : un en-tête global suivi de blocs (header de paquet + données brutes).
DNS encode les noms sous forme de labels précédés de leur longueur, terminés par
\x00.Wireshark distingue les filtres BPF (capture) des filtres d’affichage ;
Follow TCP Streamreconstruit une conversation complète.tcpdump avec
-wpermet de capturer vers un fichier analysable ultérieurement par Wireshark.