Вступление
Библиотека watchdog облегчает слежение за директорией: в минимальном варианте достаточно подписаться обработчиком на путь, запустить наблюдатель и принимать события[1]. Однако в реальных приложениях этого мало — редакторы могут триггерить несколько событий при сохранении одного файла, а системные вызовы не всегда возвращают полную информацию. Ниже описан набор усовершенствований, которые помогут построить надёжный слой мониторинга.
Зачем нужны метаданные
При получении уведомления о создании, изменении или удалении объекта важно знать, что это был за файл. Одного пути недостаточно: имя может поменяться, файл может быть перезаписан или быстро удалён.
Системный вызов os.stat() позволяет получить массу полезных атрибутов: размер файла (st_size), время последней модификации (st_mtime), номер inode (st_ino), идентификатор владельца (st_uid) и другое. Документация stat модуля поясняет, что ST_INO содержит «номер inode»[2], ST_UID — «идентификатор пользователя»[3], ST_SIZE — «размер в байтах»[4], а ST_MTIME фиксирует время последней модификации[5].
Эти четыре значения помогают однозначно идентифицировать файл и выявлять изменения даже тогда, когда имя не меняется.
Дополнительный уровень — частичная SHA‑256. Стандартный пример из hashlib показывает, как создать объект sha256, передавать в него байты и получить хеш в шестнадцатеричном виде[6].
В нашем случае полное хеширование больших файлов нецелесообразно, поэтому мы ограничиваемся HASH_MAX_BYTES: считываем первые несколько мегабайт (по умолчанию 2 MiB) и вычисляем их хеш. Если лимит равен нулю — хеширование выключается. Этого достаточно для обнаружения изменений содержимого на уровне «первые байты изменились».
Датакласс FsEvent и новые поля
Чтобы удобно передавать и логировать информацию о событиях, введён датакласс FsEvent. Он хранит:
- ts — время события в секундах от эпохи;
- action — тип действия: created, deleted, modified, moved или summary;
- path — путь к объекту;
- is_dir — флаг, папка это или файл;
- метаданные: size, mtime, inode, uid, hash_sha256;
- служебные поля: new_path (для перемещений), summary_count (для сводок), source ( event или resync ), batch_id, seq, extra.
Поле batch_id позволяет группировать события, полученные при одном «рескане» (например, при начальной индексации).
Функция new_batch_id() возвращает новую строку UUID, а глобальная переменная _current_batch_id содержит id текущей пачки.
Монотонный счётчик _seq повышается при каждом создании события через функцию next_seq(); это гарантирует строгий порядок событий, даже если часы неточны.
Поле source показывает, откуда пришла информация: event — реальное событие от файловой системы, resync — событие, созданное в ходе инициального обхода (seed_index()), например при запуске программы.
Наконец, extra представляет собой произвольный словарь для дополнительных сведений (например, meta_source — откуда взялись метаданные).
Такой объект удобно сериализовать функцией asdict() и затем выводить или записывать в файл — именно это и делает наш новый слой логирования.
Сбор метаданных: collect_fs_meta() и collect_fs_meta_with_source()
Функция collect_fs_meta(p, is_dir_hint=None) вызывает safe_stat(), которая оборачивает os.stat() и аккуратно обрабатывает ошибки. Возвращается словарь с ключами is_dir, size, mtime, inode, uid и hash_sha256. Если объект — каталог, размер не важен и ставится None.
Если хеширование включено, вызывается compute_sha256_limited(), которая считывает первые HASH_MAX_BYTES данных и обновляет объект hashlib.sha256() через метод update()[6]; в конце берётся hexdigest() для строки.
Вторая функция, collect_fs_meta_with_source(), выбирает источник метаданных. Она сначала пробует stat: если хотя бы один атрибут (например, mtime или inode) заполнен — метаданные берутся из файловой системы, а источник равен stat. Если stat вернул ничего, функция ищет запись в LRU‑кэше last_known: тогда данные берутся из кэша, а источник cache. В противном случае возвращается минимальная форма, где заполнен только is_dir, а источник none. Этот флаг записывается в поле meta_source внутри extra события.
LRU‑кэш метаданных
Файл может быть удалён или перемещён, и после этого os.stat() вернёт ошибку. Чтобы всё же знать его размер и inode, PyWatch хранит последние известные метаданные в структуре last_known. Это OrderedDict, работающий по принципу «least recently used» (LRU). Как отмечает учебник по кэшированию, LRU‑кэш организует элементы в порядке их использования, и каждый раз, когда запись читают, она перемещается в начало; поэтому легко найти элемент, который давно не использовался, и удалить его[7].
Функции cache_put() и cache_get() добавляют и извлекают записи из last_known. При добавлении _lru_touch() переносит ключ в конец, обновляя порядок. Если число записей превышает LAST_KNOWN_MAX, самая старая запись удаляется (popitem(last=False)).
Функции cache_delete(), cache_delete_prefix() и cache_move_prefix() удаляют данные для конкретного пути, целого префикса или переносят ключи при перемещении каталога. Такой кэш нужен для двух задач: возвращать метаданные при удалении (когда stat недоступен) и не выполнять лишние системные вызовы для часто изменяемых файлов.
Унифицированный вывод: emit_event() и JSON/JSONL
Чтобы сохранять события в понятном формате и иметь возможность анализировать логи, был создан единый выходной путь. Функция emit_event(ev: FsEvent) выполняет три действия:
- Преобразует экземпляр FsEvent в словарь (
asdict(ev)), добавляя _stamp — строковую дату/время черезstamp()(для удобства); - Выводит JSON‑строку в стандартный вывод (можно читать через watchtail);
- Передаёт данные функции
write_jsonl(), которая дописывает строку в файл pywatch_events.jsonl рядом со скриптом.
Функция _maybe_rotate_log() следит за размером файла и, если он превышает LOG_MAX_BYTES, переименовывает текущий файл в *.1 и начинает новый. Такой подход напоминает поведение RotatingFileHandler из стандартного модуля logging: документация пишет, что при достижении лимита по размеру файл закрывается и переименовывается в app.log.1, app.log.2 и т.д., пока новый файл продолжает сбор записей[8].
В нашем простом случае мы храним только один бэкап (.1), но при желании можно расширить.
Журнал в формате JSONL (JSON Lines) удобен тем, что каждое событие записано одной строкой: это легко парсить, фильтровать и грузить в системы анализа. Дополнительные поля, такие как batch_id и seq, позволяют восстановить порядок и принадлежность событий к конкретному обходу.
Дебаунс: подавление лишних событий
Операционная система и редакторы кода могут генерировать несколько событий для одного изменения. Статья о debounce на Dev.to объясняет, что при сохранении файла редактор часто создаёт несколько событий подряд; без дебаунса обработчик может выполниться несколько раз, хотя изменение было одно[9].
Чтобы избежать этого, PyWatch использует словарь _last_mod_times: при каждом вызове on_modified() мы смотрим, когда этот путь обрабатывался в прошлый раз, и если прошло меньше, чем MODIFIED_DEBOUNCE_MS миллисекунд (по умолчанию 300 мс), просто игнорируем событие. Это уменьшает шум, особенно когда файл сохраняется несколькими операциями (создание временного файла, запись, переименование).
Изменения в обработчиках событий
Индексация: seed_index()
При запуске программы вызывается seed_index(root). Она присваивает новый batch_id, добавляет корневой путь в индекс live_paths и рекурсивно проходит по os.walk(). Для каждого каталога и файла:
- Приводит путь к абсолютному (
norm()). - Добавляет его в live_paths.
- Получает метаданные через
collect_fs_meta_with_source(). - Кладёт их в LRU‑кэш last_known (
cache_put()). - Создаёт событие
FsEvent(action="created", source="resync")с заполненными полями и вызываетemit_event().
Этот этап формирует базовый слепок файловой системы и заносит его в журнал JSONL, чтобы последующие изменения можно было соотнести с предыдущим состоянием.
Создание: on_created(event)
При создании объекта обработчик:
- Нормализует путь и заносит его в live_paths.
- Печатает в консоль сообщение
[Создано]. - Собирает метаданные (
collect_fs_meta_with_source()), складывает их в кэш и отправляет событие черезemit_event(). В extra пишется, откуда взялись метаданные (meta_source). - Каталог: в pending_dir_moves запоминается пара (время, новое имя), в pending_move_counts — сколько потомков переместилось. Все записи под старым префиксом в live_paths обновляются на новый путь. Затем вызывается
cache_move_prefix()для переноса кэшированных метаданных на новые ключи. Создаётся событиеFsEvent(action="moved", new_path=dst, is_dir=True)с актуальными метаданными и в extra указывается количество потомков. Наконец, запускается таймер_arm_move_summary_timer(), который через COALESCE_MS секунд сформирует сводку (summary) о перемещении и очистит состояние. - Файл: если родительская папка помечена как перемещаемая, событие не выводится сразу, а учитывается в счётчике pending_move_counts; иначе выводится лог
[Перемещено], собираются новые метаданные, кэш обновляется (cache_put()для нового пути иcache_delete()для старого), и отправляется событиеFsEvent(action="moved", new_path=dst, is_dir=False).
Изменение: on_modified(event)
Обработчик модификации теперь включает дебаунс: если событие повторяется слишком быстро, оно игнорируется. В остальных случаях процедура похожа на on_created() — мы обновляем live_paths, печатаем лог, собираем метаданные, кладём их в кэш и создаём событие FsEvent(action="modified") с заполненными полями.
Перемещение: on_moved(event)
Перемещение — самое сложное событие. Оно обрабатывается отдельно для каталогов и файлов:
Удаление: on_deleted(event)
При удалении каталога создаётся «могилка» в pending_dir_deletes, перечисляются все потомки из live_paths, они удаляются, и счётчик заполняется. Срабатывает лог [Удалено] с типом is_dir=True, а реальная сводка (FsEvent(action="summary")) появится после истечения окна коалесинга. Кэш очищается вызовом cache_delete_prefix().
Если удалён файл, его путь добавляется в recent_file_deletes.
Если родительская папка удаляется, то событие учитывается в её счётчике. В противном случае cache_get() возвращает последние метаданные (если они есть), в extra указывается meta_source="cache" или "none", путь удаляется из live_paths и кэша, и событие FsEvent(action="deleted") отправляется.
Диаграммы
on_modified()
│
▼
Нормализация пути, добавление времени сейчас
norm(), time()
│
┌──────────────────────────┴──────────────────────────┐
│ │
▼ ▼
НЕ прошло время с Прошло время с
последнего запуска последнего запуска
│ │
▼ ▼
Выход (исключение повторов) Запись в словарь запусков _last_mod_times
и в живые пути live_paths
│
▼
Метаданные
│ │
▼ ▼
Из stat Из кэша
collect_fs_meta_with_source() collect_fs_meta()
└───┬────┘
│
▼
Сохраняем кэш cache_put()
Формируем событие FsEvent
Выводим логи emit_event()
Записываем json write_jsonl()
Вывод
on_moved()
│
▼
Нормализация пути, добавление времени сейчас
norm(), time()
│
┌────────────────────────┴─────────────────────────┐
│ │
▼ ▼
Папка Файл
│ │
▼ ▼
Бежим по спискам файлов внутри папки Проверка, может ли файл быть объектом
и считаем их переносимой папки
│ ┌───────────────────┴───────────────────┐
▼ ▼ ▼
Метаданные Да Нет
│ │ │ │
▼ ▼ ▼ ▼
Из stat Из кэша Счетчик +1 Создаем событие
collect_fs_meta_with_source() collect_fs_meta() │ FsEvent
└───┬────┘ ▼ из кэша и stat
│ Таймер (re)start │
▼ │ ▼
Перенос кэша ▼ Вывод
Старого пути на новый по префиксу Метаданные
cache_move_prefix() │ │
Сохраняем кэш по новому пути ▼ ▼
cache_put() Из stat Из кэша
Создаем событие FsEvent └───┬────┘
│ ▼
▼ Удаление старого кэша
Запуск таймера и вывод Сохранение нового кэша
│
▼
FsEvent и выводon_deleted() - тот же процесс, что и в on_moved()
on_created() - ничего нового, только запись метаданных
Краткая памятка
Ниже приведён список основных переменных и функций с кратким описанием. Таблица предназначена для быстрого напоминания и не содержит подробностей.
| Имя | Что делает |
|---|---|
| COALESCE_MS | Таймаут (мс) для коалесинга удалений/перемещений |
| MODIFIED_DEBOUNCE_MS | Интервал подавления быстрых on_modified() |
| LOG_JSONL_PATH | Путь к файлу журнала JSONL |
| LOG_MAX_BYTES | Максимальный размер журнала, после которого происходит ротация |
| LAST_KNOWN_MAX | Максимальное число элементов в LRU‑кэше метаданных |
| HASH_MAX_BYTES | Сколько байт файла хешировать (0 — выключить) |
| _current_batch_id | Уникальный идентификатор текущего обхода (рескана) |
| _seq | Монотонный счётчик событий |
| last_known | LRU‑кэш метаданных (OrderedDict[path, LastMeta]) |
| MetaSource | Тип: "stat", "cache" или "none" |
| LastMeta | Датакласс: is_dir, size, mtime, inode, uid, hash_sha256 |
| new_batch_id() | Сгенерировать новый UUID для batch_id |
| next_seq() | Увеличить глобальный счётчик и вернуть номер |
| _lru_touch(key) | Переместить запись в конец LRU‑кэша |
| cache_put(path, meta) | Положить метаданные в кэш и выбросить старые записи |
| cache_get(path) | Получить метаданные из кэша, обновляя порядок |
| cache_delete(path) | Удалить запись о пути из кэша |
| cache_delete_prefix(prefix) | Удалить все записи под префиксом |
| cache_move_prefix(src_dir, dst_dir) | Перенести записи при перемещении каталога |
| FsEvent | Датакласс события с метаданными и служебной информацией |
| stamp() | Вернуть текущую дату/время строкой |
| norm(path) | Привести путь к абсолютному (с учётом символических ссылок) |
| is_under(child, parent) | Проверить, находится ли путь child внутри parent |
| safe_stat(p) | Вызвать os.stat() и вернуть результат или ошибку |
| compute_sha256_limited(p, limit) | Вычислить частичный SHA‑256 для файла |
| collect_fs_meta(p, is_dir_hint) | Собрать size/mtime/inode/uid/is_dir и хеш |
| collect_fs_meta_with_source(p, is_dir_hint) | То же, но с источником (stat/cache/none) |
| _maybe_rotate_log(path) | Проверить размер журнала и сделать ротацию |
| write_jsonl(payload) | Записать событие в JSONL‑файл |
| emit_event(ev) | Унифицированный вывод события: print + запись в JSONL |
| seed_index(root) | Первичный обход: заполнение live_paths, кэша и логирование |
| _emit_summary(dir_key) | Сформировать сводку по удалению каталога |
| _arm_summary_timer(dir_key) | Запустить/продлить таймер сводки удаления |
| _emit_move_summary(src_dir) | Сформировать сводку по перемещению каталога |
| _arm_move_summary_timer(src_dir) | Запустить/продлить таймер сводки перемещения |
| emit_move_event(old_path, new_path, is_dir) | Создать событие перемещения файла/каталога |
| MyHandler.on_created() | Обработчик создания файлов/директорий |
| MyHandler.on_modified() | Обработчик изменения с дебаунсом |
| MyHandler.on_moved() | Обработчик перемещения с коалесингом и кэшем |
| MyHandler.on_deleted() | Обработчик удаления с коалесингом и кэшем |
Итоговый код
Все перечисленные функции и структуры собраны в едином модуле.
В конце статьи приведён полный код с комментариями.
import time
import datetime
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque, OrderedDict
import os
import json
import hashlib
import uuid
import sys
from dataclasses import dataclass, asdict, field
from typing import Optional, Literal, Any, Tuple
from io import TextIOWrapper
import errno
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ============================================================================
# НАСТРОЙКИ
# ============================================================================
COALESCE_MS = 300
# Дебаунс для on_modified (мс)
MODIFIED_DEBOUNCE_MS = 300
# Лимиты журнала JSONL
LOG_JSONL_PATH = str(Path(__file__).with_name("pywatch_events.jsonl"))
LOG_MAX_BYTES = 20 * 1024 * 1024 # 20 MiB
# Лимит кэша метаданных (LRU)
LAST_KNOWN_MAX = 100_000
# Включение/лимит хеша: если 0 -> хеширование выключено; иначе считаем до limit байт
HASH_MAX_BYTES = 2 * 1024 * 1024 # 2 MiB по умолчанию;
# ============================================================================
# СОСТОЯНИЕ
# ============================================================================
# --- Коалесинг удалений ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts: dict[str, int] = {}
pending_dir_timers: dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()
# --- Коалесинг перемещений ---
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()
live_paths: set[str] = set()
lock = threading.Lock()
_current_batch_id = uuid.uuid4().hex
'''
Когда мы запускаем рескан всем событиям присваивается уникальное значение
uuid.uuid4() - Создание случайного 128-битного уникального значения //d5aef09f-9a28-4d35-9cb5-df13e4167b41
.hex - Преобразование значения в 16ричное представление без дефисов 32 символа //d5aef09f9a284d359cb5df13e4167b41
'''
_seq = 0
'''
Счетчик, хранит в себе номер последнего события, когда мы создаем FsEvent и срабатывает next_seq()
'''
MetaSource = Literal["stat", "cache", "none"]
'''
Кэш последних известных метаданных (LRU)
Lireral - Литерал, то есть неизменяемое значение строк метаданных stat из os.stat() и cache из кэша last_known
'''
@dataclass
class LastMeta:
is_dir: bool
size: Optional[int]
mtime: Optional[float]
inode: Optional[int]
uid: Optional[int]
hash_sha256: Optional[str]
'''
Создание класса LastMeta
@dataclass - Декоратор, который автоматически генерирует рутинные методы для класса с данными
Optional - Или значение в [] или None
'''
last_known: "OrderedDict[str, LastMeta]" = OrderedDict()
'''
Поддержка лимита кэша. Добавлние нового и удаление старого.
OrderedDict - Подкласс словаря dict, но в отличие от него имеет дополнительные несколько методов. Принимаем ключ строку (путь) и значение из LastMeta
'''
# для дебаунса on_modified
_last_mod_times: dict[str, float] = {}
# ошибки лога (чтобы не спамить)
_log_err_once = False # печатать ошибки записи в лог только один раз
def new_batch_id() -> str:
return uuid.uuid4().hex
''' Генерация нового batch_id (можно дергать при ресканах или наружных группировках). '''
def next_seq() -> int:
global _seq
with lock:
_seq += 1
return _seq
''' Счетчик событий '''
def _lru_touch(key: str):
if key in last_known:
last_known.move_to_end(key)
''' Переместить элемент в конец (самый свежий). '''
def cache_put(path: str, meta: dict):
with lock:
last_known[path] = LastMeta(
is_dir=bool(meta.get("is_dir")),
size=meta.get("size"),
mtime=meta.get("mtime"),
inode=meta.get("inode"),
uid=meta.get("uid"),
hash_sha256=meta.get("hash_sha256"),
)
_lru_touch(path)
while len(last_known) > LAST_KNOWN_MAX:
last_known.popitem(last=False)
''' Положить метаданные в кэш, сравнить заполненность словаря с лимитами и удалить старые данные (LRU - Least Recently Used) '''
def cache_get(path: str) -> Optional[LastMeta]:
with lock:
lm = last_known.get(path)
if lm:
_lru_touch(path)
return lm
''' Вывод кэша LastMeta из last_known '''
def cache_delete(path: str):
with lock:
last_known.pop(path, None)
''' Удаление кэша LastMeta из last_known '''
def cache_delete_prefix(prefix: str) -> int:
root = norm(prefix).rstrip(os.sep)
deleted = 0
with lock:
keys = list(last_known.keys())
for k in keys:
if k == root or is_under(k, root):
last_known.pop(k, None)
deleted += 1
return deleted
'''
Составляем словарь путей по ключам префикса на удаление, удаляем, считаем удаленные
last_known.keys() - Делает снимок ключей, нельзя итерироваться по OrderedDict и менять его одновременно
'''
def cache_move_prefix(src_dir: str, dst_dir: str):
with lock:
# Собираем затронутые ключи заранее (нельзя менять OrderedDict во время итерации)
affected = [k for k in list(last_known.keys()) if k == src_dir or k.startswith(src_dir + os.sep)]
for old_key in affected:
rel = os.path.relpath(old_key, src_dir)
if rel == ".":
new_key = dst_dir
else:
new_key = norm(os.path.join(dst_dir, rel))
meta = last_known.pop(old_key)
last_known[new_key] = meta
_lru_touch(new_key)
'''
Перенос ключей кэша при перемещении директории.
'''
# ============================================================================
# ДАТАКЛАСС СОБЫТИЯ
# ============================================================================
Action = Literal["created", "deleted", "modified", "moved", "summary"]
Source = Literal["event", "resync"]
'''
Напоминание: Literal - неизменяемое значение строк метаданных
'''
@dataclass
class FsEvent:
ts: float
action: Action
path: str
is_dir: bool
# метаданные
size: Optional[int] = None
mtime: Optional[float] = None
inode: Optional[int] = None
uid: Optional[int] = None
hash_sha256: Optional[str] = None
# для moved/summary
new_path: Optional[str] = None
summary_count: Optional[int] = None # для сводок (сколько объектов затронуто)
# служебные
source: Source = "event"
batch_id: str = field(default_factory=lambda: _current_batch_id)
seq: int = field(default_factory=next_seq)
extra: dict[str, Any] = field(default_factory=dict)
'''
Жирнейший класс, включает в себя все возможные данные
'''
# ============================================================================
# ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ============================================================================
def stamp() -> str:
return dt.now().strftime("%Y-%m-%d %H:%M:%S")
def norm(path: str) -> str:
return str(Path(path).resolve())
def is_under(child: str, parent: str) -> bool:
try:
Path(child).resolve().relative_to(Path(parent).resolve())
return True
except Exception:
return False
def safe_stat(p: str) -> Tuple[Optional[os.stat_result], Optional[int]]:
try:
st = os.stat(p, follow_symlinks=False)
return st, None
except OSError as e:
return None, e.errno
except Exception:
return None, -1
'''
Предоставление данных о размере, правах доступа и времени модификации с отладкой ошибки, если есть проблемы с объектом
'''
def compute_sha256_limited(p: str, limit: int) -> Optional[str]:
if limit is None or limit <= 0:
return None
try:
if not os.path.isfile(p):
return None
h = hashlib.sha256()
remaining = limit
# читаем фиксированным чанком, но не выходим за limit
with open(p, "rb") as f:
chunk_size = 1024 * 1024 # 1 MiB
while remaining > 0:
to_read = min(chunk_size, remaining)
data = f.read(to_read)
if not data:
break
h.update(data)
remaining -= len(data)
return h.hexdigest()
except Exception:
return None
'''
Если это файл, то открываем в бинарном режиме и сканируем первый мегабайт, если он существует и существует в виде данных. Хэшируем эти данные
'''
def collect_fs_meta(p: str, is_dir_hint: Optional[bool] = None) -> dict:
st = safe_stat(p)
is_dir = False
size = None
mtime = None
inode = None
uid = None
if st:
is_dir = is_dir_hint if is_dir_hint is not None else (os.path.isdir(p))
# На некоторых FS директории имеют размер, но обычно это не то что нам нужно
size = None if is_dir else getattr(st, "st_size", None)
mtime = getattr(st, "st_mtime", None)
inode = getattr(st, "st_ino", None)
# st_uid есть не везде (Windows) — аккуратно
uid = getattr(st, "st_uid", None)
else:
# Если stat не прошел, но мы знаем из события, что это директория — проставим
if is_dir_hint is not None:
is_dir = is_dir_hint
hash_value = None
if not is_dir and size is not None and HASH_MAX_BYTES and HASH_MAX_BYTES > 0:
hash_value = compute_sha256_limited(p, HASH_MAX_BYTES)
return dict(
is_dir=is_dir,
size=size,
mtime=mtime,
inode=inode,
uid=uid,
hash_sha256=hash_value,
)
'''
Сбор size/mtime/inode/uid/is_dir и optional hash из safe_stat(), хэширует метаданные и выдает словарь этих данных
'''
def collect_fs_meta_with_source(p: str, is_dir_hint: Optional[bool] = None) -> tuple[dict, MetaSource]:
meta = collect_fs_meta(p, is_dir_hint=is_dir_hint)
# Если stat прошел, то хотя бы mtime/inode/uid где-то будут не None; для директорий is_dir=True
has_stat = meta["mtime"] is not None or meta["inode"] is not None or meta["uid"] is not None
if meta["is_dir"] is True:
has_stat = True # директория подтверждена
if has_stat:
src: MetaSource = "stat"
return meta, src
lm = cache_get(p)
if lm:
meta_from_cache = dict(
is_dir = lm.is_dir if is_dir_hint is None else bool(is_dir_hint),
size = lm.size,
mtime = lm.mtime,
inode = lm.inode,
uid = lm.uid,
hash_sha256 = lm.hash_sha256,
)
return meta_from_cache, "cache"
# ничего не знаем — минимальная форма
meta_blank = dict(
is_dir = bool(is_dir_hint),
size=None, mtime=None, inode=None, uid=None, hash_sha256=None
)
return meta_blank, "none"
'''
Сбор метаданных как в collect_fs_meta, но при недоступном stat — вернуться к кэшу.
'''
def _maybe_rotate_log(path: str):
try:
st = os.stat(path)
if st.st_size <= LOG_MAX_BYTES:
return
rotated = path + ".1"
if os.path.exists(rotated):
os.remove(rotated)
os.replace(path, rotated)
except FileNotFoundError:
return
except Exception as e:
global _log_err_once
if not _log_err_once:
print(f"{stamp()} [LOG_ROTATE_ERROR] {e}", file=sys.stderr)
_log_err_once = True
'''
Создание новых лог файлов при превышении лимита
'''
def write_jsonl(payload: dict):
try:
log_path = Path(LOG_JSONL_PATH)
log_path.parent.mkdir(parents=True, exist_ok=True)
_maybe_rotate_log(str(log_path))
with open(log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + "\n")
except Exception as e:
global _log_err_once
if not _log_err_once:
print(f"{stamp()} [LOG_WRITE_ERROR] {e}", file=sys.stderr)
_log_err_once = True
'''
Записать событие в JSONL-файл с простой ротацией.
'''
def emit_event(ev: FsEvent):
try:
payload = asdict(ev)
payload["_stamp"] = stamp()
line = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
print(line)
write_jsonl(payload)
except Exception as e:
print(f"{stamp()} [EMIT_ERROR] {e} for event {ev}")
'''
Единая точка логирования: печатаем JSON одной строкой (удобно парсить), плюс пишем JSONL на диск.
'''
def seed_index(root: str):
root_abs = norm(root)
batch_id = new_batch_id()
with lock:
live_paths.add(root_abs)
# Отмечаем сам корень
meta, meta_src = collect_fs_meta_with_source(root_abs, is_dir_hint=True)
cache_put(root_abs, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=root_abs,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
for dirpath, dirnames, filenames in os.walk(root_abs):
dp = norm(dirpath)
with lock:
live_paths.add(dp)
# Директории
for name in dirnames:
p = norm(os.path.join(dp, name))
with lock:
live_paths.add(p)
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=True)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
# Файлы
for name in filenames:
p = norm(os.path.join(dp, name))
with lock:
live_paths.add(p)
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=False)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
'''
Наполнение FsEvent метаданными папки/файла
'''
# ------------------------- УДАЛЕНИЯ -------------------------------
def _emit_summary(dir_key: str):
with lock:
count = pending_dir_counts.pop(dir_key, 0)
t = pending_dir_timers.pop(dir_key, None)
pending_dir_deletes.pop(dir_key, None)
if t:
t.cancel()
print(f"{stamp()} [Сводка] Удалена папка {dir_key}; вместе с ней удалено {count} объектов")
ev = FsEvent(
ts=time.time(),
action="summary",
path=dir_key,
is_dir=True,
summary_count=count,
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
cache_delete_prefix(dir_key)
def _arm_summary_timer(dir_key: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_dir_timers.get(dir_key)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
pending_dir_timers[dir_key] = t
t.start()
# ------------------------- ПЕРЕМЕЩЕНИЯ ----------------------------
def _emit_move_summary(src_dir: str):
with lock:
count = pending_move_counts.pop(src_dir, 0)
ts_dst = pending_dir_moves.pop(src_dir, None)
dst_dir = ts_dst[1] if ts_dst else None
t = pending_move_timers.pop(src_dir, None)
if t:
t.cancel()
if dst_dir:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir} -> {dst_dir}; вместе с ней перемещено {count} объектов")
else:
print(f"{stamp()} [Сводка] Перемещена папка {src_dir}; вместе с ней перемещено {count} объектов")
ev = FsEvent(
ts=time.time(),
action="summary",
path=src_dir,
is_dir=True,
new_path=dst_dir,
summary_count=count,
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
if dst_dir:
cache_move_prefix(src_dir, dst_dir)
def _arm_move_summary_timer(src_dir: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_move_timers.get(src_dir)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
pending_move_timers[src_dir] = t
t.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
kind = 'Папка' if is_dir else "Файл"
with lock:
live_paths.discard(old_path)
live_paths.add(new_path)
print(f"{stamp()} [Перемещено] {kind}: {old_path} -> {new_path}")
meta_new = collect_fs_meta(new_path, is_dir_hint=is_dir)
ev = FsEvent(
ts=time.time(),
action="moved",
path=old_path,
new_path=new_path,
is_dir=is_dir,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
cache_put(new_path, meta_new)
cache_delete(old_path)
# ============================================================================
# ОБРАБОТЧИК СОБЫТИЙ
# ============================================================================
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
p = norm(event.src_path)
now = time.time()
# дебаунс
last = _last_mod_times.get(p, 0.0)
if (now - last) * 1000 < MODIFIED_DEBOUNCE_MS:
return
_last_mod_times[p] = now
kind = 'Папка' if event.is_directory else "Файл"
with lock:
live_paths.add(p)
print(f"{stamp()} [Изменено] {kind}: {p}")
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
# обновляем кэш (объект существует)
cache_put(p, meta)
ev = FsEvent(
ts=now,
action="modified",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
)
emit_event(ev)
def on_moved(self, event):
src = norm(event.src_path)
dst = norm(event.dest_path)
now = time.time()
if event.is_directory:
kind = 'Папка'
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_moves[src] = (now, dst)
pending_move_counts[src] = 0
global recent_file_moves
newbuf = deque()
for fp, ts in recent_file_moves:
if ts >= cutoff and is_under(fp, src):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_moves = newbuf
descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
count_index = len(descendants)
for lp in descendants:
rel = os.path.relpath(lp, src)
new_lp = norm(os.path.join(dst, rel))
live_paths.discard(lp)
live_paths.add(new_lp)
live_paths.discard(src)
live_paths.add(dst)
pending_move_counts[src] = count_index + absorbed_from_buffer
print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")
# Структурное moved-событие для папки (одиночное), summary придет таймером
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=True)
# перенесем кэш под директорию
cache_move_prefix(src, dst)
cache_put(dst, meta_new)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=True,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={
"descendants_index_count": count_index,
"absorbed_from_buffer": absorbed_from_buffer,
"meta_source": meta_src
},
))
_arm_move_summary_timer(src)
return
# Файл
kind = 'Файл'
with lock:
recent_file_moves.append((src, now))
was_present = src in live_paths
live_paths.discard(src)
live_paths.add(dst)
parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1
if parent:
# не шумим — событие будет учтено в сводке
_arm_move_summary_timer(parent)
# Но структурный след о самом переносе файла сохраним (полезно для журнала)
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
cache_put(dst, meta_new)
cache_delete(src)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=False,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"coalesced_under": parent, "meta_source": meta_src},
))
return
print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
cache_put(dst, meta_new)
cache_delete(src)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=False,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
))
def on_created(self, event):
p = norm(event.src_path)
kind = 'Папка' if event.is_directory else "Файл"
with lock:
live_paths.add(p)
print(f"{stamp()} [Создано] {kind}: {p}")
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
))
def on_deleted(self, event):
p = norm(event.src_path)
if event.is_directory:
kind = 'Папка'
now = time.time()
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_deletes[p] = now
pending_dir_counts[p] = 0
global recent_file_deletes
newbuf = deque()
for fp, ts in recent_file_deletes:
if ts >= cutoff and is_under(fp, p):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_deletes = newbuf
descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
count_index = len(descendants)
for lp in descendants:
live_paths.discard(lp)
live_paths.discard(p)
pending_dir_counts[p] = count_index + absorbed_from_buffer
print(f"{stamp()} [Удалено] {kind}: {p}")
# Отметим факт удаления папки
emit_event(FsEvent(
ts=time.time(),
action="deleted",
path=p,
is_dir=True,
source="event",
batch_id=_current_batch_id,
extra={"descendants_index_count": count_index, "absorbed_from_buffer": absorbed_from_buffer},
))
# чистим кэш под удаленной директорией
cache_delete_prefix(p)
_arm_summary_timer(p)
else:
kind = 'Файл'
abs_path = p
now = time.time()
with lock:
recent_file_deletes.append((abs_path, now))
was_present = abs_path in live_paths
live_paths.discard(abs_path)
parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1
# попробуем достать метаданные из кэша
lm = cache_get(abs_path)
extra = {"meta_source": "cache" if lm else "none"}
ev_kwargs = dict(
ts=time.time(),
action="deleted",
path=abs_path,
is_dir=False,
source="event",
batch_id=_current_batch_id,
extra=extra,
)
if lm:
ev_kwargs.update(dict(size=lm.size, mtime=lm.mtime, inode=lm.inode, uid=lm.uid, hash_sha256=lm.hash_sha256))
if parent:
_arm_summary_timer(parent)
ev_kwargs["extra"]["coalesced_under"] = parent
emit_event(FsEvent(**ev_kwargs))
cache_delete(abs_path)
return
print(f"{stamp()} [Удалено] {kind}: {abs_path}")
emit_event(FsEvent(**ev_kwargs))
cache_delete(abs_path)
# ============================================================================
# ТОЧКА ВХОДА
# ============================================================================
if __name__ == "__main__":
path = "."
seed_index(path)
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
print(f"Слежение за {Path(path).resolve()} запущено. Нажмите Ctrl+C для выхода.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()