DOC: DETECTION_GUIDE // DOC-002
PLATFORM: FEDORA_42 // SURICATA // PYTHON
SECTIONS: 12
[ DOC-002 // DETECTION_GUIDE ] Author: Richard · v1.0.4
FIELD MANUAL | COMPLETE DETECTION WALKTHROUGH

Detection Guide

Complete walkthrough of the HomeSOC detection pipeline — Suricata IDS, custom rules, Python alert engine, VirusTotal enrichment, and Telegram notifications.

Written by Richard · Detection Engineer
Suricata Python Detection Loki Grafana VirusTotal Telegram
01
[ LAYER: DETECTION // OVERVIEW ]

Overview

Layer 1 — Network (Suricata)
Watches all traffic on the NUC's interface. Detects malware signatures, port scans, and C2 beacons. Writes structured JSON alerts to eve.json.
Layer 2 — Host (Wazuh)
Set up by Kathlyn. Monitors both machines for login attempts, privilege escalation, file changes, and process anomalies.

Detection Engineer responsibilities

  • Install and tune Suricata IDS
  • Write custom detection rules
  • Build the Python engine that reads alerts and forwards critical ones
  • Connect to VirusTotal for IP reputation enrichment
  • Send real-time Telegram notifications

Data flow

Network traffic → Suricata → eve.json → Python engine → VirusTotal → Telegram 📱
02
[ LAYER: IDS // INSTALL ]

Suricata IDS Installation

Run on the NUC:

bash
sudo dnf install -y suricata

# Update community ruleset
sudo suricata-update

# Enable and start
sudo systemctl enable --now suricata
sudo systemctl status suricata --no-pager | grep Active

# Verify it's capturing traffic
sudo tail -f /var/log/suricata/suricata.log
You should see Engine started in the suricata.log output.
03
[ LAYER: IDS // CONFIG ]

Suricata Configuration

bash
sudo nano /etc/suricata/suricata.yaml

Set your network interface

/etc/suricata/suricata.yaml
af-packet:
  - interface: wlp2s0

Enable JSON output

/etc/suricata/suricata.yaml — outputs section
outputs:
  - eve-log:
      enabled: yes
      filetype: regular
      filename: /var/log/suricata/eve.json
      types:
        - alert:
            payload: yes
            payload-printable: yes
            metadata: yes
        - dns
        - http:
            extended: yes
        - tls:
            extended: yes
        - ssh
        - flow

Set home network range

/etc/suricata/suricata.yaml — vars section
vars:
  address-groups:
    HOME_NET: "[192.168.1.0/24]"
    EXTERNAL_NET: "!$HOME_NET"
bash — restart after changes
sudo systemctl restart suricata
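
After the restart, one quick way to confirm that eve.json is receiving the event types enabled above is to count them. This is a minimal sketch, not part of the HomeSOC repository; the function name count_event_types is made up for illustration.

```python
import json
from collections import Counter


def count_event_types(lines):
    """Count Suricata events per event_type, skipping lines that are not valid JSON objects."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # partially written or corrupt line
        if isinstance(event, dict):
            counts[event.get("event_type", "unknown")] += 1
    return counts
```

To use it, open /var/log/suricata/eve.json (this usually needs root) and pass the file object to count_event_types(). With the config above you would expect keys such as alert, dns, http, tls, ssh, and flow once matching traffic has been seen.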
04
[ LAYER: IDS // CUSTOM_RULES ]

Custom Detection Rules

Custom rules live in /etc/suricata/rules/local.rules. Suricata loads them only once the file is listed under rule-files in suricata.yaml.

bash
sudo nano /etc/suricata/rules/local.rules

Rules written for this project

ICMP Ping Sweep — Network Reconnaissance
suricata rule
alert icmp any any -> $HOME_NET any (msg:"ICMP Ping Sweep Detected"; \
  threshold: type threshold, track by_src, count 5, seconds 2; \
  sid:9000001; rev:1;)
TCP Port Scan
suricata rule
alert tcp any any -> $HOME_NET any (msg:"TCP Port Scan Detected"; \
  flags:S; threshold: type threshold, track by_src, count 20, seconds 5; \
  sid:9000002; rev:1;)
Common Reverse Shell Ports
suricata rule
alert tcp $HOME_NET any -> any [4444,5555,6666,7777,8888,9999] \
  (msg:"Outbound Connection to Common Reverse Shell Port"; \
  sid:9000003; rev:1;)
DNS Exfiltration — Long Query
suricata rule
alert dns any any -> any any (msg:"Possible DNS Exfiltration - Long Query"; \
  dns.query; content:"."; pcre:"/^.{50,}/"; \
  sid:9000004; rev:1;)
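
To get a feel for which names the long-query rule (sid 9000004) would flag, the same pattern can be tried in Python before deploying it. This is only an approximation of the rule's two conditions (a dot in the name, and at least 50 characters); the helper name matches_long_query_rule is hypothetical, and Suricata's own matching remains the authority.

```python
import re

# Same expression as the pcre option in sid 9000004
LONG_QUERY = re.compile(r"^.{50,}")


def matches_long_query_rule(qname):
    """Approximate sid 9000004: the name contains a dot and is at least 50 characters long."""
    return "." in qname and LONG_QUERY.search(qname) is not None
```

Any legitimate hostname of 50 characters or more matches too, so running a day of normal DNS names through this check first gives an idea of how noisy the rule will be.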

Load custom rules

/etc/suricata/suricata.yaml — rule-files section
rule-files:
  - suricata.rules
  # Absolute path: relative names are resolved against default-rule-path
  - /etc/suricata/rules/local.rules
bash — reload without restart
sudo kill -USR2 $(cat /var/run/suricata.pid)

Auto-update community rules daily

crontab
0 3 * * * /usr/bin/suricata-update && kill -USR2 $(cat /var/run/suricata.pid)
05
[ LAYER: LOG_PIPELINE // LOKI_GRAFANA ]

Loki + Grafana Integration

Grafana Alloy ships logs from all three sources (Suricata, Wazuh, and the Python alert engine) into Loki, and Grafana displays them as a live dashboard.

Stack overview

Suricata eve.json → Grafana Alloy → Loki :3100 → Grafana :3000
Wazuh alerts.json → Grafana Alloy → Loki :3100 → Grafana :3000

Install Loki

bash
cd /tmp
wget https://github.com/grafana/loki/releases/latest/download/loki-linux-amd64.zip
unzip loki-linux-amd64.zip
sudo mv loki-linux-amd64 /usr/local/bin/loki
sudo chmod +x /usr/local/bin/loki
sudo chcon -t bin_t /usr/local/bin/loki   # fix SELinux on Fedora
sudo mkdir -p /etc/loki /var/loki/{chunks,rules,tsdb}
/etc/loki/loki-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100
  log_level: info

common:
  path_prefix: /var/loki
  storage:
    filesystem:
      chunks_directory: /var/loki/chunks
      rules_directory: /var/loki/rules
  replication_factor: 1
  ring:
    instance_addr: 127.0.0.1
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2024-01-01
      store: tsdb
      object_store: filesystem
      schema: v13
      index:
        prefix: index_
        period: 24h

limits_config:
  reject_old_samples: true
  reject_old_samples_max_age: 168h
  allow_structured_metadata: false
/etc/systemd/system/loki.service
[Unit]
Description=Loki log aggregation
After=network.target

[Service]
ExecStart=/usr/local/bin/loki -config.file=/etc/loki/loki-config.yaml
Restart=always
RestartSec=5
User=root

[Install]
WantedBy=multi-user.target
bash — start and verify
sudo systemctl daemon-reload
sudo systemctl enable --now loki
curl http://localhost:3100/ready   # should return: ready

Install Grafana Alloy (log shipper)

Alloy is Grafana's replacement for Promtail. It ships logs from all three sources into Loki.

ℹ️ Grafana has deprecated Promtail in favor of Alloy. Use Grafana Alloy for new setups.
/etc/yum.repos.d/grafana.repo
[grafana]
name=grafana
baseurl=https://rpm.grafana.com
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://rpm.grafana.com/gpg.key
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
bash
sudo dnf makecache
sudo dnf install -y alloy
sudo chcon -t bin_t /usr/bin/alloy   # fix SELinux

Configure Alloy

/etc/alloy/config.alloy
// Loki destination
loki.write "local_loki" {
  endpoint {
    url = "http://localhost:3100/loki/api/v1/push"
  }
}

// SOURCE 1: Suricata IDS
local.file_match "suricata_files" {
  path_targets = [{
    __path__ = "/var/log/suricata/eve.json",
    job      = "suricata",
    source   = "suricata-ids",
  }]
}

loki.source.file "suricata" {
  targets    = local.file_match.suricata_files.targets
  forward_to = [loki.process.suricata_parse.receiver]
}

loki.process "suricata_parse" {
  stage.json {
    expressions = {
      event_type = "event_type",
      severity   = "alert.severity",
      src_ip     = "src_ip",
      dest_ip    = "dest_ip",
    }
  }
  stage.labels {
    values = {
      event_type = "event_type",
      severity   = "severity",
    }
  }
  forward_to = [loki.write.local_loki.receiver]
}

// SOURCE 2: Wazuh HIDS
local.file_match "wazuh_files" {
  path_targets = [{
    __path__ = "/var/ossec/logs/alerts/alerts.json",
    job      = "wazuh",
    source   = "wazuh-hids",
  }]
}

loki.source.file "wazuh" {
  targets    = local.file_match.wazuh_files.targets
  forward_to = [loki.process.wazuh_parse.receiver]
}

loki.process "wazuh_parse" {
  stage.json {
    expressions = {
      rule_level = "rule.level",
      rule_desc  = "rule.description",
      agent      = "agent.name",
    }
  }
  stage.labels {
    values = {
      rule_level = "rule_level",
      agent      = "agent",
    }
  }
  forward_to = [loki.write.local_loki.receiver]
}

// SOURCE 3: Python alert engine
local.file_match "python_files" {
  path_targets = [{
    __path__ = "/var/log/suricata-telegram/forwarder.log",
    job      = "python-engine",
    source   = "python-alerts",
  }]
}

loki.source.file "python_engine" {
  targets    = local.file_match.python_files.targets
  forward_to = [loki.write.local_loki.receiver]
}
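
To make the suricata_parse stages above concrete, the sketch below mimics in plain Python what they are expected to do to a single eve.json line: pull four fields out of the JSON, then promote two of them to Loki labels. It is a simplified model for illustration only. Alloy evaluates these expressions as JMESPath, and the helper names lookup and labels_for are made up here.

```python
import json

# Mirrors the expressions and labels of loki.process "suricata_parse"
EXPRESSIONS = {
    "event_type": "event_type",
    "severity": "alert.severity",
    "src_ip": "src_ip",
    "dest_ip": "dest_ip",
}
LABELS = ("event_type", "severity")


def lookup(event, dotted_path):
    """Follow a dotted path such as "alert.severity" through nested dicts; None if absent."""
    value = event
    for key in dotted_path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def labels_for(line):
    """Return the labels the stages would be expected to attach to one log line."""
    event = json.loads(line)
    extracted = {name: lookup(event, path) for name, path in EXPRESSIONS.items()}
    return {name: str(extracted[name]) for name in LABELS if extracted[name] is not None}
```

Note that src_ip and dest_ip are extracted but not turned into labels. That matches the config above and keeps the number of distinct label values in Loki small.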
bash — permissions and start
sudo usermod -aG adm alloy
sudo chmod o+r /var/log/suricata/eve.json
sudo chmod o+rx /var/ossec/logs/alerts/
sudo chmod o+r /var/ossec/logs/alerts/alerts.json
sudo systemctl enable --now alloy

Install Grafana

bash
sudo dnf install -y grafana
sudo systemctl enable --now grafana-server
sudo firewall-cmd --permanent --add-port=3000/tcp
sudo firewall-cmd --reload
🌐 Access at http://<server-ip>:3000 — Default login: admin / admin

Connect Grafana to Loki

1. Connections → Data sources → Add data source
2. Select Loki. URL: http://localhost:3100
3. Save & test. Should show "Data source connected and labels found"

Recommended dashboard panels

Panel                    LogQL Query                                                  Type
Alert rate over time     count_over_time({job="suricata", event_type="alert"}[5m])    Time series
Live Suricata alerts     {job="suricata", event_type="alert"}                         Logs
Wazuh high severity      {job="wazuh"} | json | rule_level >= 10                      Logs
Python engine activity   {job="python-engine"}                                        Logs
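
The same LogQL queries can be run outside Grafana through Loki's HTTP API, which is handy for scripted checks. The sketch below only builds the request URL for the query_range endpoint; the function name query_range_url and its defaults are assumptions for illustration, and the base URL is the local Loki instance configured earlier.

```python
import time
from urllib.parse import urlencode


def query_range_url(logql, minutes=60, limit=100, base="http://localhost:3100", now_ns=None):
    """Build a Loki query_range URL covering the last `minutes` minutes."""
    # Loki accepts start and end as Unix timestamps in nanoseconds
    end = time.time_ns() if now_ns is None else now_ns
    start = end - minutes * 60 * 1_000_000_000
    params = {"query": logql, "start": start, "end": end, "limit": limit}
    return f"{base}/loki/api/v1/query_range?{urlencode(params)}"
```

The resulting URL can be fetched with curl or requests.get(). For example, query_range_url('{job="suricata", event_type="alert"}', minutes=5) asks for the alerts of the last five minutes.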
06
[ LAYER: ALERTING // PYTHON_ENGINE ]

Python Alert Engine

The alert engine tails Suricata's eve.json directly, filters by severity, enriches with VirusTotal, and forwards to Telegram.

Project structure

tree
src/
├── alert_engine.py      # Main engine — tails eve.json
├── vt_lookup.py         # VirusTotal IP reputation lookup
└── telegram_bot.py      # Telegram notification sender

Dependencies

bash
pip3 install requests python-dotenv

alert_engine.py

src/alert_engine.py
#!/usr/bin/env python3
"""HomeSOC Alert Engine — tails Suricata eve.json and forwards critical alerts."""

import ipaddress
import json
import os
import time
from datetime import datetime

from dotenv import load_dotenv

# Load .env before importing the helpers below, so the API keys are already
# in the environment when those modules load.
load_dotenv()

from vt_lookup import check_ip_reputation  # noqa: E402
from telegram_bot import send_alert  # noqa: E402

EVE_LOG = os.getenv("SURICATA_EVE_LOG", "/var/log/suricata/eve.json")
MIN_SEV = int(os.getenv("MIN_SEVERITY", "2"))   # 1=critical, 2=high, 3=medium, 4=low


def now():
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def is_external(ip):
    """True for publicly routable addresses; False for private ranges or non-IP values."""
    try:
        return ipaddress.ip_address(ip).is_global
    except ValueError:  # e.g. "unknown"
        return False


def tail_file(filepath):
    """Yield lines appended to filepath, reopening the file after log rotation."""
    while not os.path.exists(filepath):
        print(f"[{now()}] Waiting for {filepath}...")
        time.sleep(5)
    f = open(filepath, "r")
    f.seek(0, 2)  # start at the end so only new events are processed
    current_inode = os.fstat(f.fileno()).st_ino
    try:
        while True:
            line = f.readline()
            if line:
                yield line.strip()
                continue
            time.sleep(0.5)
            try:
                if os.stat(filepath).st_ino != current_inode:
                    # Rotated: switch to the new file and release the old handle
                    new_f = open(filepath, "r")
                    f.close()
                    f = new_f
                    current_inode = os.fstat(f.fileno()).st_ino
            except FileNotFoundError:
                pass  # mid-rotation; retry on the next pass
    finally:
        f.close()


def process_alert(event):
    if event.get("event_type") != "alert":
        return None
    alert    = event.get("alert", {})
    severity = alert.get("severity", 99)
    if severity > MIN_SEV:
        return None
    src_ip = event.get("src_ip", "unknown")
    vt_result = None
    # Only external sources are worth a VirusTotal lookup
    if is_external(src_ip):
        vt_result = check_ip_reputation(src_ip)
    return {
        "severity":  severity,
        "signature": alert.get("signature", "Unknown rule"),
        "category":  alert.get("category", "Uncategorized"),
        "sid":       alert.get("signature_id", "?"),
        "src_ip":    src_ip,
        "dest_ip":   event.get("dest_ip", "unknown"),
        "proto":     event.get("proto", "unknown"),
        "timestamp": event.get("timestamp", now()),
        "vt_result": vt_result,
    }


def main():
    print(f"[{now()}] HomeSOC Alert Engine started — watching {EVE_LOG}")
    for line in tail_file(EVE_LOG):
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # partially written line; skip it instead of crashing
        processed = process_alert(event)
        if not processed:
            continue
        print(f"[{now()}] ALERT SEV-{processed['severity']} — {processed['signature']}")
        send_alert(processed)


if __name__ == "__main__":
    main()

vt_lookup.py

src/vt_lookup.py
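#!/usr/bin/env python3
"""VirusTotal IP reputation lookup."""
import os

import requests

VT_URL = "https://www.virustotal.com/api/v3/ip_addresses/{}"


def check_ip_reputation(ip):
    """Return VirusTotal verdict counts for ip, or None if the lookup is unavailable."""
    # Read the key at call time so it is found even if .env was loaded after import
    api_key = os.getenv("VIRUSTOTAL_API_KEY")
    if not api_key:
        return None
    try:
        r = requests.get(VT_URL.format(ip), headers={"x-apikey": api_key}, timeout=5)
        if r.status_code != 200:
            return None  # includes 429 when the quota is exhausted
        stats = r.json()["data"]["attributes"]["last_analysis_stats"]
        malicious = stats.get("malicious", 0)
        total = sum(stats.values())
    except (requests.RequestException, KeyError, TypeError, ValueError):
        return None
    return {"ip": ip, "malicious": malicious, "total": total, "score": f"{malicious}/{total}"}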

telegram_bot.py

src/telegram_bot.py
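#!/usr/bin/env python3
"""Telegram notification sender."""
import html
import os

import requests

SEVERITY_LABELS = {1: "CRITICAL 🔴", 2: "HIGH 🟠", 3: "MEDIUM 🟡", 4: "LOW ⚪"}


def esc(value):
    """Escape <, > and & so alert text cannot break Telegram's HTML parsing."""
    return html.escape(str(value), quote=False)


def format_message(alert):
    sev = SEVERITY_LABELS.get(alert["severity"], "UNKNOWN")
    lines = [
        f"<b>Suricata Alert — {sev}</b>",
        "",
        f"<b>Rule:</b> <code>{esc(alert['signature'])}</code>",
        f"<b>Category:</b> {esc(alert['category'])}",
        f"<b>SID:</b> {esc(alert['sid'])}",
        "",
        f"<b>Source:</b>      <code>{esc(alert['src_ip'])}</code>",
        f"<b>Destination:</b> <code>{esc(alert['dest_ip'])}</code>",
        f"<b>Protocol:</b>    {esc(alert['proto'])}",
        f"<b>Time:</b> {esc(alert['timestamp'])}",
    ]
    if alert.get("vt_result"):
        lines.append(f"<b>VirusTotal:</b> {esc(alert['vt_result']['score'])} engines flagged")
    return "\n".join(lines)


def send_alert(alert):
    # Read credentials at call time so they are found even if .env was loaded after import
    bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
    chat_id = os.getenv("TELEGRAM_CHAT_ID")
    if not bot_token or not chat_id:
        return
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    try:
        r = requests.post(url, json={
            "chat_id": chat_id, "text": format_message(alert), "parse_mode": "HTML"
        }, timeout=10)
        if not r.ok:
            # Telegram reports rejected messages (e.g. bad HTML) with a 4xx status
            print(f"Telegram send failed: HTTP {r.status_code} {r.text}")
    except requests.RequestException as e:
        # Print only the error type: the full message contains the URL and with it the bot token
        print(f"Telegram send failed: {type(e).__name__}")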
07
[ LAYER: ENRICHMENT // VIRUSTOTAL ]

VirusTotal Integration

1. Sign up at virustotal.com
2. Go to profile → API Key → copy it
3. Add to your .env file:
   VIRUSTOTAL_API_KEY=your_key_here
💡 The free public API allows 500 lookups per day and 4 per minute. Only look up external IPs (non-RFC1918) on severity 1 or 2 alerts to stay well within limits.
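
The "external IP on severity 1 or 2" rule of thumb can be written down explicitly. The sketch below uses Python's ipaddress module with the three RFC 1918 ranges; note that the private 172 range is only 172.16.0.0/12, so a plain prefix check on "172." would also skip public addresses. The function names is_rfc1918 and worth_looking_up are hypothetical and not taken from the HomeSOC source.

```python
import ipaddress

RFC1918 = [ipaddress.ip_network(n) for n in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")]


def is_rfc1918(ip):
    """True if ip is an IPv4 address inside one of the three RFC 1918 private ranges."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        return False  # not an IP address at all, e.g. "unknown"
    return any(addr in net for net in RFC1918)


def worth_looking_up(ip, severity):
    """Apply the rule of thumb above: a valid, non-RFC1918 IP on a severity 1 or 2 alert."""
    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    return severity in (1, 2) and not is_rfc1918(ip)
```

Loopback and link-local addresses are not covered by RFC 1918; if those should be skipped as well, the is_global attribute of an ipaddress object is a stricter test.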
08
[ LAYER: ALERTING // TELEGRAM_BOT ]

Telegram Bot Setup

Create the bot

1. Search @BotFather on Telegram
2. Send /newbot and follow the prompts
3. Copy your bot token

Get your chat ID

1. Send any message to your new bot
2. Open in browser:
   https://api.telegram.org/botYOUR_TOKEN/getUpdates
3. Find "chat":{"id": — that number is your chat ID
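
If the getUpdates output is hard to read in the browser, the chat ID can also be pulled out of the decoded JSON programmatically. This is a small sketch under the assumption that the response has already been parsed into a dict (for example with json.loads or requests' .json()); the helper name extract_chat_ids is made up for illustration.

```python
def extract_chat_ids(payload):
    """Return the distinct chat IDs found in a decoded getUpdates response."""
    chat_ids = []
    for update in payload.get("result", []):
        # Regular messages and channel posts both carry a "chat" object
        message = update.get("message") or update.get("channel_post") or {}
        chat_id = message.get("chat", {}).get("id")
        if chat_id is not None and chat_id not in chat_ids:
            chat_ids.append(chat_id)
    return chat_ids
```

An empty list usually means the bot has not received a message yet, so repeat step 1 and call getUpdates again.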

Add to .env

.env
VIRUSTOTAL_API_KEY=your_key_here
TELEGRAM_BOT_TOKEN=your_token_here
TELEGRAM_CHAT_ID=your_chat_id_here

Test the bot

bash
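# Run from the project root so .env and the src/ package are found
python3 -c "
from dotenv import load_dotenv
load_dotenv()   # send_alert needs the bot token and chat ID from .env
from src.telegram_bot import send_alert
send_alert({
    'severity': 1, 'signature': 'Test alert from HomeSOC',
    'category': 'Test', 'sid': '9999999',
    'src_ip': '1.2.3.4', 'dest_ip': '192.168.1.33',
    'proto': 'TCP', 'timestamp': '2026-04-06T15:00:00',
    'vt_result': {'score': '5/94'},
})
"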
09
[ LAYER: ALERTING // SYSTEMD_SERVICE ]

Running the Alert Engine

Manual (testing)

bash
sudo python3 src/alert_engine.py

Production — systemd service

/etc/systemd/system/homesoc-alerts.service
[Unit]
Description=HomeSOC Python Alert Engine
After=network.target suricata.service

[Service]
ExecStart=/usr/bin/python3 /home/katato/HomeSOC/src/alert_engine.py
WorkingDirectory=/home/katato/HomeSOC
EnvironmentFile=/home/katato/HomeSOC/.env
# Unbuffered output so print() lines reach journalctl immediately
Environment=PYTHONUNBUFFERED=1
Restart=always
RestartSec=5
User=root

[Install]
WantedBy=multi-user.target
bash
sudo systemctl daemon-reload
sudo systemctl enable --now homesoc-alerts
sudo systemctl status homesoc-alerts --no-pager | grep Active

# Check live logs
sudo journalctl -u homesoc-alerts -f
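
The Alloy config in section 05 tails /var/log/suricata-telegram/forwarder.log, while the engine as listed only prints to stdout, which systemd sends to the journal. If nothing else writes that file, one option is to route the engine's messages through Python's logging module so they reach both places. The sketch below is an assumption about how that could be wired up, not the project's actual code; build_logger is a hypothetical helper.

```python
import logging
import os
import sys


def build_logger(log_path, name="homesoc"):
    """Return a logger that writes each record to stdout and appends it to log_path."""
    directory = os.path.dirname(log_path)
    if directory:
        os.makedirs(directory, exist_ok=True)  # the log directory may not exist yet
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate lines if called twice
    fmt = logging.Formatter("[%(asctime)s] %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S")
    for handler in (logging.StreamHandler(sys.stdout), logging.FileHandler(log_path)):
        handler.setFormatter(fmt)
        logger.addHandler(handler)
    return logger
```

In alert_engine.py this would replace the print() calls, for example log = build_logger("/var/log/suricata-telegram/forwarder.log") followed by log.info(...). The journal still receives everything through stdout, and Alloy picks up the file copy.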
10
[ LAYER: VERIFICATION // PIPELINE_TEST ]

Testing the Pipeline

Eight Python scripts test each layer of the SOC pipeline. Run them only against hosts you own.

tree
scripts/
├── severity_trigger.py      # fires traffic for each severity level 1-4
├── pipeline_check.py        # checks each stage pass/fail
├── loki_severity_report.py  # queries Loki, shows counts per severity
├── port_scanner.py          # TCP port scan — triggers ET SCAN rules
├── ssh_brute.py             # SSH brute force — triggers Suricata + Wazuh
├── web_attacks.py           # SQLi, XSS, path traversal, bad user agents
├── wazuh_triggers.py        # FIM changes, /etc/shadow, sudo failures
└── run_all_tests.py         # master runner — executes all in sequence

Test 1 — Severity triggers

Severity   Label      What it triggers
1          Critical   Meterpreter UA, shellcode pattern, EternalBlue SMB probe
2          High       SQL injection, SSH scan burst, FTP brute force
3          Medium     Nmap UA, Nikto UA, path traversal, DNS VERSION query
4          Low        curl UA, python-requests UA
bash
sudo python3 scripts/severity_trigger.py
sudo tail -30 /var/log/suricata/fast.log

Test 2 — Full pipeline check

bash
sudo python3 scripts/pipeline_check.py
sample output
SEV   LABEL      SURICATA     LOKI       STATUS
1     CRITICAL   PASS         PASS       FULL PASS
2     HIGH       PASS         PASS       FULL PASS
3     MEDIUM     PASS         FAIL       PARTIAL
4     LOW        FAIL         FAIL       NO ALERT
⚠️ If you see SURICATA: PASS but LOKI: FAIL — the problem is in Alloy. Check with sudo journalctl -u alloy -f
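
The STATUS column in the sample output follows directly from the two per-stage results. Since pipeline_check.py itself is not listed here, the following is only a guess at that logic, reconstructed from the sample rows; the function names and the handling of the unlisted "Suricata failed but Loki passed" case are assumptions.

```python
def stage_status(suricata_ok, loki_ok):
    """Combine the per-stage results into the overall status shown in the sample output."""
    if suricata_ok and loki_ok:
        return "FULL PASS"
    if suricata_ok:
        return "PARTIAL"  # alert raised but not shipped: look at Alloy
    # Assumption: without a Suricata alert the row counts as NO ALERT
    return "NO ALERT"


def format_row(sev, label, suricata_ok, loki_ok):
    """Render one row with the column widths used in the sample output."""
    def word(ok):
        return "PASS" if ok else "FAIL"
    status = stage_status(suricata_ok, loki_ok)
    return f"{sev:<6}{label:<11}{word(suricata_ok):<13}{word(loki_ok):<11}{status}"
```

Reading the table this way, a PARTIAL row points at the shipping stage, while NO ALERT points at the rules or at the trigger traffic.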

Test 3 — Loki severity report

bash
python3 scripts/loki_severity_report.py

Test 4 — Run everything at once

bash
sudo python3 scripts/run_all_tests.py

End-to-end verification

bash — full pipeline check
# 1. Suricata caught it
sudo tail -20 /var/log/suricata/fast.log

# 2. Alloy shipped it to Loki
sudo journalctl -u alloy -n 20 --no-pager

# 3. Loki indexed it
curl http://localhost:3100/loki/api/v1/labels

# 4. Python engine processed it
sudo journalctl -u homesoc-alerts -n 20 --no-pager

# 5. Check your phone for Telegram alert
11
[ LAYER: ANALYSIS // MALWARE_SAMPLES ]

Malware Sample Analysis

We test the detection pipeline with real malware samples from MalwareBazaar, run only inside an isolated VirtualBox VM.

Safe analysis environment

1. Install VirtualBox on ThinkPad
2. Create a Windows VM
3. Set network to Host-Only. No internet access, so malware can't phone home
4. Take a clean snapshot before running anything
5. Download samples inside the VM only

Watch on the NUC while a sample runs

bash — live monitoring
# Suricata network alerts in real time
sudo tail -f /var/log/suricata/fast.log

# Full JSON event stream
sudo tail -f /var/log/suricata/eve.json | python3 -m json.tool

# Wazuh host alerts
sudo tail -f /var/ossec/logs/alerts/alerts.log

# Python engine processing
sudo journalctl -u homesoc-alerts -f

Analysis report template

Each sample should be documented in docs/analysis-reports/. Include:

  • Sample hash (MD5, SHA256)
  • Family if known
  • Static analysis — strings, imports, file type
  • Dynamic analysis — what Suricata and Wazuh caught
  • IOCs — IPs, domains, file hashes, registry keys
  • MITRE ATT&CK techniques observed
  • Screenshots of alerts in the dashboard
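
For the first item in the report, both hashes can be computed in one pass with Python's hashlib. This is a minimal sketch; the function name sample_hashes is chosen here for illustration, and it should be run inside the analysis VM where the sample lives.

```python
import hashlib


def sample_hashes(path, chunk_size=1024 * 1024):
    """Return the MD5 and SHA256 hex digests of a file, reading it in chunks."""
    md5, sha256 = hashlib.md5(), hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
            sha256.update(chunk)
    return {"md5": md5.hexdigest(), "sha256": sha256.hexdigest()}
```

Reading in chunks keeps memory use flat even for large samples. The SHA256 value is also what you would use to look the sample up again on MalwareBazaar or VirusTotal.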
12
[ LAYER: POST_MORTEM // LESSONS ]

Lessons Learned

[ SURICATA // WIFI_PACKET_CAPTURE ]

Running Suricata on wlp2s0 means it only sees traffic to and from the NUC itself, not all traffic on the network. For full network visibility, either mirror traffic to the NUC from a managed switch (port mirroring) or put the NUC in the traffic path and run Suricata inline.

[ LOKI // V3_BREAKING_CHANGES ]

Loki v3 removed the shared_store config fields and treats boltdb-shipper as deprecated, and Grafana has deprecated Promtail in favor of Alloy. Use schema v13 with the tsdb store, and use Grafana Alloy as the log shipper instead of Promtail.

[ SELINUX // CUSTOM_BINARY_BLOCKING ]

A binary moved into /usr/local/bin/ from a download directory such as /tmp keeps its original SELinux label, so on Fedora systemd is denied permission to execute it. Fix with:

bash — fix SELinux context
sudo chcon -t bin_t /usr/local/bin/loki
sudo chcon -t bin_t /usr/bin/alloy
[ RULES // FALSE_POSITIVE_TUNING ]

The Emerging Threats ruleset generates false positives on home network traffic. Common ones to suppress: DNS queries to *.local triggering DNS tunneling rules, and normal HTTPS traffic flagging certificate rules. Suppress false positives in /etc/suricata/threshold.conf.

[ VIRUSTOTAL // RATE_LIMITS ]

Free tier allows 500 lookups/day. Stay within limits by only looking up external IPs (not RFC1918 ranges) on severity 1 or 2 alerts, and adding a simple dict cache keyed by IP to skip repeated lookups.
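
The dict cache mentioned above could look like the sketch below. It wraps any lookup function, so it works with check_ip_reputation without changing it. The class name LookupCache, the 24-hour default lifetime, and the choice not to cache failed lookups are all assumptions made for this example.

```python
import time


class LookupCache:
    """Remember lookup results per IP for ttl seconds so repeat alerts cost no API calls."""

    def __init__(self, lookup, ttl=24 * 3600, clock=time.monotonic):
        self._lookup = lookup
        self._ttl = ttl
        self._clock = clock
        self._entries = {}  # ip -> (expiry time, result)

    def get(self, ip):
        now = self._clock()
        entry = self._entries.get(ip)
        if entry and entry[0] > now:
            return entry[1]  # still fresh: no API call
        result = self._lookup(ip)
        if result is not None:  # failures are not cached, so they are retried
            self._entries[ip] = (now + self._ttl, result)
        return result
```

In the engine this would be created once, for example cached = LookupCache(check_ip_reputation), and then called as cached.get(src_ip) wherever the direct lookup is used today. A port scan from one source then costs a single VirusTotal request instead of one per alert.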