SIEM et threat hunting#
Un SIEM (Security Information and Event Management) est la colonne vertébrale du centre des opérations de sécurité (SOC). Il centralise les événements de sécurité provenant de l’ensemble de l’infrastructure, les corrèle selon des règles de détection, et produit des alertes actionnables pour les analystes.
Le threat hunting est l’activité complémentaire : là où le SIEM réagit à des patterns connus, le threat hunting est proactif — un analyste formule une hypothèse sur un comportement malveillant potentiellement non détecté et parcourt les données pour la confirmer ou l’infirmer.
Architecture SIEM#
Pipeline de traitement#
Un SIEM moderne traite les événements en cinq étapes :
1. Collecte : les événements proviennent de sources hétérogènes via plusieurs mécanismes :
Agents : logiciels déployés sur les endpoints (Winlogbeat, Filebeat, Wazuh agent) qui lisent les logs locaux et les transmettent
Syslog : protocole UDP/TCP/TLS standard pour les équipements réseau (pare-feux, switches, routeurs)
API polling : interrogation périodique des APIs cloud (AWS CloudTrail, Azure AD Sign-in logs, Google Workspace)
Direct database : connexion directe aux bases de données pour les logs applicatifs
2. Normalisation : les événements sont transformés dans un schéma unifié pour permettre la corrélation cross-source. Deux standards dominent :
ECS (Elastic Common Schema) : utilisé par la stack Elastic, définit des champs standardisés (
event.action,source.ip,user.name,process.name)CEF (Common Event Format) : format ArcSight, largement supporté par les équipements de sécurité
3. Corrélation : les règles de corrélation agrègent des événements individuels pour détecter des patterns d’attaque — par exemple, 5 échecs d’authentification suivis d’un succès depuis la même IP en moins de 60 secondes.
4. Alerting : les règles déclenchées génèrent des alertes avec un niveau de sévérité, un score de confiance, et des informations contextuelles enrichies (réputation IP, géolocalisation, threat intel).
5. Rétention : les logs bruts sont conservés pour les investigations forensiques. Les durées légales varient (RGPD, NIS2) : typiquement 90 jours à 1 an en hot storage, 3–7 ans en cold storage.
Elastic SIEM#
La stack Elastic (anciennement ELK) est devenue une référence open-source pour les SIEM. Son architecture comprend :
Elasticsearch : moteur de stockage et d’indexation distribué, optimisé pour les recherches full-text et les agrégations temporelles
Kibana : interface de visualisation, tableau de bord, et module SIEM (Security)
Logstash / Beats : pipeline d’ingestion et agents de collecte
Fleet : gestion centralisée des agents Elastic
Types de règles Elastic SIEM :
KQL (Kibana Query Language) : règles basées sur des requêtes de filtrage simples
EQL (Event Query Language) : règles de séquences d’événements avec contexte temporel
ML (Machine Learning) : détection d’anomalies par modèles statistiques (jobs ML Elastic)
Threshold : déclenchement après N occurrences dans une fenêtre temporelle
Splunk#
Splunk est le SIEM commercial le plus déployé en entreprise. Son langage de requête SPL (Search Processing Language) est expressif et puissant.
Requêtes SPL — Exemples (non exécutables)
Ces requêtes SPL sont illustratives. Elles nécessitent une instance Splunk configurée avec les sources de données appropriées.
| Détection de brute-force SSH
index=os_linux sourcetype=syslog "Failed password"
| stats count as tentatives by src_ip, user
| where tentatives > 10
| sort -tentatives
| Détection d'exfiltration DNS (volume anormal)
index=network sourcetype=dns
| bucket _time span=1h
| stats count as nb_requetes, dc(query) as domaines_uniques by src_ip, _time
| where nb_requetes > 1000 AND domaines_uniques > 500
| eval score_suspicion = nb_requetes / domaines_uniques
| sort -score_suspicion
| Corrélation : utilisateur se connectant depuis plusieurs pays en < 1h
index=auth sourcetype=azure_ad action=success
| transaction user maxspan=1h
| where mvcount(src_country) > 1
| table user, src_country, _time, src_ip
### Wazuh
**Wazuh** est un SIEM/XDR open-source basé sur OSSEC. Son architecture est composée d'un **manager** central et d'**agents** déployés sur les endpoints.
```{admonition} Configuration Wazuh — Decoder et règle XML (non exécutables)
:class: warning
Exemple de decoder Wazuh pour parser des logs applicatifs personnalisés et de règle de détection associée.
```xml
<!-- Decoder pour logs applicatifs custom -->
<decoder name="app_custom">
<prematch>^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}</prematch>
<regex>^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) (\w+) (\d+\.\d+\.\d+\.\d+) (\w+) (\S+)</regex>
<order>timestamp, action, srcip, user, url</order>
</decoder>
<!-- Règle de détection brute-force (10 échecs en 2 minutes) -->
<rule id="100200" level="10" frequency="10" timeframe="120">
<if_matched_sid>100100</if_matched_sid>
<description>Brute-force détecté : 10 échecs en 2 minutes</description>
<group>authentication_failures,brute_force,</group>
<mitre>
<id>T1110</id>
</mitre>
</rule>
## Sigma Rules
**Sigma** est un langage générique de règles de détection pour SIEM. Une règle Sigma décrit un pattern de détection indépendamment du SIEM cible ; l'outil **pySigma** (ou l'ancien `sigmac`) la convertit vers SPL, KQL, EQL, ou d'autres langages.
```{admonition} Règles Sigma — Exemples YAML (non exécutables)
:class: warning
```yaml
title: Mimikatz via Command Line
id: 06d71506-7beb-4f22-8888-e2e5e2ca7fd8
status: stable
description: Détecte l'exécution de Mimikatz via des arguments de ligne de commande connus
references:
- https://attack.mitre.org/software/S0002/
author: Exemple pédagogique
date: 2024-01-01
tags:
- attack.credential_access
- attack.t1003.001
logsource:
category: process_creation
product: windows
detection:
selection:
CommandLine|contains:
- 'sekurlsa::logonpasswords'
- 'lsadump::dcsync'
- 'privilege::debug'
- 'kerberos::list'
condition: selection
falsepositives:
- Activité de test de sécurité autorisée
level: critical
---
title: DNS Tunneling Suspect
id: a1b2c3d4-e5f6-7890-abcd-ef1234567890
status: experimental
logsource:
category: dns
detection:
selection:
query_length|gt: 50
query|contains:
- '.dnscat.'
- '.iodine.'
filter:
query|endswith:
- '.microsoft.com'
- '.windows.com'
condition: selection and not filter
level: high
```{code-cell} python
# Moteur Sigma simplifié : évaluation de règles sur des logs synthétiques
@staticmethod
def correspondance_field(valeur, condition):
"""Vérifie si une valeur de log satisfait une condition Sigma."""
if isinstance(condition, list):
return any(correspondance_field_simple(valeur, c) for c in condition)
return correspondance_field_simple(valeur, condition)
def correspondance_field_simple(valeur, condition):
if valeur is None:
return False
valeur_str = str(valeur).lower()
if isinstance(condition, str):
return condition.lower() in valeur_str
return False
class RegleSigma:
"""Implémentation simplifiée d'une règle Sigma."""
def __init__(self, titre, niveau, detection_fields, condition_fn, description=""):
self.titre = titre
self.niveau = niveau
self.detection_fields = detection_fields # {champ: valeurs_attendues}
self.condition_fn = condition_fn # callable(matchs: dict) -> bool
self.description = description
def evaluer(self, event: dict) -> bool:
matchs = {}
for champ, attendus in self.detection_fields.items():
valeur_event = event.get(champ, "")
if isinstance(attendus, list):
matchs[champ] = any(
str(a).lower() in str(valeur_event).lower() for a in attendus
)
elif isinstance(attendus, dict) and "gt" in attendus:
try:
matchs[champ] = float(valeur_event) > float(attendus["gt"])
except (ValueError, TypeError):
matchs[champ] = False
else:
matchs[champ] = str(attendus).lower() in str(valeur_event).lower()
return self.condition_fn(matchs)
# Définition des règles Sigma
regles_sigma = [
RegleSigma(
titre="Mimikatz via Command Line",
niveau="critical",
detection_fields={
"CommandLine": ["sekurlsa::logonpasswords", "lsadump::dcsync",
"privilege::debug", "kerberos::list"],
},
condition_fn=lambda m: m.get("CommandLine", False),
description="Exécution de Mimikatz détectée via arguments CLI",
),
RegleSigma(
titre="DNS Tunneling Suspect",
niveau="high",
detection_fields={
"dns_query": [".dnscat.", ".iodine.", "aaaaa", "bbbbbbbb"],
"query_length": {"gt": 40},
},
condition_fn=lambda m: m.get("dns_query", False) or m.get("query_length", False),
description="Requête DNS anormalement longue ou avec pattern de tunneling",
),
]
# Logs synthétiques à analyser
random.seed(42)
logs_synthetiques = [
{"timestamp": "2024-03-15T08:12:00Z", "host": "WIN-DC01", "user": "SYSTEM",
"process": "powershell.exe",
"CommandLine": "powershell -ep bypass Invoke-WebRequest http://legit.corp/script.ps1",
"dns_query": "corp.alkimya.fr", "query_length": 18},
{"timestamp": "2024-03-15T09:34:21Z", "host": "WIN-WS03", "user": "bob",
"process": "mimikatz.exe",
"CommandLine": "mimikatz.exe sekurlsa::logonpasswords exit",
"dns_query": "outlook.microsoft.com", "query_length": 25},
{"timestamp": "2024-03-15T10:01:55Z", "host": "WIN-WS07", "user": "alice",
"process": "nslookup.exe",
"CommandLine": "nslookup -type=TXT alkimya.fr",
"dns_query": "aHR0cHM6Ly9tYWxpY2lvdXMuZXhhbXBsZS5jb20=.aaaaaaa.dnscat.evil.com",
"query_length": 68},
{"timestamp": "2024-03-15T10:45:00Z", "host": "WIN-DC01", "user": "admin",
"process": "powershell.exe",
"CommandLine": "powershell lsadump::dcsync /domain:alkimya.fr /user:krbtgt",
"dns_query": "dc1.alkimya.fr", "query_length": 16},
{"timestamp": "2024-03-15T11:20:33Z", "host": "LIN-SRV02", "user": "www-data",
"process": "python3",
"CommandLine": "python3 /var/www/html/shell.py --port 4444",
"dns_query": "cdn.cloudflare.net", "query_length": 20},
]
print("=" * 65)
print(" MOTEUR SIGMA SIMPLIFIÉ — ANALYSE DE LOGS")
print("=" * 65)
resultats_sigma = []
for log in logs_synthetiques:
ts = log["timestamp"]
host = log.get("host", "?")
alertes_log = []
for regle in regles_sigma:
if regle.evaluer(log):
alertes_log.append(regle.titre)
print(f"\n[ALERTE] {regle.niveau.upper()} — {regle.titre}")
print(f" Hôte : {host} | Timestamp : {ts}")
print(f" Description : {regle.description}")
if log.get("CommandLine"):
print(f" CommandLine : {log['CommandLine']}")
if log.get("dns_query") and len(log.get("dns_query", "")) > 30:
print(f" DNS query : {log['dns_query'][:60]}...")
resultats_sigma.append({
"timestamp": ts,
"host": host,
"regle": regle.titre,
"niveau": regle.niveau,
"alerte": regle.evaluer(log),
})
total_alertes = sum(1 for r in resultats_sigma if r["alerte"])
print(f"\nTotal : {total_alertes} alerte(s) sur {len(logs_synthetiques)} événements analysés")
Pipeline SIEM — Simulation complète#
# Simulation d'un pipeline SIEM : ingestion → corrélation → alertes
random.seed(2024)
def generer_logs_ecs(n=200):
"""Génère des logs synthétiques au format ECS simplifié."""
ips_internes = [f"10.0.{random.randint(1,5)}.{random.randint(10,250)}" for _ in range(8)]
ips_externes = [f"185.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}" for _ in range(5)]
utilisateurs = ["alice", "bob", "carol", "admin", "svc_backup", "svc_web"]
domaines_dns = (
[f"normal{i}.alkimya.fr" for i in range(20)] +
[f"a{'b'*random.randint(20, 45)}.evil{i}.io" for i in range(10)]
)
logs = []
t_base = datetime(2024, 3, 15, 7, 0, 0)
for i in range(n):
t = t_base + timedelta(seconds=random.randint(0, 28800))
event_type = random.choices(
["auth_fail", "auth_success", "dns_query", "http_request", "process_create"],
weights=[0.25, 0.20, 0.30, 0.15, 0.10]
)[0]
base = {
"@timestamp": t.isoformat() + "Z",
"event.category": event_type,
"host.name": random.choice([f"WIN-WS{i:02d}" for i in range(1, 6)] +
[f"LIN-SRV{i:02d}" for i in range(1, 4)]),
"source.ip": random.choice(ips_internes + ips_externes),
"user.name": random.choice(utilisateurs),
}
if event_type == "auth_fail":
base["event.outcome"] = "failure"
base["event.action"] = "logon_failed"
elif event_type == "auth_success":
base["event.outcome"] = "success"
base["event.action"] = "logon_success"
elif event_type == "dns_query":
base["dns.question.name"] = random.choice(domaines_dns)
base["dns.question.type"] = "A"
elif event_type == "http_request":
base["http.request.method"] = random.choice(["GET", "POST", "PUT"])
base["url.path"] = random.choice(["/api/v1/data", "/admin", "/login", "/export"])
base["http.response.status_code"] = random.choice([200, 200, 200, 403, 404, 500])
elif event_type == "process_create":
base["process.name"] = random.choice([
"powershell.exe", "cmd.exe", "python3", "bash",
"mimikatz.exe", "nmap", "nc"
])
logs.append(base)
return sorted(logs, key=lambda x: x["@timestamp"])
def parser_timestamp(ts_str):
return datetime.fromisoformat(ts_str.replace("Z", ""))
# --- Règles de corrélation ---
def regle_brute_force(logs: list, fenetre_secondes=300, seuil=5) -> list:
"""Détecte les tentatives de brute-force : N échecs d'auth depuis la même IP."""
echecs_par_ip = defaultdict(list)
for log in logs:
if log.get("event.category") == "auth_fail":
ip = log.get("source.ip", "")
t = parser_timestamp(log["@timestamp"])
echecs_par_ip[ip].append(t)
alertes = []
for ip, timestamps in echecs_par_ip.items():
timestamps.sort()
for i in range(len(timestamps)):
fenetre = [t for t in timestamps[i:] if (t - timestamps[i]).seconds <= fenetre_secondes]
if len(fenetre) >= seuil:
alertes.append({
"regle": "Brute-Force Auth",
"severite": "High",
"source.ip": ip,
"timestamp": timestamps[i].isoformat(),
"details": f"{len(fenetre)} échecs en {fenetre_secondes}s",
})
break
return alertes
def regle_exfiltration_dns(logs: list, seuil_longueur=35) -> list:
"""Détecte les requêtes DNS avec noms de domaine anormalement longs (tunneling)."""
alertes = []
vus = set()
for log in logs:
if log.get("event.category") == "dns_query":
domaine = log.get("dns.question.name", "")
if len(domaine) > seuil_longueur and domaine not in vus:
vus.add(domaine)
alertes.append({
"regle": "DNS Exfiltration Suspect",
"severite": "Medium",
"source.ip": log.get("source.ip"),
"timestamp": log["@timestamp"],
"details": f"Domaine de {len(domaine)} chars : {domaine[:50]}...",
})
return alertes
def regle_connexion_suspecte(logs: list) -> list:
"""Détecte les connexions réussies depuis des IP externes inattendues."""
alertes = []
ips_externes = {l["source.ip"] for l in logs if l["source.ip"].startswith("185.")}
for log in logs:
if (log.get("event.category") == "auth_success"
and log.get("source.ip") in ips_externes):
alertes.append({
"regle": "Connexion Externe Suspecte",
"severite": "High",
"source.ip": log["source.ip"],
"timestamp": log["@timestamp"],
"details": f"Authentification réussie depuis IP externe pour {log.get('user.name')}",
})
return alertes[:5] # Limiter le bruit
# --- Pipeline ---
logs = generer_logs_ecs(200)
print(f"Logs ingérés : {len(logs)}")
print(f"Période : {logs[0]['@timestamp']} → {logs[-1]['@timestamp']}")
print(f"Répartition par catégorie :")
comptage_cat = Counter(l["event.category"] for l in logs)
for cat, n in sorted(comptage_cat.items()):
print(f" {cat:<20} : {n}")
# Application des règles de corrélation
toutes_alertes = []
toutes_alertes.extend(regle_brute_force(logs))
toutes_alertes.extend(regle_exfiltration_dns(logs))
toutes_alertes.extend(regle_connexion_suspecte(logs))
print(f"\nAlertes générées par le SIEM : {len(toutes_alertes)}")
for alerte in toutes_alertes:
print(f"\n [{alerte['severite']}] {alerte['regle']}")
print(f" IP source : {alerte.get('source.ip', 'N/A')}")
print(f" Timestamp : {alerte['timestamp']}")
print(f" Détails : {alerte['details']}")
Logs ingérés : 200
Période : 2024-03-15T07:00:01Z → 2024-03-15T14:57:53Z
Répartition par catégorie :
auth_fail : 62
auth_success : 38
dns_query : 63
http_request : 23
process_create : 14
Alertes générées par le SIEM : 13
[Medium] DNS Exfiltration Suspect
IP source : 185.20.188.199
Timestamp : 2024-03-15T07:03:47Z
Détails : Domaine de 50 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil4.io...
[Medium] DNS Exfiltration Suspect
IP source : 10.0.2.237
Timestamp : 2024-03-15T07:17:10Z
Détails : Domaine de 54 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil...
[Medium] DNS Exfiltration Suspect
IP source : 10.0.4.144
Timestamp : 2024-03-15T08:00:15Z
Détails : Domaine de 53 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil1...
[Medium] DNS Exfiltration Suspect
IP source : 185.222.53.177
Timestamp : 2024-03-15T08:16:33Z
Détails : Domaine de 36 chars : abbbbbbbbbbbbbbbbbbbbbbbbbb.evil7.io...
[Medium] DNS Exfiltration Suspect
IP source : 10.0.4.144
Timestamp : 2024-03-15T10:17:10Z
Détails : Domaine de 44 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil2.io...
[Medium] DNS Exfiltration Suspect
IP source : 10.0.5.87
Timestamp : 2024-03-15T10:21:54Z
Détails : Domaine de 43 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil8.io...
[Medium] DNS Exfiltration Suspect
IP source : 10.0.4.144
Timestamp : 2024-03-15T10:43:19Z
Détails : Domaine de 52 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil3....
[Medium] DNS Exfiltration Suspect
IP source : 185.56.80.140
Timestamp : 2024-03-15T13:03:46Z
Détails : Domaine de 47 chars : abbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.evil6.io...
[High] Connexion Externe Suspecte
IP source : 185.222.53.177
Timestamp : 2024-03-15T07:29:03Z
Détails : Authentification réussie depuis IP externe pour carol
[High] Connexion Externe Suspecte
IP source : 185.20.188.199
Timestamp : 2024-03-15T07:43:53Z
Détails : Authentification réussie depuis IP externe pour admin
[High] Connexion Externe Suspecte
IP source : 185.222.53.177
Timestamp : 2024-03-15T08:02:52Z
Détails : Authentification réussie depuis IP externe pour svc_web
[High] Connexion Externe Suspecte
IP source : 185.187.158.247
Timestamp : 2024-03-15T08:05:07Z
Détails : Authentification réussie depuis IP externe pour svc_web
[High] Connexion Externe Suspecte
IP source : 185.222.53.177
Timestamp : 2024-03-15T09:19:23Z
Détails : Authentification réussie depuis IP externe pour admin
Threat hunting proactif#
Méthodologie#
Le threat hunting suit un cycle en cinq étapes :
Hypothèse : formuler une hypothèse précise basée sur les renseignements sur les menaces (TI), les TTPs ATT&CK, ou des anomalies observées. Ex. : « Un compte de service effectue des requêtes DNS vers des domaines nouvellement enregistrés en dehors des heures ouvrées. »
Données : identifier les sources de données nécessaires à la validation (logs EDR, DNS, authentification) et vérifier leur disponibilité et complétude.
Investigation : requêter les données, visualiser les distributions, rechercher des déviations par rapport à la baseline.
Validation : confirmer ou infirmer l’hypothèse. Si confirmée : escalader en incident. Si infirmée : documenter et améliorer la baseline.
Documentation : enrichir les playbooks, créer des règles SIEM/YARA/Sigma pour détecter automatiquement ce pattern à l’avenir.
Threat Intelligence#
STIX 2.1 (Structured Threat Information eXpression) est le format standard pour partager des renseignements sur les menaces. Il définit 18 types d’objets (STIX Domain Objects) :
indicator: pattern de détection (IP, hash, URL, expression STIX)malware: description d’un logiciel malveillantattack-pattern: TTP spécifique (lié à ATT&CK)campaign: série d’attaques liéesthreat-actor: groupe ou individu derrière les attaquescourse-of-action: contre-mesure recommandée
TAXII 2.1 (Trusted Automated eXchange of Intelligence Information) est le protocole HTTP de transport des objets STIX entre organisations.
MISP (Malware Information Sharing Platform) est la plateforme open-source de partage d’IoC la plus utilisée. Elle supporte STIX/TAXII et offre des fonctionnalités de corrélation automatique entre événements.
UEBA#
L”UEBA (User and Entity Behavior Analytics) applique des modèles statistiques et du machine learning pour détecter les déviations par rapport au comportement normal (baseline) des utilisateurs et des entités (serveurs, comptes de service).
Exemple de baseline utilisateur : connexion depuis Paris, heure 8h–19h, poste de travail habituel, accès aux ressources RH.
Signaux de déviation : connexion à 3h du matin, depuis un pays différent, accès à des ressources financières inhabituelles. Chaque déviation incrémente un score de risque ; au-delà d’un seuil, une alerte UEBA est générée.
# Visualisation threat hunting : timeline d'événements suspects + clustering d'IoC
random.seed(3333)
np.random.seed(3333)
# --- Partie 1 : Timeline d'événements suspects ---
categories_evt = ["Auth suspecte", "DNS anormal", "Proc. suspect", "Connexion C2", "Exfiltration"]
couleurs_evt = ["#e74c3c", "#f39c12", "#9b59b6", "#2980b9", "#27ae60"]
date_debut_hunt = datetime(2024, 3, 15, 0, 0, 0)
evenements_suspects = []
for _ in range(45):
t = date_debut_hunt + timedelta(hours=random.uniform(0, 24))
cat_idx = random.choices(range(5), weights=[0.3, 0.25, 0.2, 0.15, 0.1])[0]
evenements_suspects.append({
"timestamp": t,
"categorie": categories_evt[cat_idx],
"couleur": couleurs_evt[cat_idx],
"score": random.uniform(0.3, 1.0),
})
evenements_suspects.sort(key=lambda x: x["timestamp"])
# --- Partie 2 : Clustering d'IoC ---
categories_ioc = ["IP C2", "Domaine malveillant", "Hash malware", "URL phishing", "E-mail spoofé"]
couleurs_ioc = ["#c0392b", "#e67e22", "#8e44ad", "#16a085", "#2c3e50"]
ioc_data = []
for i, (cat, col) in enumerate(zip(categories_ioc, couleurs_ioc)):
n = random.randint(8, 20)
cx, cy = random.uniform(-3, 3), random.uniform(-3, 3)
for _ in range(n):
ioc_data.append({
"categorie": cat,
"couleur": col,
"x": cx + random.gauss(0, 0.8),
"y": cy + random.gauss(0, 0.8),
"score_risque": random.uniform(0.4, 1.0),
})
df_ioc = pd.DataFrame(ioc_data)
# --- Visualisation ---
sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
# Timeline
heures = [e["timestamp"].hour + e["timestamp"].minute / 60 for e in evenements_suspects]
scores = [e["score"] for e in evenements_suspects]
couleurs_pts = [e["couleur"] for e in evenements_suspects]
axes[0].scatter(heures, scores, c=couleurs_pts, s=80, alpha=0.8, edgecolors="white", linewidth=0.5)
axes[0].set_xlabel("Heure de la journée (H)")
axes[0].set_ylabel("Score de suspicion")
axes[0].set_title("Timeline d'événements suspects\n(threat hunting — 24h)", fontsize=11, pad=10)
axes[0].set_xlim(0, 24)
axes[0].set_xticks(range(0, 25, 3))
axes[0].axhline(0.75, color="#e74c3c", linestyle="--", alpha=0.7, linewidth=1.2)
axes[0].text(0.5, 0.77, "Seuil d'alerte (0.75)", color="#e74c3c", fontsize=8, transform=axes[0].get_yaxis_transform())
legende_evt = [mpatches.Patch(color=c, label=cat) for cat, c in zip(categories_evt, couleurs_evt)]
axes[0].legend(handles=legende_evt, fontsize=8, loc="upper left")
# Clustering IoC
for cat, col in zip(categories_ioc, couleurs_ioc):
sous_df = df_ioc[df_ioc["categorie"] == cat]
axes[1].scatter(sous_df["x"], sous_df["y"],
c=col, s=sous_df["score_risque"] * 120,
alpha=0.75, edgecolors="white", linewidth=0.5, label=cat)
axes[1].set_title("Clustering d'IoC par catégorie\n(threat intelligence)", fontsize=11, pad=10)
axes[1].set_xlabel("Dimension analytique 1")
axes[1].set_ylabel("Dimension analytique 2")
axes[1].legend(fontsize=8, loc="upper right", markerscale=0.8)
plt.suptitle("Threat Hunting — Visualisation des menaces détectées", fontsize=13, y=1.01)
plt.show()
print(f"\nStatistiques threat hunting :")
print(f" Événements analysés : {len(evenements_suspects)}")
print(f" Événements > seuil (0.75) : {sum(1 for e in evenements_suspects if e['score'] > 0.75)}")
print(f" IoC identifiés : {len(df_ioc)}")
print(f" Catégories d'IoC : {df_ioc['categorie'].nunique()}")
print(f" Score risque moyen : {df_ioc['score_risque'].mean():.2f}")
Statistiques threat hunting :
Événements analysés : 45
Événements > seuil (0.75) : 20
IoC identifiés : 79
Catégories d'IoC : 5
Score risque moyen : 0.68
Résumé#
Architecture SIEM : le pipeline collecte → normalisation (ECS/CEF) → corrélation → alerting → rétention centralise les événements de l’infrastructure pour détecter des patterns d’attaque. La normalisation en schéma unifié est le prérequis à toute corrélation cross-source efficace.
Elastic SIEM : la stack Elastic supporte quatre types de règles (KQL, EQL, ML, Threshold) permettant de couvrir aussi bien les menaces à signature connue que les anomalies comportementales. L’ECS standardise les noms de champs pour faciliter l’écriture de règles portables.
Splunk : le SPL (Search Processing Language) est un langage de requête puissant permettant des corrélations complexes, des aggregations temporelles, et des lookups d’enrichissement. Il reste la référence dans les grandes entreprises.
Wazuh : plateforme open-source SIEM/XDR basée sur OSSEC, avec une architecture manager/agent et une configuration par rules XML et decoders. La réponse active permet d’automatiser des contre-mesures (blocage IP, kill de processus).
Sigma : langage de règles générique qui abstrait le SIEM cible. Une règle Sigma unique peut être convertie vers SPL, KQL, ou EQL via pySigma, maximisant la portabilité des détections entre organisations.
Threat hunting : processus en cinq étapes (hypothèse → données → investigation → validation → documentation) pour détecter proactivement des menaces non couvertes par les règles automatiques. La qualité de la baseline et la disponibilité des logs sont les facteurs limitants.
Threat intelligence : STIX 2.1 standardise la représentation des renseignements sur les menaces (indicators, malware, attack-patterns, campaigns) ; TAXII 2.1 en assure le transport ; MISP facilite le partage communautaire et la corrélation automatique d’IoC.
UEBA : la détection comportementale basée sur des baselines et des scores de déviation permet de détecter des compromissions de comptes légitimes et des menaces internes que les règles à signature ne peuvent pas identifier.