Complete walkthrough of the HomeSOC detection pipeline — Suricata IDS, custom rules, Python alert engine, VirusTotal enrichment, and Telegram notifications.
eve.json.Run on the NUC:
sudo dnf install -y suricata # Update community ruleset sudo suricata-update # Enable and start sudo systemctl enable --now suricata sudo systemctl status suricata --no-pager | grep Active # Verify it's capturing traffic sudo tail -f /var/log/suricata/suricata.log
Engine started in the suricata.log output.
sudo nano /etc/suricata/suricata.yaml
af-packet: - interface: wlp2s0
outputs:
- eve-log:
enabled: yes
filetype: regular
filename: /var/log/suricata/eve.json
types:
- alert:
payload: yes
payload-printable: yes
metadata: yes
- dns
- http:
extended: yes
- tls:
extended: yes
- ssh
- flow
vars:
address-groups:
HOME_NET: "[192.168.1.0/24]"
EXTERNAL_NET: "!$HOME_NET"
sudo systemctl restart suricata
Custom rules live in rules/local.rules and are loaded by Suricata automatically.
sudo nano /etc/suricata/rules/local.rules
alert icmp any any -> $HOME_NET any (msg:"ICMP Ping Sweep Detected"; \ threshold: type threshold, track by_src, count 5, seconds 2; \ sid:9000001; rev:1;)
alert tcp any any -> $HOME_NET any (msg:"TCP Port Scan Detected"; \ flags:S; threshold: type threshold, track by_src, count 20, seconds 5; \ sid:9000002; rev:1;)
alert tcp $HOME_NET any -> any [4444,5555,6666,7777,8888,9999] \ (msg:"Outbound Connection to Common Reverse Shell Port"; \ sid:9000003; rev:1;)
alert dns any any -> any any (msg:"Possible DNS Exfiltration - Long Query"; \
dns.query; content:"."; pcre:"/^.{50,}/"; \
sid:9000004; rev:1;)
rule-files: - suricata.rules - local.rules
sudo kill -USR2 $(cat /var/run/suricata.pid)
0 3 * * * /usr/bin/suricata-update && kill -USR2 $(cat /var/run/suricata.pid)
Loki collects logs from all three sources via Grafana Alloy and displays them as a live dashboard in Grafana.
cd /tmp
wget https://github.com/grafana/loki/releases/latest/download/loki-linux-amd64.zip
unzip loki-linux-amd64.zip
sudo mv loki-linux-amd64 /usr/local/bin/loki
sudo chmod +x /usr/local/bin/loki
sudo chcon -t bin_t /usr/local/bin/loki # fix SELinux on Fedora
sudo mkdir -p /etc/loki /var/loki/{chunks,rules,tsdb}
auth_enabled: false
server:
http_listen_port: 3100
log_level: info
common:
path_prefix: /var/loki
storage:
filesystem:
chunks_directory: /var/loki/chunks
rules_directory: /var/loki/rules
replication_factor: 1
ring:
instance_addr: 127.0.0.1
kvstore:
store: inmemory
schema_config:
configs:
- from: 2024-01-01
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h
limits_config:
reject_old_samples: true
reject_old_samples_max_age: 168h
allow_structured_metadata: false
[Unit] Description=Loki log aggregation After=network.target [Service] ExecStart=/usr/local/bin/loki -config.file=/etc/loki/loki-config.yaml Restart=always RestartSec=5 User=root [Install] WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now loki
curl http://localhost:3100/ready # should return: ready
Alloy replaces Promtail in Loki v3. Ships logs from all three sources into Loki.
[grafana] name=grafana baseurl=https://rpm.grafana.com repo_gpgcheck=1 enabled=1 gpgcheck=1 gpgkey=https://rpm.grafana.com/gpg.key sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt
sudo dnf makecache
sudo dnf install -y alloy
sudo chcon -t bin_t /usr/bin/alloy # fix SELinux
// Loki destination loki.write "local_loki" { endpoint { url = "http://localhost:3100/loki/api/v1/push" } } // SOURCE 1: Suricata IDS local.file_match "suricata_files" { path_targets = [{ __path__ = "/var/log/suricata/eve.json", job = "suricata", source = "suricata-ids", }] } loki.source.file "suricata" { targets = local.file_match.suricata_files.targets forward_to = [loki.process.suricata_parse.receiver] } loki.process "suricata_parse" { stage.json { expressions = { event_type = "event_type", severity = "alert.severity", src_ip = "src_ip", dest_ip = "dest_ip", } } stage.labels { values = { event_type = "event_type", severity = "severity", } } forward_to = [loki.write.local_loki.receiver] } // SOURCE 2: Wazuh HIDS local.file_match "wazuh_files" { path_targets = [{ __path__ = "/var/ossec/logs/alerts/alerts.json", job = "wazuh", source = "wazuh-hids", }] } loki.source.file "wazuh" { targets = local.file_match.wazuh_files.targets forward_to = [loki.process.wazuh_parse.receiver] } loki.process "wazuh_parse" { stage.json { expressions = { rule_level = "rule.level", rule_desc = "rule.description", agent = "agent.name", } } stage.labels { values = { rule_level = "rule_level", agent = "agent", } } forward_to = [loki.write.local_loki.receiver] } // SOURCE 3: Python alert engine local.file_match "python_files" { path_targets = [{ __path__ = "/var/log/suricata-telegram/forwarder.log", job = "python-engine", source = "python-alerts", }] } loki.source.file "python_engine" { targets = local.file_match.python_files.targets forward_to = [loki.write.local_loki.receiver] }
sudo usermod -aG adm alloy sudo chmod o+r /var/log/suricata/eve.json sudo chmod o+rx /var/ossec/logs/alerts/ sudo chmod o+r /var/ossec/logs/alerts/alerts.json sudo systemctl enable --now alloy
sudo dnf install -y grafana sudo systemctl enable --now grafana-server sudo firewall-cmd --permanent --add-port=3000/tcp sudo firewall-cmd --reload
http://<server-ip>:3000 — Default login: admin / admin
http://localhost:3100| Panel | LogQL Query | Type |
|---|---|---|
| Alert rate over time | count_over_time({job="suricata", event_type="alert"}[5m]) | Time series |
| Live Suricata alerts | {job="suricata", event_type="alert"} | Logs |
| Wazuh high severity | {job="wazuh"} | json | rule_level >= 10 | Logs |
| Python engine activity | {job="python-engine"} | Logs |
The alert engine tails Suricata's eve.json directly, filters by severity, enriches with VirusTotal, and forwards to Telegram.
src/ ├── alert_engine.py # Main engine — tails eve.json ├── vt_lookup.py # VirusTotal IP reputation lookup └── telegram_bot.py # Telegram notification sender
pip3 install requests python-dotenv
#!/usr/bin/env python3
"""HomeSOC Alert Engine — tails Suricata eve.json and forwards critical alerts."""
import json, time, os
from datetime import datetime
from dotenv import load_dotenv
from vt_lookup import check_ip_reputation
from telegram_bot import send_alert
load_dotenv()
EVE_LOG = os.getenv("SURICATA_EVE_LOG", "/var/log/suricata/eve.json")
MIN_SEV = int(os.getenv("MIN_SEVERITY", "2")) # 1=critical, 2=high, 3=medium, 4=low
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def tail_file(filepath):
while not os.path.exists(filepath):
print(f"[{now()}] Waiting for {filepath}...")
time.sleep(5)
with open(filepath, "r") as f:
f.seek(0, 2)
current_inode = os.fstat(f.fileno()).st_ino
while True:
line = f.readline()
if line:
yield line.strip()
else:
time.sleep(0.5)
try:
if os.stat(filepath).st_ino != current_inode:
f = open(filepath, "r")
current_inode = os.stat(filepath).st_ino
except FileNotFoundError:
pass
def process_alert(event):
if event.get("event_type") != "alert":
return None
alert = event.get("alert", {})
severity = alert.get("severity", 99)
if severity > MIN_SEV:
return None
src_ip = event.get("src_ip", "unknown")
vt_result = None
if src_ip and not src_ip.startswith(("192.168.", "10.", "172.")):
vt_result = check_ip_reputation(src_ip)
return {
"severity": severity,
"signature": alert.get("signature", "Unknown rule"),
"category": alert.get("category", "Uncategorized"),
"sid": alert.get("signature_id", "?"),
"src_ip": src_ip,
"dest_ip": event.get("dest_ip", "unknown"),
"proto": event.get("proto", "unknown"),
"timestamp": event.get("timestamp", now()),
"vt_result": vt_result,
}
def main():
print(f"[{now()}] HomeSOC Alert Engine started — watching {EVE_LOG}")
for line in tail_file(EVE_LOG):
event = json.loads(line) if line else None
if not event: continue
processed = process_alert(event)
if not processed: continue
print(f"[{now()}] ALERT SEV-{processed['severity']} — {processed['signature']}")
send_alert(processed)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
import os, requests
VT_API_KEY = os.getenv("VIRUSTOTAL_API_KEY")
VT_URL = "https://www.virustotal.com/api/v3/ip_addresses/{}"
def check_ip_reputation(ip):
if not VT_API_KEY: return None
try:
r = requests.get(VT_URL.format(ip), headers={"x-apikey": VT_API_KEY}, timeout=5)
if r.status_code != 200: return None
stats = r.json()["data"]["attributes"]["last_analysis_stats"]
malicious = stats.get("malicious", 0)
total = sum(stats.values())
return {"ip": ip, "malicious": malicious, "total": total, "score": f"{malicious}/{total}"}
except Exception:
return None
#!/usr/bin/env python3
import os, requests
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}/sendMessage"
SEVERITY_LABELS = {1: "CRITICAL 🔴", 2: "HIGH 🟠", 3: "MEDIUM 🟡", 4: "LOW ⚪"}
def format_message(alert):
sev = SEVERITY_LABELS.get(alert["severity"], "UNKNOWN")
lines = [
f"<b>Suricata Alert — {sev}</b>",
f"",
f"<b>Rule:</b> <code>{alert['signature']}</code>",
f"<b>Category:</b> {alert['category']}",
f"<b>SID:</b> {alert['sid']}",
f"",
f"<b>Source:</b> <code>{alert['src_ip']}</code>",
f"<b>Destination:</b> <code>{alert['dest_ip']}</code>",
f"<b>Protocol:</b> {alert['proto']}",
f"<b>Time:</b> {alert['timestamp']}",
]
if alert.get("vt_result"):
lines.append(f"<b>VirusTotal:</b> {alert['vt_result']['score']} engines flagged")
return "\n".join(lines)
def send_alert(alert):
if not BOT_TOKEN or not CHAT_ID: return
try:
requests.post(API_URL, json={
"chat_id": CHAT_ID, "text": format_message(alert), "parse_mode": "HTML"
}, timeout=10)
except Exception as e:
print(f"Telegram send failed: {e}")
VIRUSTOTAL_API_KEY=your_key_herehttps://api.telegram.org/botYOUR_TOKEN/getUpdates"chat":{"id": — that number is your chat IDVIRUSTOTAL_API_KEY=your_key_here TELEGRAM_BOT_TOKEN=your_token_here TELEGRAM_CHAT_ID=your_chat_id_here
python3 -c "
from src.telegram_bot import send_alert
send_alert({
'severity': 1, 'signature': 'Test alert from HomeSOC',
'category': 'Test', 'sid': '9999999',
'src_ip': '1.2.3.4', 'dest_ip': '192.168.1.33',
'proto': 'TCP', 'timestamp': '2026-04-06T15:00:00',
'vt_result': {'score': '5/94'},
})
"
sudo python3 src/alert_engine.py
[Unit] Description=HomeSOC Python Alert Engine After=network.target suricata.service [Service] ExecStart=/usr/bin/python3 /home/katato/HomeSOC/src/alert_engine.py WorkingDirectory=/home/katato/HomeSOC EnvironmentFile=/home/katato/HomeSOC/.env Restart=always RestartSec=5 User=root [Install] WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now homesoc-alerts
sudo systemctl status homesoc-alerts --no-pager | grep Active
# Check live logs
sudo journalctl -u homesoc-alerts -f
Five Python scripts test each layer of the SOC pipeline. Run them only against hosts you own.
scripts/ ├── severity_trigger.py # fires traffic for each severity level 1-4 ├── pipeline_check.py # checks each stage pass/fail ├── loki_severity_report.py # queries Loki, shows counts per severity ├── port_scanner.py # TCP port scan — triggers ET SCAN rules ├── ssh_brute.py # SSH brute force — triggers Suricata + Wazuh ├── web_attacks.py # SQLi, XSS, path traversal, bad user agents ├── wazuh_triggers.py # FIM changes, /etc/shadow, sudo failures └── run_all_tests.py # master runner — executes all in sequence
| Severity | Label | What it triggers |
|---|---|---|
| 1 | Critical | Meterpreter UA, shellcode pattern, EternalBlue SMB probe |
| 2 | High | SQL injection, SSH scan burst, FTP brute force |
| 3 | Medium | Nmap UA, Nikto UA, path traversal, DNS VERSION query |
| 4 | Low | curl UA, python-requests UA |
sudo python3 scripts/severity_trigger.py sudo tail -30 /var/log/suricata/fast.log
sudo python3 scripts/pipeline_check.py
SEV LABEL SURICATA LOKI STATUS 1 CRITICAL PASS PASS FULL PASS 2 HIGH PASS PASS FULL PASS 3 MEDIUM PASS FAIL PARTIAL 4 LOW FAIL FAIL NO ALERT
SURICATA: PASS but LOKI: FAIL — the problem is in Alloy. Check with sudo journalctl -u alloy -f
python3 scripts/loki_severity_report.py
sudo python3 scripts/run_all_tests.py
# 1. Suricata caught it sudo tail -20 /var/log/suricata/fast.log # 2. Alloy shipped it to Loki sudo journalctl -u alloy -n 20 --no-pager # 3. Loki indexed it curl http://localhost:3100/loki/api/v1/labels # 4. Python engine processed it sudo journalctl -u homesoc-alerts -n 20 --no-pager # 5. Check your phone for Telegram alert
We test the detection pipeline using safe samples from MalwareBazaar inside an isolated VirtualBox VM.
# Suricata network alerts in real time sudo tail -f /var/log/suricata/fast.log # Full JSON event stream sudo tail -f /var/log/suricata/eve.json | python3 -m json.tool # Wazuh host alerts sudo tail -f /var/ossec/logs/alerts/alerts.log # Python engine processing sudo journalctl -u homesoc-alerts -f
Each sample should be documented in docs/analysis-reports/. Include:
Running Suricata on wlp2s0 means it only sees traffic to/from the NUC itself — not all traffic on the network. For full network visibility you need a managed switch with port mirroring, or Suricata in inline mode.
Loki v3 dropped boltdb-shipper + shared_store config fields and removed Promtail as a standalone binary. Use schema v13 with tsdb store, and use Grafana Alloy as the log shipper instead of Promtail.
Any binary placed in /usr/local/bin/ and run by systemd will be blocked by SELinux by default on Fedora. Fix with:
sudo chcon -t bin_t /usr/local/bin/loki sudo chcon -t bin_t /usr/bin/alloy
The Emerging Threats ruleset generates false positives on home network traffic. Common ones to suppress: DNS queries to *.local triggering DNS tunneling rules, and normal HTTPS traffic flagging certificate rules. Suppress false positives in /etc/suricata/threshold.conf.
Free tier allows 500 lookups/day. Stay within limits by only looking up external IPs (not RFC1918 ranges) on severity 1 or 2 alerts, and adding a simple dict cache keyed by IP to skip repeated lookups.