Contents
Contents
Introduction
Project relevance
Goal
Theoretical part
Libraries
Creation tools
Project directory structure
Code
Server layer – app.py
Module watcher.py
Web interface – index.html
Diagrams
Optimization
Practical part
1. Creating the project folder
2. Creating the virtual development environment
3. Installing libraries
4. Creating project files
5. Writing code
6. Testing
7. Creating a Telegram bot
8. Linking the bot and checking notifications
9. Installing Docker (optional)
10. Installing and configuring Zabbix
11. Linking Zabbix and checking operation
12. Checking functionality and debugging
Conclusion, links and code
Introduction
Project relevance
The creation of the project began out of concern for the security of my server. No, there were no discovered vulnerabilities — I regularly improved the protection. Nevertheless, no matter how sure I was of the work done, from time to time I still ask myself: “Is everything all right? Has anyone gained access to the internal files of the project?” I can often not log in to the server for weeks, and yet I want to know the status of the files at all times. This is how the idea was born to create a tool for regularly monitoring the directories of any project on the disk.
Goal
On my phone there are three applications that I use every day: WhatsApp, Telegram and Instagram. Telegram seems to be the most convenient notification channel: it has a simple procedure for creating a personal bot and a convenient format for private messages. In addition, the bot can accept commands — for example, one command “drop” can quickly disconnect the server’s network connection. If the project proves useful to corporate users, it is also important to send logs to a centralized monitoring system, such as Zabbix.
The end idea of Folder Watcher is to get a lightweight yet effective “guard” that monitors the project while you are not on the server. It should notify only when necessary and provide clear reports: “Turn on the alarm?” or “Here’s a list of who, when and what they did while you were asleep.” Simplicity is the key: less code — less load on the system and more possibilities for customization without deep knowledge of the internal structure.
Theoretical part
Libraries
- watchdog – monitors file system changes — who, what and when created, deleted, moved or modified.
- queue – a thread-safe queue for exchanging data between threads.
- threading – creates parallel threads so different parts of the program (monitoring, Telegram, Zabbix, UI) do not interfere with each other.
- queue and threading are used so the interface does not freeze while Telegram sends a message, and observers can work in parallel. For example, one thread monitors the folder, another writes to the log, a third sends notifications — and all this peacefully coexists in one program.
- requests – a simple library for working with HTTP requests. We use it to communicate with the Telegram API — sending and receiving messages.
- subprocess – allows running system commands directly from Python. Through it we send messages to Zabbix and execute system commands to disconnect the network in case of an alarm.
- logging – allows writing neat records about what is happening: from errors to system events. Mainly we get errors from it when they occur.
- json – serializes and deserializes data (converts them to a convenient JSON format).
- datetime and time – provide time and date, calculate intervals and format event timestamps.
- socket – works with network connections, IP and ports. To determine the local IP address of the user — we add it to each event in the UI. This is especially useful when the system is running on multiple machines: you can know from which computer the signal came.
- collections.deque – a ring queue for correct display of the events table.
- FastAPI – a minimalist and fast server in Python. It handles HTTP requests from the frontend: starting monitoring, retrieving events, configuring Telegram and Zabbix. This ensures the project is cross-platform: the same code works on macOS, Windows and Linux. You do not need to build an EXE — just open the browser and see all events live.
- venv – a virtual isolated development environment.
Creation tools
- VS Code
- Terminal x2
- Docker + Zabbix
- Activity Monitor
Project directory structure
|-- 📁 folderwatcher
| |-- 📄 app.py
| |-- 📄 config.json
| |-- 📄 events.log
| |-- 📄 index.html
| +-- 📄 watcher.py
|-- 📁 test_root
+-- 📁 venvCode
Server layer – app.py
The file app.py describes an HTTP server based on Flask. It defines several REST endpoints for controlling monitoring and integrations.
Main objects
Creating the application. At the beginning of the file, a Flask instance and a global WatchManager object are created, which manages file observers.
The index() function. This is the handler for the root URL /. Instead of using templates, the function simply returns the index.html file from the project root, which allows it to work even if there is no templates folder.
API endpoints
Endpoint | Method | Description | Main calls |
|---|---|---|---|
/api/start_monitoring | POST | Starts monitoring for the specified paths. The client passes a list of paths and patterns to exclude. The function normalizes the paths to absolute form, sets the log file path if necessary, configures exclusion patterns, and calls watch_manager.start_monitoring. | watch_manager.set_log_path, watch_manager.set_exclude_patterns, watch_manager.start_monitoring |
/api/stop_monitoring | POST | Stops all watchers by calling watch_manager.stop_monitoring(). | watch_manager.stop_monitoring |
/api/configure_telegram | POST | Accepts token and chat_id, calling watch_manager.configure_telegram to set up sending notifications to Telegram. | watch_manager.configure_telegram |
/api/configure_zabbix | POST | Configures sending traps to Zabbix (server, host, key) via watch_manager.configure_zabbix. | watch_manager.configure_zabbix |
/api/events | GET | Returns the latest events from the UI ring buffer. The client can specify since_seq (the last visible event sequence number) and limit. The handler calls watch_manager.get_events(). | watch_manager.get_events |
/api/download_log | GET | Returns the events.log file if it exists. Uses the path from watch_manager.log_path. | — |
/static/<path> | GET | Serves static files (CSS, JS) from the static directory. | send_from_directory |
At the end, app.py starts the server with a port taken from the environment variable PORT (default 5000).
Module watcher.py
This module contains all the services responsible for tracking file system events and sending notifications. The logic is divided into several classes.
Class TelegramNotifier
Provides asynchronous sending of messages to Telegram. When created, it takes a bot token and chat ID. If both values are set and the requests library is available, sending mode is enabled. It maintains a queue and a worker thread that respects rate limits, retries on errors, and falls back to plain text on a 400 error.
Class ZabbixNotifier
Used to send trap messages to Zabbix via the zabbix_sender utility. On initialization it searches for zabbix_sender in the system PATH and common locations. If the program is not found, sending is disabled and an error is logged. The methods send_trap and send_event_json create and launch a separate thread that runs the zabbix_sender command with the necessary parameters to avoid blocking the main thread.
Class TelegramDropListener
Implements a lightweight listener for incoming Telegram messages. It periodically polls the getUpdates endpoint, and if a message with the text "drop" arrives from the configured chat_id, it invokes the callback. It runs in a dedicated daemon thread and stops automatically when stop() is called.
Class RansomwareDetector
A simple detector of mass changes. It maintains a sliding window (window_seconds) and a threshold for event count. The record_event() method adds a timestamp to the list and, if the number of events within the window reaches the threshold, it calls the callback (for example to send a warning about possible ransomware).
Class DebounceFilter
Filters out repeated events for the same path and event type within a given debounce interval to reduce noisy duplicates. The should_emit(path, event_type) method checks the last event time and decides whether to emit the new event.
Class EventProcessor (event handler)
Inherits from FileSystemEventHandler and reacts to file system events. The constructor receives an event queue, exclusion patterns, a log path, Telegram and Zabbix notifiers, a ransomware detector, a debounce filter, and a UI push callback. It logs events to JSONL, sends notifications to Telegram and Zabbix, coalesces "modified" events if a delete or move follows shortly, and buffers events for the UI.
Internal methods:
- _write_log(data) – writes the event dictionary to the JSONL log file.
- _is_excluded(path) – checks whether the path matches any exclusion patterns (suffixes starting with '.' are treated as suffix matches; others are substring matches).
- _emit_ui_and_tg(action, path, src_path, dest_path) – constructs a UI event object, pushes it to the UI and internal queue, and sends a brief Telegram message if enabled.
- _schedule_modified_emit(path) – delays emitting a "modified" event to allow a subsequent delete/move to suppress it.
Watchdog callbacks:
- on_created(event) – called when a file or directory is created. Applies exclusions and debouncing, logs the event, sends it to Zabbix if configured, records it for ransomware detection, updates last_created_ts, determines the action label ("File created" or "Folder created") and pushes it to the UI/Telegram.
- on_deleted(event) – called when a file or directory is deleted. Similar to on_created: checks exclusions and debouncing, logs, notifies Zabbix and the ransomware detector, records the time of the terminal action, removes pending modified events, determines the action label ("File deleted" or "Folder deleted") and emits it.
- on_modified(event) – called when a file is modified. Ignores directories and suppresses modifications immediately after creation. Uses the debounce filter to avoid duplicates, logs the event, notifies Zabbix and the ransomware detector, schedules a delayed emit to allow suppression if a delete/move follows.
- on_moved(event) – called when a file or directory is moved. Checks exclusions and debouncing, logs the move, notifies Zabbix and the ransomware detector, records the terminal action time, removes pending modified events, determines the action label ("File moved" or "Folder moved") and emits it.
Class WatchManager
Coordinates multiple observers and stores configuration and state. It manages exclusion patterns, the JSONL log path, and notifiers (Telegram/Zabbix), maintains a ring buffer of UI events with a monotonic sequence number so the frontend can fetch only new events, persists Telegram settings in config.json so they survive restarts, and implements OS-specific network disable logic triggered by a Telegram "drop" command.
Key fields:
- observers – list of Observer objects from watchdog.
- event_queue – queue for events forwarded to the UI.
- exclude_patterns – list of exclusion patterns.
- log_path – path to the JSONL log file.
- telegram_notifier and zabbix_notifier – instances of the respective notifier classes.
- ransomware_detector and debounce_filter – utilities for detection and debouncing.
- ui_events – deque storing recent events for the web interface.
- user_name and local_ip – information added to each event.
- drop_listener – listens for "drop" commands in Telegram.
Main methods:
- _push_ui(event) – attaches common fields, increments the sequence number, and pushes the event into the ring buffer.
- get_events(since_seq, limit) – returns recent events optionally filtered by sequence number.
- configure_telegram(token, chat_id) – recreates the Telegram notifier and drop listener and saves the configuration.
- configure_zabbix(server, host, key, port) – reconfigures the Zabbix sender.
- set_exclude_patterns(patterns) – sets path exclusion patterns.
- set_log_path(path) – sets the log file path.
- _on_ransomware_detected() – called when the mass-change heuristic triggers; pushes a UI alert and notifies via Telegram and Zabbix.
- _on_drop() – invoked when a 'drop' command is received; attempts to disable network connectivity.
- start_monitoring(paths) – starts observers for given directories, resetting state if monitoring is already active.
- stop_monitoring() – stops all observers and clears the monitoring flag.
Overall event processing flow
Client call /api/start_monitoring
|
v
+------------------------------+
| WatchManager.start_monitoring|
+------------------------------+
|
Create EventProcessor and Observer
|
Observer → watches the file system (watchdog)
|
+--------------------------------+
| EventProcessor (watchdog handler)|
+--------------------------------+
| | | |
v v v v
on_created on_deleted on_modified on_moved
| | | |
v v v v
_write_log,… (logging)
zabbix_notifier.send_event_json (if configured)
ransomware_detector.record_event → on threshold triggers _on_ransomware_detected
debounce_filter (debouncing)
_emit_ui_and_tg → queues event for UI,
sends Telegram notification
|
v
WatchManager._push_ui → ring buffer + seq
Web interface – index.html
Main elements:
- Monitoring settings: users can add multiple paths to watch, specify extensions or patterns to exclude, and start/stop monitoring.
- Integrations: fields for entering the Telegram Bot Token and Chat ID, as well as Zabbix parameters (server, host, key). Buttons send POST requests to /api/configure_telegram and /api/configure_zabbix.
- Event log: a table displaying received events. The client periodically polls /api/events via fetchEventsOnce and updates the table. Users can filter by event type, search by path, sort by various columns, and toggle between table and JSON view.
- Log download: a button triggers downloadLog() which opens /api/download_log to download the log file.
The JavaScript implements DOM manipulation (addPath, removePath), API calls (startMonitoring, stopMonitoring, configureTelegram, configureZabbix), polling of events (startPolling), filtering and sorting (redrawAll), and persistence of settings via localStorage.
Diagrams
Starting monitoring
Client calls /api/start_monitoring
|
v
Flask function api_start_monitoring
|
v
WatchManager.set_log_path / set_exclude_patterns
|
v
WatchManager.start_monitoring(paths)
|
+--> for each path:
create EventProcessor
schedule on Observer
Observer.start()
Processing a file system event
A watchdog event (create/delete/modify/move)
|
v
EventProcessor.on_* callback
|
+--> apply _is_excluded and DebounceFilter
|
+--> log to file (_write_log)
|
+--> send to Zabbix (if configured)
|
+--> record in RansomwareDetector
|
+--> push to UI queue (_emit_ui_and_tg)
|
+--> WatchManager._push_ui → ring buffer
|
+--> TelegramNotifier.send_message (if enabled)
Mass change detection
EventProcessor → RansomwareDetector.record_event
|
v
If ≥ threshold events within window_seconds:
|
v
WatchManager._on_ransomware_detected
|
+--> forms an event with alert=True
+--> _push_ui (UI)
+--> TelegramNotifier.send_message
+--> ZabbixNotifier.send_event_json
Receiving a “drop” command
TelegramDropListener ← background bot messages
|
v
Received a "drop" message from the configured chat_id
|
v
WatchManager._on_drop
|
+--> prints a message to the console
+--> calls _disable_network
|
+--> on Linux: nmcli off / ip link down
+--> on macOS: networksetup -setnetworkserviceenabled off
+--> on Windows: Disable-NetAdapter via PowerShell
Optimization
- before optimization: CPU 86–100%, RAM 370 MB
- after combining threads and reducing the polling interval: CPU 0.1–0.5%, RAM 40 MB
Practical part
1. Creating the project folder
1. Open a terminal and navigate to a convenient directory for development.
2. Create a new folder for the project:
mkdir fs_monitor
cd fs_monitor
This folder will contain the source code for the web application and auxiliary files.
2. Creating the virtual development environment
To isolate dependencies it is convenient to work in a virtual environment:
pip install venv
python3 -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scriptsctivate # Windows
Activating the environment will allow you to install the necessary libraries without affecting the system.
3. Installing libraries
Install the required dependencies:
pip install flask watchdog requests python-telegram-bot
If your IDE does not “see” the installed packages, restart it or make sure it uses the correct interpreter from the virtual environment.
4. Creating project files
The project consists of three main files and a directory static for static resources:
1. app.py — the entry point of the Flask application. It configures routes (/api/start_monitoring, /api/stop_monitoring, /api/configure_telegram, /api/configure_zabbix, /api/events, /api/download_log) and starts the HTTP server.
2. watcher.py — contains classes for tracking events: TelegramNotifier for sending messages to Telegram, ZabbixNotifier for sending trap messages via zabbix_sender, EventProcessor for handling file system events and filtering them, and WatchManager for managing watchers, exclusions and integrations.
3. index.html — the web page. It allows you to enter paths to monitor, manage exclusions, configure integrations, view the event log and download the log file. The file uses JavaScript to interact with the application API.
Create these files in the project root and fill them with code. Additionally create a folder static if you plan to separate CSS or images.
5. Writing code
The code is organized as follows:
- In
watcher.pythe class TelegramNotifier uses a token and chat ID to send messages to Telegram via the API endpointhttps://api.telegram.org/bot<TOKEN>/sendMessage. The requests library is used for HTTP requests. - ZabbixNotifier searches the system for the
zabbix_senderutility and calls it in a separate thread, passing server address, host, key and value. - EventProcessor inherits from the watchdog event handler and reacts to create, delete, modify and move events. It uses DebounceFilter to suppress duplicates. It logs events, sends them to Zabbix and the ransomware detector, and pushes them to the UI via WatchManager.
- WatchManager manages multiple observers and buffers events for the UI. It also saves Telegram parameters in
config.jsonand automatically runs a drop listener that disables network interfaces if a "drop" message is received.
The file app.py sets up the Flask server, providing API endpoints for starting/stopping monitoring, configuring Telegram and Zabbix, retrieving events, and downloading the log. When run, the application serves index.html.
6. Testing
Run the server:
python3 app.py
By default the application listens at http://127.0.0.1:5000/. Open this page in a browser. In the web interface add the path to the folder you want to monitor, specify exclusions (for example .log,.tmp) and click “Start monitoring”. New rows will appear when files are created, deleted, modified or moved.
You can sort the table by columns, filter events and search by part of the path.
The log file events.log should record events in JSON format. When you stop monitoring (click “Stop monitoring”), observers should stop correctly and new events should not be recorded.

7. Creating a Telegram bot
To receive notifications in Telegram create your own bot:
- In Telegram find the user @BotFather.
- Send the command
/newbotand follow the instructions: choose a name and username. BotFather will create the bot and give you a token. - The token is a long string of the form
123456:ABC-DEF1234_xxx_xxxwhich allows the application to send messages on behalf of the bot. - Obtain your chat ID by sending a message to the bot and using a bot viewer or by calling
getUpdatesthrough the Telegram API.
8. Linking the bot and checking notifications
Open the “Integrations” section on your index.html page. Enter the Bot Token and Chat ID and click “Link Telegram”. The application will send the data to /api/configure_telegram, save them in config.json and start the drop listener. If everything is successful, a message will indicate that Telegram is configured.
Now when a file changes in a monitored folder the application will send a notification to your chat. Test this by creating or deleting a file.

9. Installing Docker (optional)
If you want to deploy the application in a container, first install Docker. The official site provides instructions for different operating systems (Ubuntu, Debian, Fedora, Windows, etc.). After installation:
Create a file
Dockerfilein the project root to build the image:FROM python:3.11-slim WORKDIR /app COPY . . RUN pip install --no-cache-dir flask watchdog requests EXPOSE 5000 CMD ["python", "app.py"]Build and run the container:
docker build -t fs_monitor:latest . docker run -d -p 5000:5000 --name fs_monitor fs_monitor:latest- Check that the application is available on port 5000.
- Containerization simplifies portability and deployment on a server.
Default login and password for Zabbix:
Login: Admin
Password: zabbix
10. Installing and configuring Zabbix
Zabbix is a monitoring system that can receive trap messages from our application. For installation use the official guide. On the Zabbix website there are sections “Quickstart”, “Download” and “Install”, which let you choose the server version and instructions for your platform.
Or run the following in a terminal:
brew install zabbix
brew install --cask docker
open -a Docker
mkdir -p ~/zbx && cd ~/zbx
cat > docker-compose.yml <<'YAML'
version: "3.8"
services:
db:
image: postgres:16
container_name: zbx-db
environment:
POSTGRES_USER: zabbix
POSTGRES_PASSWORD: zabbix
POSTGRES_DB: zabbix
volumes:
- zbx-db:/var/lib/postgresql/data
restart: unless-stopped
zabbix-server:
image: zabbix/zabbix-server-pgsql:alpine-latest
container_name: zbx-server
environment:
DB_SERVER_HOST: db
POSTGRES_USER: zabbix
POSTGRES_PASSWORD: zabbix
POSTGRES_DB: zabbix
depends_on: [db]
ports:
- "10051:10051" # port for zabbix_sender
restart: unless-stopped
zabbix-web:
image: zabbix/zabbix-web-nginx-pgsql:alpine-latest
container_name: zbx-web
environment:
DB_SERVER_HOST: db
POSTGRES_USER: zabbix
POSTGRES_PASSWORD: zabbix
POSTGRES_DB: zabbix
PHP_TZ: "Asia/Jerusalem"
depends_on: [db, zabbix-server]
ports:
- "8080:8080" # web interface
restart: unless-stopped
volumes:
zbx-db:
YAML
Then run:
docker compose up -d
After deploying the Zabbix server:
- Create a new host (for example
fs_monitor) and add a data item of type “Trap” with the key you specify in the application (e.g.fs.events). - Ensure the
zabbix_senderutility is installed on your machine (it comes with the Zabbix agent package). - Configure a trigger to generate alerts when the JSON payload contains
"alert":1(the ransomware detector flag).
11. Linking Zabbix and checking operation
In the “Integrations” section of index.html, fill in the fields Server (the Zabbix server address), Host (host name), and Key (the data item key) and then click “Link Zabbix”. The application will call /api/configure_zabbix, set up sending and try to find zabbix_sender on your system.
Generate a few events in the watched folders. In the Zabbix web interface confirm that data items update and that triggers fire when there is a large number of changes.

12. Checking functionality and debugging
1. Ensure that the start and stop monitoring functions work correctly: after pressing “Stop monitoring” events stop being logged.
2. Verify that exclusion patterns filter the appropriate extensions.
3. Observe that when there are mass changes to files, the detector triggers and sends a warning to Telegram and Zabbix.
4. If notifications do not arrive, check the correctness of the Telegram token and chat ID; make sure the Zabbix server is available and the item key matches. Enable detailed logging or examine the events.log file as needed.
Conclusion, links and code
Below are helpful links, as well as code that can be copied into files.
https://www.docker.com/products/docker-desktop/
https://web.telegram.org/k/#@BotFather
https://www.zabbix.com/documentation/current/en/
watcher.py
import os
import json
import threading
import queue
import time
import logging
import subprocess
from datetime import datetime
import socket
from collections import deque
from typing import Optional
import shutil
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileModifiedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileMovedEvent,
)
try:
import requests
except ImportError:
requests = None
class TelegramNotifier:
"""
Queue + worker thread for sending messages to Telegram.
- Uses `parse_mode=HTML` (safer than MarkdownV2 for arbitrary text).
- Retries with exponential backoff on failures.
- Detailed logging of HTTP status and response body on errors.
- Fallback: on HTTP 400/Bad Request, resends without `parse_mode`.
"""
def __init__(self, bot_token: str, chat_id: str | int):
self.bot_token = (bot_token or "").strip()
self.chat_id = str(chat_id).strip() if chat_id is not None else ""
self.enabled = bool(self.bot_token and self.chat_id and requests is not None)
self._q: "queue.Queue[dict]" = queue.Queue(maxsize=1000)
self._stop_evt = threading.Event()
self._worker: threading.Thread | None = None
self._session = requests.Session() if requests is not None else None
self._rate_limit_per_sec = 20
self._last_ts = 0.0
if self.enabled:
self._start_worker()
def _start_worker(self):
if self._worker and self._worker.is_alive():
return
self._worker = threading.Thread(target=self._run, name="tg_notifier", daemon=True)
self._worker.start()
def stop(self):
self._stop_evt.set()
if self._worker:
self._worker.join(timeout=2.0)
def send_message(self, text: str):
if not self.enabled:
return
try:
self._q.put_nowait({"text": text, "parse_mode": "HTML"})
except queue.Full:
logging.warning("[TG] Queue is full, dropping message")
def _respect_rate_limit(self):
now = time.time()
min_interval = 1.0 / self._rate_limit_per_sec
if now - self._last_ts < min_interval:
time.sleep(min_interval - (now - self._last_ts))
self._last_ts = time.time()
def _request(self, url: str, data: dict, timeout: float = 8.0):
self._respect_rate_limit()
return self._session.post(url, data=data, timeout=timeout)
def _send_once(self, text: str, parse_mode: str | None) -> bool:
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {"chat_id": self.chat_id, "text": text}
if parse_mode:
payload["parse_mode"] = parse_mode
try:
resp = self._request(url, payload)
ok = resp.ok
if not ok:
logging.warning("[TG] sendMessage failed: %s %s | body=%s",
resp.status_code, resp.reason, resp.text[:1000])
return ok
except Exception as e:
logging.error("[TG] Exception on sendMessage: %r", e)
return False
def _send_with_fallback(self, text: str, parse_mode: str | None) -> bool:
ok = self._send_once(text, parse_mode)
if not ok and parse_mode is not None:
return self._send_once(text, None)
return ok
def _run(self):
while not self._stop_evt.is_set():
try:
item = self._q.get(timeout=0.25)
except queue.Empty:
continue
text = str(item.get("text", "")).strip()
parse_mode = item.get("parse_mode")
if not text:
continue
backoff = 0.7
for _ in range(3):
if self._send_with_fallback(text, parse_mode):
break
time.sleep(backoff)
backoff *= 2.0
class ZabbixNotifier:
"""Sends trap messages to Zabbix using `zabbix_sender` (asynchronously, non-blocking)."""
def __init__(self, server: str, host: str, key: str, port: int = 10051):
self.server = (server or "").strip()
self.host = (host or "").strip()
self.key = (key or "").strip()
self.port = int(port) if port else 10051
self.enabled = bool(self.server and self.host and self.key)
# Try to locate `zabbix_sender` in PATH (common locations for macOS/Linux; Windows example path provided).
self.sender_path = shutil.which("zabbix_sender")
if not self.sender_path:
for p in (
"/opt/homebrew/bin/zabbix_sender",
"/usr/local/bin/zabbix_sender",
"C:\\Program Files\\Zabbix Agent\\zabbix_sender.exe",
):
if os.path.exists(p) and os.access(p, os.X_OK):
self.sender_path = p
break
if self.enabled and not self.sender_path:
logging.error("[ZBX] zabbix_sender not found in PATH")
def _run_sender(self, value: str):
if not self.enabled or not self.sender_path:
return
def _send():
try:
cmd = [
self.sender_path,
"-z", self.server,
"-p", str(self.port),
"-s", self.host,
"-k", self.key,
"-o", value,
]
res = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if res.returncode != 0:
logging.warning("[ZBX] sender rc=%s out=%s", res.returncode, (res.stdout or "").strip())
except Exception as e:
logging.error("[ZBX] Exception on zabbix_sender: %r", e)
threading.Thread(target=_send, name="zbx_sender", daemon=True).start()
def send_trap(self, value: str):
"""Send an arbitrary string payload to the configured item key."""
self._run_sender(value)
def send_event_json(self, obj: dict):
"""Helper: serialize `obj` to JSON and send it via `zabbix_sender`."""
try:
payload = json.dumps(obj, ensure_ascii=False)
except Exception:
payload = str(obj)
self._run_sender(payload)
class TelegramDropListener:
"""
Lightweight incoming-message listener for Telegram.
Periodically polls the `getUpdates` endpoint. If a message with the text "drop"
arrives from the configured `chat_id`, invokes the provided callback.
Runs in a dedicated daemon thread and stops automatically when `stop()` is called.
"""
def __init__(self, bot_token: str, chat_id: str, callback=None):
self.token = (bot_token or "").strip()
self.chat_id = str(chat_id).strip() if chat_id is not None else ""
self.callback = callback
self._thread: threading.Thread | None = None
self._stop_evt = threading.Event()
self._last_update_id: int | None = None
if self.token and self.chat_id and requests is not None:
self._start()
def _start(self):
if self._thread and self._thread.is_alive():
return
self._thread = threading.Thread(target=self._run, name="tg_drop_listener", daemon=True)
self._thread.start()
def stop(self):
self._stop_evt.set()
if self._thread:
self._thread.join(timeout=2.0)
def _run(self):
api_url = f"https://api.telegram.org/bot{self.token}/getUpdates"
while not self._stop_evt.is_set():
params = {"timeout": 15}
if self._last_update_id is not None:
params["offset"] = self._last_update_id + 1
try:
resp = requests.get(api_url, params=params, timeout=20)
data = resp.json()
if data.get("ok") and data.get("result"):
for update in data["result"]:
upd_id = update.get("update_id")
if upd_id is not None:
self._last_update_id = upd_id
msg = update.get("message") or update.get("edited_message")
if not msg:
continue
chat = msg.get("chat", {})
if str(chat.get("id")) != self.chat_id:
continue
text = str(msg.get("text", "")).strip().lower()
if text == "drop":
if self.callback:
try:
self.callback()
except Exception:
pass
else:
print("User requested drop")
time.sleep(3.0)
except Exception as e:
logging.error("[TG_DROP] Error while polling updates: %r", e)
time.sleep(10.0)
class RansomwareDetector:
"""
Simple "mass change" detector: triggers if >= `threshold` events occur within `window_seconds`.
Intended as a coarse heuristic to raise an alert on suspicious spikes of file operations.
"""
def __init__(self, threshold: int = 100, window_seconds: int = 60, callback=None):
self.threshold = int(threshold)
self.window_seconds = int(window_seconds)
self.callback = callback
self.events: list[float] = []
self.lock = threading.Lock()
self.last_alert_time = 0.0
def record_event(self):
now = time.time()
with self.lock:
self.events.append(now)
cutoff = now - self.window_seconds
self.events = [t for t in self.events if t >= cutoff]
if len(self.events) >= self.threshold:
if now - self.last_alert_time > self.window_seconds:
self.last_alert_time = now
if self.callback:
try:
self.callback()
except Exception as e:
logging.error("[RANSOM] Callback error: %r", e)
class DebounceFilter:
"""
Debounce filter for file events.
Suppresses repeated events of the same (path, event_type) pair within `debounce_seconds`
to reduce noisy duplicates from underlying OS notifications.
"""
def __init__(self, debounce_seconds: float = 2.0):
self.debounce_seconds = float(debounce_seconds)
self.last_event_time: dict[tuple[str, str], float] = {}
self.lock = threading.Lock()
def should_emit(self, path: str, event_type: str) -> bool:
now = time.time()
key = (path, event_type)
with self.lock:
t = self.last_event_time.get(key)
if t is None or (now - t) > self.debounce_seconds:
self.last_event_time[key] = now
return True
return False
class EventProcessor(FileSystemEventHandler):
"""
Watchdog event handler.
Responsibilities:
- Exclusion filtering and debouncing.
- JSONL logging of events.
- Notifications (Telegram, Zabbix).
- Coalescing: delay emitting "modified" if a terminal action (delete/move) follows quickly.
- UI buffering via a push callback.
Also suppresses an immediate "modified" that sometimes follows right after "created".
"""
def __init__(
self,
event_queue: queue.Queue,
exclude_patterns: list[str],
log_path: str,
telegram_notifier: TelegramNotifier,
zabbix_notifier: ZabbixNotifier,
ransomware_detector: RansomwareDetector,
debounce_filter: DebounceFilter,
ui_push, # callable(dict) -> None
):
super().__init__()
self.event_queue = event_queue
self.exclude_patterns = exclude_patterns or []
self.log_path = log_path
self.telegram_notifier = telegram_notifier
self.zabbix_notifier = zabbix_notifier
self.ransomware_detector = ransomware_detector
self.debounce_filter = debounce_filter
self.ui_push = ui_push
os.makedirs(os.path.dirname(self.log_path) or ".", exist_ok=True)
self.log_lock = threading.Lock()
# For coalescing "modified" that are followed by "deleted"/"moved".
self.pending_modified: dict[str, dict] = {} # path -> {"start_ts": float}
self.pending_lock = threading.Lock()
self.modified_delay = 0.5 # seconds; tune for your workload
self.last_terminal_action: dict[str, float] = {} # path -> ts of last deleted/moved
# Suppress noisy "modified" immediately after "created".
self.last_created_ts: dict[str, float] = {}
self.created_modified_suppress_sec: float = 0.7 # seconds
def _write_log(self, data: dict):
line = json.dumps(data, ensure_ascii=False)
with self.log_lock, open(self.log_path, "a", encoding="utf-8") as f:
f.write(line + "\n")
def _is_excluded(self, path: str) -> bool:
"""
Naive pattern-based exclusion:
- If pattern starts with '.', treat it as a suffix (e.g., '.log' excludes paths ending with '.log').
- Otherwise, a case-insensitive substring match.
"""
p = (path or "").lower()
for pattern in self.exclude_patterns:
pat = (pattern or "").lower().strip()
if not pat:
continue
if pat.startswith("."):
if p.endswith(pat):
return True
else:
if pat in p:
return True
return False
def _emit_ui_and_tg(self, action: str, path: str, src_path: str | None = None, dest_path: str | None = None):
"""
Build a UI event object, push it to UI and the internal queue,
and optionally send a short Telegram message.
"""
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if dest_path:
desc = f"{action}: {path} → {dest_path}"
else:
desc = f"{action}: {path}"
low = action.lower()
if "created" in low:
category = "Create"
elif "deleted" in low:
category = "Delete"
elif "modified" in low or "changed" in low:
category = "Modify"
elif "moved" in low or "renamed" in low:
category = "Move"
else:
category = action
ev = {
"timestamp": ts,
"action": action,
"category": category,
"path": path,
"src_path": src_path,
"dest_path": dest_path,
"description": desc,
}
self.ui_push(ev)
try:
self.event_queue.put_nowait(ev)
except queue.Full:
pass
if self.telegram_notifier.enabled:
# Escape minimal HTML for safe `parse_mode=HTML`.
safe_path = (path or "").replace("&", "&").replace("<", "<").replace(">", ">")
msg = (
f"A change was detected: <b>{ts}</b> {action} <code>{safe_path}</code>.\n"
f"Do you want to restrict the server's internet connectivity?"
)
self.telegram_notifier.send_message(msg)
def _schedule_modified_emit(self, path: str):
"""
Delay emission of 'modified' to allow possible terminal actions to arrive.
If a delete/move for the same path is seen within the window, the 'modified' is suppressed.
"""
def _worker(p=path):
time.sleep(self.modified_delay)
with self.pending_lock:
entry = self.pending_modified.get(p)
if not entry:
return
last_term = self.last_terminal_action.get(p, 0.0)
if last_term >= entry["start_ts"]:
self.pending_modified.pop(p, None)
return
self.pending_modified.pop(p, None)
self._emit_ui_and_tg("File modified", p)
threading.Thread(target=_worker, name="emit_modified_delay", daemon=True).start()
# === Watchdog callbacks ===
def on_created(self, event: FileCreatedEvent):
if self._is_excluded(event.src_path):
return
path = event.src_path
if not self.debounce_filter.should_emit(path, "created"):
return
raw = {
"event": "created",
"path": path,
"is_directory": event.is_directory,
"time": datetime.now().isoformat(),
}
self._write_log(raw)
if self.zabbix_notifier.enabled:
self.zabbix_notifier.send_event_json(raw)
self.ransomware_detector.record_event()
self.last_created_ts[path] = time.monotonic()
action = "Folder created" if event.is_directory else "File created"
self._emit_ui_and_tg(action, path)
def on_deleted(self, event: FileDeletedEvent):
if self._is_excluded(event.src_path):
return
path = event.src_path
if not self.debounce_filter.should_emit(path, "deleted"):
return
raw = {
"event": "deleted",
"path": path,
"is_directory": event.is_directory,
"time": datetime.now().isoformat(),
}
self._write_log(raw)
if self.zabbix_notifier.enabled:
self.zabbix_notifier.send_event_json(raw)
self.ransomware_detector.record_event()
with self.pending_lock:
self.last_terminal_action[path] = time.time()
self.pending_modified.pop(path, None)
action = "Folder deleted" if event.is_directory else "File deleted"
self._emit_ui_and_tg(action, path)
def on_modified(self, event: FileModifiedEvent):
if self._is_excluded(event.src_path):
return
path = event.src_path
if event.is_directory:
return
# Suppress spurious 'modified' right after a 'created'.
created_ts = self.last_created_ts.get(path, 0.0)
if created_ts and (time.monotonic() - created_ts) < self.created_modified_suppress_sec:
return
if not self.debounce_filter.should_emit(path, "modified"):
return
raw = {
"event": "modified",
"path": path,
"is_directory": event.is_directory,
"time": datetime.now().isoformat(),
}
self._write_log(raw)
if self.zabbix_notifier.enabled:
self.zabbix_notifier.send_event_json(raw)
self.ransomware_detector.record_event()
with self.pending_lock:
self.pending_modified[path] = {"start_ts": time.time()}
self._schedule_modified_emit(path)
def on_moved(self, event: FileMovedEvent):
if self._is_excluded(event.src_path) or self._is_excluded(event.dest_path):
return
if not self.debounce_filter.should_emit(event.src_path, "moved"):
return
raw = {
"event": "moved",
"src_path": event.src_path,
"dest_path": event.dest_path,
"is_directory": event.is_directory,
"time": datetime.now().isoformat(),
}
self._write_log(raw)
if self.zabbix_notifier.enabled:
self.zabbix_notifier.send_event_json(raw)
self.ransomware_detector.record_event()
with self.pending_lock:
self.last_terminal_action[event.src_path] = time.time()
self.pending_modified.pop(event.src_path, None)
action = "Folder moved" if event.is_directory else "File moved"
self._emit_ui_and_tg(action, event.src_path, src_path=event.src_path, dest_path=event.dest_path)
class WatchManager:
"""
Orchestrates multiple observers and holds shared configuration/state.
Features:
- Manages exclusion patterns, JSONL log path, and notifiers (Telegram/Zabbix).
- Maintains a ring buffer of UI events with a monotonic sequence number (`seq`)
so the frontend can fetch deltas without missing events.
- Persists Telegram settings (`config.json`) so they survive restarts and the
drop-listener auto-starts next time.
- Implements OS-specific network-disable logic triggered by a Telegram "drop" command.
"""
def __init__(self):
self.observers: list[Observer] = []
self.event_queue: "queue.Queue[dict]" = queue.Queue(maxsize=5000)
self.exclude_patterns: list[str] = []
self.log_path = os.path.join(os.getcwd(), "events.log")
self.telegram_notifier = TelegramNotifier(bot_token="", chat_id="")
self.zabbix_notifier = ZabbixNotifier(server="", host="", key="")
self.ransomware_detector = RansomwareDetector(
threshold=100,
window_seconds=60,
callback=self._on_ransomware_detected,
)
self.debounce_filter = DebounceFilter(debounce_seconds=2.0)
self.monitoring_active = False
# UI buffer: newest first
self.ui_events = deque(maxlen=5000)
self.ui_lock = threading.Lock()
self.seq = 0
# Basic environment info attached to UI events.
self.user_name = os.environ.get("USER") or os.environ.get("USERNAME") or "unknown"
self.local_ip = self._get_local_ip()
self.drop_listener: TelegramDropListener | None = None
self.config_path = os.path.join(os.getcwd(), "config.json")
self._load_config()
# === UI buffering helpers ===
def _push_ui(self, ev: dict):
"""
Attach common fields, assign a monotonically increasing `seq`,
and push the event into the ring buffer (newest first).
"""
if ev is not None:
ev.setdefault("user_name", self.user_name)
ev.setdefault("user_ip", self.local_ip)
with self.ui_lock:
self.seq += 1
ev["seq"] = self.seq
self.ui_events.appendleft(ev)
def get_events(self, since_seq: Optional[int] = None, limit: int = 200) -> list[dict]:
"""
Fetch recent UI events.
- If `since_seq` is provided, returns only events with `seq > since_seq`.
- Results are capped by `limit`.
"""
with self.ui_lock:
if since_seq is not None:
out = [e for e in self.ui_events if e.get("seq", 0) > since_seq]
return out[:limit]
return list(self.ui_events)[:limit]
# === Notifiers/config ===
def configure_telegram(self, token: str, chat_id: str | int):
"""
Reconfigure Telegram notifier and start the drop listener if both token and chat_id are provided.
Persist the configuration so it survives restarts.
"""
try:
self.telegram_notifier.stop()
except Exception:
pass
if self.drop_listener:
try:
self.drop_listener.stop()
except Exception:
pass
self.telegram_notifier = TelegramNotifier(token, chat_id)
if token and chat_id:
self.drop_listener = TelegramDropListener(token, chat_id, callback=self._on_drop)
else:
self.drop_listener = None
try:
self._save_config(token, chat_id)
except Exception:
logging.exception("[CONFIG] Failed to save Telegram config")
def configure_zabbix(self, server: str, host: str, key: str, port: int = 10051):
"""Reconfigure Zabbix trap sender."""
self.zabbix_notifier = ZabbixNotifier(server, host, key, port=port)
def set_exclude_patterns(self, patterns: list[str]):
"""Set path exclusion patterns (see `_is_excluded` for matching rules)."""
self.exclude_patterns = patterns or []
def set_log_path(self, path: str):
"""Set the JSONL events log file path."""
self.log_path = path
# === Ransomware alert ===
def _on_ransomware_detected(self):
"""
Called when the simple mass-change heuristic trips.
Pushes a UI alert and notifies via Telegram/Zabbix if enabled.
"""
alert_msg = "⚠️ Warning: massive file changes detected, possible ransomware activity!"
ev = {
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"action": "Warning",
"category": "Warning",
"path": "",
"description": alert_msg,
"alert": True,
}
self._push_ui(ev)
try:
self.event_queue.put_nowait(ev)
except queue.Full:
pass
if self.telegram_notifier.enabled:
self.telegram_notifier.send_message(alert_msg)
if self.zabbix_notifier.enabled:
# Send a JSON flag; easy to trigger Zabbix actions by matching `"alert":1`.
self.zabbix_notifier.send_event_json({
"alert": 1,
"time": datetime.now().isoformat(),
"msg": "massive file changes detected"
})
# === Drop command ===
def _on_drop(self):
"""
Invoked by TelegramDropListener when a 'drop' command is received from the configured chat.
Attempts to disable network connectivity using OS-specific mechanisms.
"""
try:
print("User requested drop")
try:
self._disable_network()
except Exception as e:
logging.error("[DROP] Failed to disable network: %r", e)
except Exception:
pass
# === Config persistence ===
def _save_config(self, token: str, chat_id: str | int) -> None:
"""Persist Telegram configuration atomically into `config.json`."""
cfg = {
"telegram_token": (token or "").strip(),
"telegram_chat_id": str(chat_id).strip() if chat_id is not None else "",
}
tmp_path = self.config_path + ".tmp"
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(cfg, f)
os.replace(tmp_path, self.config_path)
def _load_config(self) -> None:
"""Load Telegram configuration (if present) and initialize notifiers/listeners."""
if not os.path.isfile(self.config_path):
return
try:
with open(self.config_path, "r", encoding="utf-8") as f:
data = json.load(f)
token = data.get("telegram_token") or ""
chat = data.get("telegram_chat_id") or ""
if token and chat:
self.configure_telegram(token, chat)
except Exception:
logging.exception("[CONFIG] Failed to load config")
# === Network disabling ===
def _disable_network(self) -> None:
"""
Best-effort attempt to disable network connectivity across platforms.
Linux:
- Prefer `nmcli networking off`.
- Fallback: enumerate interfaces in `/sys/class/net` and `ip link set <iface> down` (excluding loopback).
macOS:
- Try `networksetup -listnetworkserviceorder` and disable listed services.
- Fallback: `networksetup -setairportpower en0 off` for Wi-Fi.
Windows:
- Powershell: `Disable-NetAdapter` for adapters with Status 'Up'.
All calls are fire-and-forget where possible to avoid blocking.
"""
import platform
system = platform.system().lower()
if system == "linux":
cmd = ["nmcli", "networking", "off"]
try:
subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logging.info("[DROP] Executed Linux network disable via nmcli")
return
except FileNotFoundError:
interfaces = []
try:
interfaces = os.listdir("/sys/class/net")
except Exception:
pass
for iface in interfaces:
if iface == "lo":
continue
try:
subprocess.Popen(["ip", "link", "set", iface, "down"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logging.info("[DROP] Disabled interface %s via ip link", iface)
except Exception:
pass
return
elif system == "darwin":
try:
services_out = subprocess.check_output(
["/usr/sbin/networksetup", "-listnetworkserviceorder"],
stderr=subprocess.DEVNULL
).decode("utf-8")
import re
matches = re.findall(r"\(Device: ([^)]+)\)", services_out)
if matches:
for dev in matches:
if dev == "lo0":
continue
try:
subprocess.Popen(
["/usr/sbin/networksetup", "-setnetworkserviceenabled", dev, "off"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
logging.info("[DROP] Disabled macOS network service %s", dev)
except Exception:
pass
return
except Exception:
pass
try:
subprocess.Popen(
["/usr/sbin/networksetup", "-setairportpower", "en0", "off"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
logging.info("[DROP] Disabled macOS Wi-Fi via networksetup")
except Exception:
pass
elif system == "windows":
try:
ps_script = (
"Get-NetAdapter | Where-Object { $_.Status -eq 'Up' } | "
"Disable-NetAdapter -Confirm:$false"
)
subprocess.Popen(
["powershell", "-NoProfile", "-Command", ps_script],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
logging.info("[DROP] Executed Windows network disable via PowerShell")
except Exception:
pass
else:
logging.warning("[DROP] Unsupported OS for network disable: %s", system)
def _get_local_ip(self) -> str:
"""Best-effort local IPv4 detection (UDP connect to 8.8.8.8; fallback to hostname resolution)."""
ip = ""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
except Exception:
try:
ip = socket.gethostbyname(socket.gethostname())
except Exception:
ip = ""
return ip
# === Start/stop ===
def start_monitoring(self, paths: list[str]):
"""
Start observers for the given directories.
If monitoring is already active, it resets state (UI buffer, queues, debounce filter,
ransomware detector) to start fresh.
"""
if self.monitoring_active:
self.stop_monitoring()
with self.ui_lock:
self.ui_events.clear()
self.seq = 0
self.event_queue = queue.Queue(maxsize=5000)
self.debounce_filter = DebounceFilter(debounce_seconds=2.0)
self.ransomware_detector = RansomwareDetector(
threshold=100,
window_seconds=60,
callback=self._on_ransomware_detected,
)
self.monitoring_active = True
for path in paths:
if not os.path.isdir(path):
logging.warning("[WATCH] Skip non-existing dir: %s", path)
continue
handler = EventProcessor(
event_queue=self.event_queue,
exclude_patterns=self.exclude_patterns,
log_path=self.log_path,
telegram_notifier=self.telegram_notifier,
zabbix_notifier=self.zabbix_notifier,
ransomware_detector=self.ransomware_detector,
debounce_filter=self.debounce_filter,
ui_push=self._push_ui,
)
obs = Observer()
obs.schedule(handler, path, recursive=True)
obs.start()
self.observers.append(obs)
logging.info("[WATCH] Started: %s", path)
def stop_monitoring(self):
"""
Stop all observers and wait briefly for them to exit.
Leaves notifiers and configuration intact.
"""
for obs in self.observers:
try:
obs.stop()
except Exception:
pass
for obs in self.observers:
try:
obs.join(timeout=2.0)
except Exception:
pass
self.observers.clear()
self.monitoring_active = False
logging.info("[WATCH] Stopped all observers")
index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>File System Monitor</title>
<style>
:root {
--color-bg-primary: #000000;
--color-bg-secondary: #0a0a0a;
--color-bg-card: #3d3d3d;
--color-bg-hover: #1a1a1a;
--color-border: #262626;
--color-border-light: #333333;
--color-text-primary: #ededed;
--color-text-secondary: #a1a1a1;
--color-text-muted: #666666;
--color-accent-blue: #3b82f6;
--color-accent-green: #10b981;
--color-accent-red: #ef4444;
--color-accent-yellow: #f59e0b;
--color-accent-purple: #8b5cf6;
--spacing-xs: 4px;
--spacing-sm: 8px;
--spacing-md: 16px;
--spacing-lg: 24px;
--spacing-xl: 32px;
--radius-sm: 6px;
--radius-md: 8px;
--radius-lg: 12px;
--shadow-sm: 0 1px 2px 0 rgba(0, 0, 0, 0.3);
--shadow-md: 0 4px 6px -1px rgba(0, 0, 0, 0.4);
}
* {
box-sizing: border-box;
margin: 0;
padding: 0;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen', 'Ubuntu', 'Cantarell', sans-serif;
background-color: var(--color-bg-primary);
color: var(--color-text-primary);
line-height: 1.6;
padding: var(--spacing-lg);
}
header {
background-color: var(--color-bg-card);
border: 1px solid var(--color-border);
border-radius: var(--radius-md);
padding: var(--spacing-lg) var(--spacing-xl);
margin-bottom: var(--spacing-xl);
}
h1 {
font-size: 28px;
font-weight: 600;
letter-spacing: -0.02em;
}
h2 {
font-size: 18px;
font-weight: 600;
margin-bottom: var(--spacing-lg);
letter-spacing: -0.01em;
}
h3 {
font-size: 14px;
font-weight: 600;
margin-top: var(--spacing-lg);
margin-bottom: var(--spacing-md);
color: var(--color-text-secondary);
text-transform: uppercase;
letter-spacing: 0.05em;
}
section {
background-color: var(--color-bg-card);
border: 1px solid var(--color-border);
border-radius: var(--radius-md);
padding: var(--spacing-xl);
margin-bottom: var(--spacing-lg);
}
input[type="text"] {
width: 100%;
padding: var(--spacing-sm) var(--spacing-md);
background-color: var(--color-bg-secondary);
border: 1px solid var(--color-border);
border-radius: var(--radius-sm);
color: var(--color-text-primary);
font-size: 14px;
transition: all 0.2s ease;
}
input[type="text"]:focus {
outline: none;
border-color: var(--color-accent-blue);
background-color: var(--color-bg-primary);
}
input[type="text"]::placeholder {
color: var(--color-text-muted);
}
button {
padding: var(--spacing-sm) var(--spacing-md);
background-color: var(--color-bg-secondary);
color: var(--color-text-primary);
border: 1px solid var(--color-border);
border-radius: var(--radius-sm);
font-size: 14px;
font-weight: 500;
cursor: pointer;
transition: all 0.2s ease;
}
button:hover {
background-color: var(--color-bg-hover);
border-color: var(--color-border-light);
}
button:active {
transform: translateY(1px);
}
#monitoring-settings button[onclick*="start"],
#integrations button {
background-color: var(--color-accent-blue);
border-color: var(--color-accent-blue);
color: white;
}
#monitoring-settings button[onclick*="start"]:hover,
#integrations button:hover {
background-color: #2563eb;
border-color: #2563eb;
}
.path-input {
display: flex;
gap: var(--spacing-sm);
margin-bottom: var(--spacing-sm);
}
.path-input input {
flex: 1;
}
.path-input button {
width: 36px;
padding: 0;
background-color: var(--color-bg-secondary);
color: var(--color-text-secondary);
}
.path-input button:hover {
background-color: var(--color-accent-red);
border-color: var(--color-accent-red);
color: white;
}
.paths {
margin-bottom: var(--spacing-md);
}
.mt-10 {
margin-top: var(--spacing-md);
}
.mb-10 {
margin-bottom: var(--spacing-md);
}
.w-full {
width: 100%;
}
label {
display: block;
font-size: 13px;
font-weight: 500;
color: var(--color-text-secondary);
margin-bottom: var(--spacing-xs);
}
.filter-controls {
display: flex;
flex-wrap: wrap;
gap: var(--spacing-md);
align-items: center;
margin-bottom: var(--spacing-md);
padding: var(--spacing-md);
background-color: var(--color-bg-secondary);
border-radius: var(--radius-sm);
}
.filter-controls label {
display: inline-flex;
align-items: center;
gap: var(--spacing-xs);
margin: 0;
cursor: pointer;
font-size: 14px;
color: var(--color-text-primary);
}
input[type="checkbox"] {
width: 16px;
height: 16px;
cursor: pointer;
accent-color: var(--color-accent-blue);
}
.action-buttons {
display: flex;
gap: var(--spacing-sm);
flex-wrap: wrap;
}
table {
width: 100%;
border-collapse: separate;
border-spacing: 0;
font-size: 13px;
}
thead {
position: sticky;
top: 0;
z-index: 10;
}
th {
background-color: var(--color-bg-secondary);
color: var(--color-text-secondary);
font-weight: 600;
text-align: left;
padding: var(--spacing-md);
border-bottom: 1px solid var(--color-border);
cursor: pointer;
user-select: none;
transition: background-color 0.2s ease;
white-space: nowrap;
font-size: 12px;
text-transform: uppercase;
letter-spacing: 0.05em;
}
th:hover {
background-color: var(--color-bg-hover);
}
th:first-child {
border-top-left-radius: var(--radius-sm);
}
th:last-child {
border-top-right-radius: var(--radius-sm);
}
td {
padding: var(--spacing-md);
border-bottom: 1px solid var(--color-border);
color: var(--color-text-primary);
}
tbody tr {
transition: background-color 0.15s ease;
}
tbody tr:hover {
background-color: var(--color-bg-hover);
}
th.sorted-asc::after,
th.sorted-desc::after {
margin-left: var(--spacing-xs);
color: var(--color-accent-blue);
font-size: 10px;
}
th.sorted-asc::after {
content: " ▲";
}
th.sorted-desc::after {
content: " ▼";
}
.alert {
color: var(--color-accent-red);
font-weight: 600;
}
#json-view {
display: none;
white-space: pre-wrap;
max-height: 600px;
overflow-y: auto;
background-color: var(--color-bg-secondary);
padding: var(--spacing-lg);
border: 1px solid var(--color-border);
border-radius: var(--radius-sm);
font-family: 'Monaco', 'Menlo', 'Courier New', monospace;
font-size: 12px;
line-height: 1.5;
color: var(--color-text-secondary);
}
td:nth-child(4)::before {
content: "●";
margin-right: var(--spacing-xs);
font-size: 10px;
}
tr:has(td:nth-child(4):contains("Create")) td:nth-child(4)::before {
color: var(--color-accent-green);
}
tr:has(td:nth-child(4):contains("Delete")) td:nth-child(4)::before {
color: var(--color-accent-red);
}
tr:has(td:nth-child(4):contains("Modify")) td:nth-child(4)::before {
color: var(--color-accent-blue);
}
tr:has(td:nth-child(4):contains("Move")) td:nth-child(4)::before {
color: var(--color-accent-yellow);
}
tr:has(td:nth-child(4):contains("Warning")) td:nth-child(4)::before {
color: var(--color-accent-purple);
}
::-webkit-scrollbar {
width: 8px;
height: 8px;
}
::-webkit-scrollbar-track {
background: var(--color-bg-secondary);
}
::-webkit-scrollbar-thumb {
background: var(--color-border-light);
border-radius: 4px;
}
::-webkit-scrollbar-thumb:hover {
background: var(--color-text-muted);
}
@media (max-width: 768px) {
body {
padding: var(--spacing-md);
}
section {
padding: var(--spacing-lg);
}
table {
font-size: 12px;
}
th, td {
padding: var(--spacing-sm);
}
}
</style>
</head>
<body>
<header>
<h1>File System Monitor</h1>
</header>
<section id="monitoring-settings">
<h2>Monitoring Settings</h2>
<div class="paths" id="paths-container">
<div class="path-input">
<input type="text" placeholder="Enter folder path" class="path-field">
<button type="button" class="remove-path" onclick="removePath(this)">–</button>
</div>
</div>
<button type="button" onclick="addPath()">Add path</button>
<div class="mt-10">
<label>Exclude files/formats (comma-separated, e.g. .log,.tmp):</label>
<input type="text" id="exclude-patterns" class="w-full" placeholder="E.g.: .log,.json,temp">
</div>
<div class="mt-10 action-buttons">
<button type="button" onclick="startMonitoring()">Start monitoring</button>
<button type="button" onclick="stopMonitoring()">Stop monitoring</button>
</div>
</section>
<section id="integrations">
<h2>Integrations</h2>
<h3>Telegram</h3>
<label>Bot Token:</label>
<input type="text" id="tg-token" class="w-full" placeholder="Enter Telegram bot token">
<label style="margin-top: 12px;">Chat ID:</label>
<input type="text" id="tg-chat" class="w-full" placeholder="Enter chat ID">
<div style="margin-top: 16px;">
<button type="button" onclick="configureTelegram()">Configure Telegram</button>
</div>
<h3>Zabbix</h3>
<label>Server:</label>
<input type="text" id="zbx-server" class="w-full" placeholder="Zabbix server address">
<label style="margin-top: 12px;">Host:</label>
<input type="text" id="zbx-host" class="w-full" placeholder="Host name">
<label style="margin-top: 12px;">Key:</label>
<input type="text" id="zbx-key" class="w-full" placeholder="Data key">
<div style="margin-top: 16px;">
<button type="button" onclick="configureZabbix()">Configure Zabbix</button>
</div>
</section>
<section id="event-log">
<h2>Event Log</h2>
<div class="filter-controls">
<label><input type="checkbox" class="filter" value="Create" checked> Create</label>
<label><input type="checkbox" class="filter" value="Delete" checked> Delete</label>
<label><input type="checkbox" class="filter" value="Modify" checked> Modify</label>
<label><input type="checkbox" class="filter" value="Move" checked> Move</label>
<label><input type="checkbox" class="filter" value="Warning" checked> Warnings</label>
</div>
<div class="mb-10" style="display: flex; gap: 8px; flex-wrap: wrap;">
<input type="text" id="search-path" placeholder="Filter by path" style="flex: 1; min-width: 200px;">
<div class="action-buttons">
<button type="button" onclick="downloadLog()">Download log</button>
<button type="button" onclick="toggleJSON()" id="toggle-json-btn">Show JSON</button>
</div>
</div>
<div style="overflow-x: auto;">
<table id="events-table">
<thead>
<tr>
<th data-col="seq" onclick="sortTable('seq')">No</th>
<th data-col="timestamp" onclick="sortTable('timestamp')">Date/Time</th>
<th>Interval</th>
<th data-col="action" onclick="sortTable('action')">Event</th>
<th data-col="path" onclick="sortTable('path')">Source</th>
<th data-col="dest_path" onclick="sortTable('dest_path')">Destination</th>
<th data-col="path_length" onclick="sortTable('path_length')">Path length</th>
<th data-col="user_name" onclick="sortTable('user_name')">User</th>
<th data-col="user_ip" onclick="sortTable('user_ip')">IP</th>
</tr>
</thead>
<tbody id="events-body">
</tbody>
</table>
</div>
<div id="json-view"></div>
</section>
<script>
let eventsBuffer = [];
let showJSON = false;
let lastSeq = -1;
let currentSortColumn = null;
let currentSortAsc = true;
const POLL_INTERVAL_MS = 5000;
let polling = false;
let pollErrorBackoff = 0;
function addPath() {
const container = document.getElementById('paths-container');
const div = document.createElement('div');
div.className = 'path-input';
div.innerHTML = `<input type="text" placeholder="Enter folder path" class="path-field"><button type="button" class="remove-path" onclick="removePath(this)">–</button>`;
container.appendChild(div);
}
function removePath(btn) {
const div = btn.parentElement;
div.remove();
}
function startMonitoring() {
const pathFields = document.querySelectorAll('.path-field');
const paths = [];
pathFields.forEach(field => {
const value = field.value.trim();
if (value) paths.push(value);
});
const excludes = document.getElementById('exclude-patterns').value.split(',').map(p => p.trim()).filter(p => p);
fetch('/api/start_monitoring', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ paths: paths, excludes: excludes })
}).then(resp => resp.json()).then(() => {
lastSeq = -1;
eventsBuffer = [];
currentSortColumn = null;
currentSortAsc = true;
clearSortIndicators();
redrawAll();
startPolling();
});
}
function stopMonitoring() {
fetch('/api/stop_monitoring', { method: 'POST' })
.then(resp => resp.json())
.then(() => {
stopPolling();
console.log('Monitoring stopped');
});
}
function configureTelegram() {
const token = document.getElementById('tg-token').value.trim();
const chat = document.getElementById('tg-chat').value.trim();
fetch('/api/configure_telegram', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ token: token, chat_id: chat })
}).then(resp => resp.json()).then(() => {
alert('Telegram configured');
try {
localStorage.setItem('tg_token', token);
localStorage.setItem('tg_chat', chat);
} catch (e) {}
});
}
function configureZabbix() {
const server = document.getElementById('zbx-server').value.trim();
const host = document.getElementById('zbx-host').value.trim();
const key = document.getElementById('zbx-key').value.trim();
fetch('/api/configure_zabbix', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ server: server, host: host, key: key })
}).then(resp => resp.json()).then(() => {
alert('Zabbix configured');
});
}
function downloadLog() {
window.location.href = '/api/download_log';
}
function toggleJSON() {
showJSON = !showJSON;
const jsonView = document.getElementById('json-view');
const table = document.getElementById('events-table');
const btn = document.getElementById('toggle-json-btn');
if (showJSON) {
table.style.display = 'none';
jsonView.style.display = 'block';
btn.textContent = 'Hide JSON';
jsonView.textContent = JSON.stringify(eventsBuffer.slice().reverse(), null, 2);
} else {
table.style.display = 'table';
jsonView.style.display = 'none';
btn.textContent = 'Show JSON';
}
}
async function fetchEventsOnce() {
const url = lastSeq >= 0 ? `/api/events?since_seq=${lastSeq}` : '/api/events';
const resp = await fetch(url);
const data = await resp.json();
const arr = Array.isArray(data?.events) ? data.events : (Array.isArray(data) ? data : []);
if (!arr || !arr.length) return;
let maxSeq = lastSeq;
arr.forEach(ev => {
eventsBuffer.push(ev);
if (typeof ev.seq === 'number' && ev.seq > maxSeq) {
maxSeq = ev.seq;
}
});
lastSeq = maxSeq;
redrawAll();
}
function startPolling() {
if (polling) return;
polling = true;
pollErrorBackoff = 0;
(async function loop() {
try {
await fetchEventsOnce();
pollErrorBackoff = 0;
} catch (e) {
pollErrorBackoff = Math.min((pollErrorBackoff || 1000) * 2, 8000);
} finally {
if (polling) {
const delay = POLL_INTERVAL_MS + (pollErrorBackoff || 0);
setTimeout(loop, delay);
}
}
})();
}
function stopPolling() {
polling = false;
}
function sortTable(column) {
if (currentSortColumn === column) {
currentSortAsc = !currentSortAsc;
} else {
currentSortColumn = column;
currentSortAsc = true;
}
setSortIndicator(column, currentSortAsc);
redrawAll();
}
function ipToNumber(ip) {
if (!ip) return 0;
const parts = ip.split('.').map(n => parseInt(n, 10) || 0);
return ((parts[0] << 24) >>> 0) + ((parts[1] << 16) >>> 0) + ((parts[2] << 8) >>> 0) + (parts[3] >>> 0);
}
function parseTimestamp(ts) {
if (!ts) return 0;
const parsed = Date.parse(ts.replace(' ', 'T'));
return isNaN(parsed) ? 0 : parsed;
}
function getSortValue(ev, col) {
switch (col) {
case 'seq': return typeof ev.seq === 'number' ? ev.seq : 0;
case 'timestamp': return parseTimestamp(ev.timestamp || '');
case 'action': return (ev.action || ev.category || '').toLowerCase();
case 'path': return (ev.path || '').toLowerCase();
case 'dest_path': return (ev.dest_path || '').toLowerCase();
case 'path_length': {
const p = ev.dest_path && ev.dest_path.length ? ev.dest_path : (ev.path || '');
return (p || '').length;
}
case 'user_name': return (ev.user_name || '').toLowerCase();
case 'user_ip': return ipToNumber(ev.user_ip || '');
default: return 0;
}
}
function compareEvents(a, b) {
if (!currentSortColumn) return 0;
const va = getSortValue(a, currentSortColumn);
const vb = getSortValue(b, currentSortColumn);
if (typeof va === 'number' && typeof vb === 'number') {
if (va < vb) return currentSortAsc ? -1 : 1;
if (va > vb) return currentSortAsc ? 1 : -1;
return 0;
} else {
const sa = String(va || '');
const sb = String(vb || '');
const cmp = sa.localeCompare(sb, undefined, { numeric: true, sensitivity: 'base' });
return currentSortAsc ? cmp : -cmp;
}
}
function clearSortIndicators() {
document.querySelectorAll('th[data-col]').forEach(th => {
th.classList.remove('sorted-asc', 'sorted-desc');
});
}
function setSortIndicator(column, asc) {
clearSortIndicators();
const th = document.querySelector(`th[data-col="${column}"]`);
if (th) th.classList.add(asc ? 'sorted-asc' : 'sorted-desc');
}
function redrawAll() {
const tbody = document.getElementById('events-body');
tbody.innerHTML = '';
const filters = Array.from(document.querySelectorAll('.filter:checked')).map(cb => cb.value);
const search = document.getElementById('search-path').value.toLowerCase();
const filtered = eventsBuffer.filter(ev => {
const category = ev.category || ev.action || '';
if (!filters.includes(category)) return false;
const combinedPath = ((ev.path || '') + ' ' + (ev.dest_path || '')).toLowerCase();
if (search && combinedPath.indexOf(search) === -1) return false;
return true;
});
const displayList = filtered.slice();
if (currentSortColumn) {
displayList.sort(compareEvents);
}
let lastShownTimestamp = null;
let rowNumber = 0;
displayList.forEach(ev => {
rowNumber++;
const tr = document.createElement('tr');
let interval = '';
if (lastShownTimestamp) {
const prevDate = new Date(lastShownTimestamp.replace(' ', 'T'));
const currentDate = new Date((ev.timestamp || '').replace(' ', 'T'));
if (!isNaN(prevDate) && !isNaN(currentDate)) {
const diff = (currentDate - prevDate) / 1000;
interval = diff.toFixed(1) + ' s';
}
}
lastShownTimestamp = ev.timestamp;
const pathForLen = ev.dest_path && ev.dest_path.length ? ev.dest_path : (ev.path || '');
const pathLen = (pathForLen || '').length;
const cells = [
rowNumber,
ev.timestamp || '',
interval,
ev.action || '',
ev.path || '',
ev.dest_path || '',
pathLen,
ev.user_name || '',
ev.user_ip || ''
];
cells.forEach(c => {
const td = document.createElement('td');
td.textContent = c;
if (ev.alert) td.classList.add('alert');
tr.appendChild(td);
});
tbody.appendChild(tr);
});
if (showJSON) {
const jsonView = document.getElementById('json-view');
jsonView.textContent = JSON.stringify(eventsBuffer.slice().reverse(), null, 2);
}
}
document.addEventListener('DOMContentLoaded', function () {
startPolling();
document.querySelectorAll('.filter').forEach(cb => {
cb.addEventListener('change', () => {
redrawAll();
});
});
const searchInput = document.getElementById('search-path');
if (searchInput) {
searchInput.addEventListener('input', () => {
redrawAll();
});
}
try {
const storedToken = localStorage.getItem('tg_token');
const storedChat = localStorage.getItem('tg_chat');
if (storedToken) document.getElementById('tg-token').value = storedToken;
if (storedChat) document.getElementById('tg-chat').value = storedChat;
} catch (e) {}
if (currentSortColumn) setSortIndicator(currentSortColumn, currentSortAsc);
});
function saveLocalData(key, value) {
try {
localStorage.setItem(key, JSON.stringify(value));
} catch (e) {}
}
function loadLocalData(key, fallback) {
try {
const val = JSON.parse(localStorage.getItem(key));
return val !== null ? val : fallback;
} catch (e) {
return fallback;
}
}
function persistZabbixInputs() {
const server = document.getElementById('zbx-server').value.trim();
const host = document.getElementById('zbx-host').value.trim();
const key = document.getElementById('zbx-key').value.trim();
saveLocalData('zabbix_config', { server, host, key });
}
function persistPaths() {
const paths = Array.from(document.querySelectorAll('.path-field'))
.map(i => i.value.trim())
.filter(Boolean);
const excludes = document.getElementById('exclude-patterns').value;
saveLocalData('watch_paths', { paths, excludes });
}
function restoreInputs() {
const zbx = loadLocalData('zabbix_config', {});
if (zbx.server) document.getElementById('zbx-server').value = zbx.server;
if (zbx.host) document.getElementById('zbx-host').value = zbx.host;
if (zbx.key) document.getElementById('zbx-key').value = zbx.key;
const wp = loadLocalData('watch_paths', {});
if (wp.paths && Array.isArray(wp.paths)) {
const container = document.getElementById('paths-container');
container.innerHTML = '';
wp.paths.forEach(p => {
const div = document.createElement('div');
div.className = 'path-input';
div.innerHTML = `<input type="text" value="${p}" class="path-field">
<button type="button" class="remove-path" onclick="removePath(this)">–</button>`;
container.appendChild(div);
});
}
if (wp.excludes) document.getElementById('exclude-patterns').value = wp.excludes;
}
document.addEventListener('input', function (e) {
const id = e.target.id;
if (id === 'zbx-server' || id === 'zbx-host' || id === 'zbx-key') {
persistZabbixInputs();
} else if (e.target.classList.contains('path-field') || id === 'exclude-patterns') {
persistPaths();
}
});
document.addEventListener('DOMContentLoaded', function () {
restoreInputs();
});
</script>
</body>
</html>
app.py
from flask import Flask, request, jsonify, send_file, render_template, send_from_directory
import os
import threading
import time
from watcher import WatchManager
# Serve static files from the default "static" directory.
# We don't use Flask templates here — the index page is returned via send_file.
app = Flask(__name__, static_folder="static")
watch_manager = WatchManager()
@app.route("/")
def index():
"""
Return the main page.
We load `index.html` from the project root (next to this file) instead of using
`template_folder` to avoid errors if a `templates` directory is not present.
"""
return send_file(os.path.join(os.path.dirname(__file__), "index.html"))
@app.route("/api/start_monitoring", methods=["POST"])
def api_start_monitoring():
"""
Start monitoring for the given paths.
Body (JSON):
- paths: list[str] directories to watch (will be normalized to absolute paths)
- excludes: list[str] naive exclude patterns (substring or suffix starting with '.')
- log_path: str optional path to JSONL log file
"""
data = request.get_json(force=True) or {}
paths = data.get("paths", [])
excludes = data.get("excludes", [])
log_path = data.get("log_path")
# Ensure paths are absolute
normalized_paths = []
for p in paths:
if p:
normalized_paths.append(os.path.abspath(p))
if log_path:
watch_manager.set_log_path(log_path)
watch_manager.set_exclude_patterns(excludes)
watch_manager.start_monitoring(normalized_paths)
return jsonify({"status": "monitoring_started"})
@app.route("/api/stop_monitoring", methods=["POST"])
def api_stop_monitoring():
"""Stop all active observers."""
watch_manager.stop_monitoring()
return jsonify({"status": "monitoring_stopped"})
@app.route("/api/configure_telegram", methods=["POST"])
def api_configure_telegram():
"""
Configure Telegram notifier.
Body (JSON):
- token: str bot token
- chat_id: str|int chat id to send messages to
"""
data = request.get_json(force=True) or {}
token = data.get("token", "")
chat_id = data.get("chat_id", "")
watch_manager.configure_telegram(token, chat_id)
return jsonify({"status": "telegram_configured"})
@app.route("/api/configure_zabbix", methods=["POST"])
def api_configure_zabbix():
"""
Configure Zabbix trap sender.
Body (JSON):
- server: str Zabbix server address
- host: str Zabbix host (as defined in Zabbix)
- key: str item key to send values to
"""
data = request.get_json(force=True) or {}
server = data.get("server", "")
host = data.get("host", "")
key = data.get("key", "")
watch_manager.configure_zabbix(server, host, key)
return jsonify({"status": "zabbix_configured"})
@app.route("/api/events", methods=["GET"])
def api_get_events():
"""
Fetch recent UI events.
Query params:
- since_seq: int (optional) return events with seq > since_seq
- limit: int (default 200) maximum number of events to return
"""
since = request.args.get("since_seq", type=int) # may be None
limit = request.args.get("limit", 200, type=int)
events = watch_manager.get_events(since_seq=since, limit=limit)
return jsonify({"events": events})
@app.route("/api/download_log", methods=["GET"])
def api_download_log():
"""Provide the JSONL log file for download."""
log_path = watch_manager.log_path
if os.path.exists(log_path):
return send_file(log_path, as_attachment=True)
else:
return ("Log file not found", 404)
@app.route("/static/<path:path>")
def serve_static(path):
"""Serve files from the ./static directory."""
return send_from_directory(app.static_folder, path)
if __name__ == "__main__":
# Optionally pick port from the environment
port = int(os.environ.get("PORT", 5000))
app.run(host="127.0.0.1", port=port, threaded=True)