Introduction
The watchdog library makes directory monitoring easier: in the minimal case, it’s enough to attach a handler to a path, start the observer, and receive events[1]. However, for real applications this isn’t enough — editors can trigger several events when saving a single file, and system calls don’t always return complete information. Below is a set of improvements that will help build a reliable monitoring layer.
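For reference, the minimal setup mentioned above looks roughly like this (a sketch; the handler class name and the monitored path "." are arbitrary):

```python
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class PrintHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        # event_type is one of "created", "modified", "moved", "deleted"
        print(event.event_type, event.src_path)

observer = Observer()
observer.schedule(PrintHandler(), ".", recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()
```

Everything that follows builds on top of this skeleton.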
Why metadata matters
When receiving a notification about creating, modifying, or deleting an object, it’s important to know what file it was. The path alone isn’t sufficient: the name may change, the file may be overwritten, or quickly deleted.
The system call os.stat() lets you fetch a lot of useful attributes: file size (st_size), last modification time (st_mtime), inode number (st_ino), owner ID (st_uid), and more. The stat module docs clarify that ST_INO contains the “inode number”[2], ST_UID is the “user ID”[3], ST_SIZE is the “size in bytes”[4], and ST_MTIME records the last modification time[5].
These four values help uniquely identify a file and detect changes even when the name doesn’t change.
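As a quick illustration (the file name is arbitrary), all four attributes come straight from the os.stat() result:

```python
import os

st = os.stat("example.txt")   # raises OSError if the path no longer exists
print(st.st_size)             # size in bytes
print(st.st_mtime)            # last modification time, seconds since the epoch
print(st.st_ino)              # inode number
print(st.st_uid)              # owner's user ID (0 on Windows)
```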
An additional layer is a partial SHA-256. The standard hashlib example shows how to create a sha256 object, feed it bytes, and get a hex digest[6].
In our case, hashing entire large files isn’t practical, so we limit it with HASH_MAX_BYTES: we read the first few megabytes (2 MiB by default) and compute their hash. If the limit is zero, hashing is disabled. This is sufficient to detect content changes at the “first bytes changed” level.
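The idea fits in a few lines; here is a standalone sketch with the same 2 MiB default (the function name and chunk size are illustrative, not taken from the final code):

```python
import hashlib
from typing import Optional

def sha256_of_first_bytes(path: str, limit: int = 2 * 1024 * 1024) -> Optional[str]:
    """Hash at most `limit` bytes from the start of the file; limit=0 disables hashing."""
    if not limit:
        return None
    h = hashlib.sha256()
    remaining = limit
    with open(path, "rb") as f:
        while remaining > 0:
            chunk = f.read(min(64 * 1024, remaining))
            if not chunk:
                break
            h.update(chunk)
            remaining -= len(chunk)
    return h.hexdigest()
```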
The FsEvent dataclass and new fields
To conveniently pass and log event information, we’ve introduced the FsEvent dataclass. It stores:
- ts — event time in seconds since the epoch;
- action — action type: created, deleted, modified, moved, or summary;
- path — path to the object;
- is_dir — a flag indicating whether it’s a directory;
- metadata: size, mtime, inode, uid, hash_sha256;
- service fields: new_path (for moves), summary_count (for summaries), source (event or resync), batch_id, seq, extra.
The batch_id field lets you group events obtained during a single “rescan” (e.g., initial indexing).
The new_batch_id() function returns a new UUID string, and the global variable _current_batch_id stores the current batch id.
The monotonic counter _seq increments with every event created via next_seq(); this guarantees strict event order even if the clock is off.
The source field shows where the information came from: event — a real filesystem event, resync — an event created during the initial walk (seed_index()), e.g., at program startup.
Finally, extra is an arbitrary dictionary for additional details (for example, meta_source — where the metadata came from).
Such an object is easy to serialize with asdict() and then print or write to a file — that’s exactly what our new logging layer does.
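A stripped-down version of the idea (not the full FsEvent from the final code) shows how such a dataclass turns into a single JSON line:

```python
import json
import time
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class MiniEvent:
    ts: float
    action: str
    path: str
    is_dir: bool
    size: Optional[int] = None
    extra: dict = field(default_factory=dict)

ev = MiniEvent(ts=time.time(), action="created", path="/tmp/demo.txt", is_dir=False, size=42)
print(json.dumps(asdict(ev)))  # one JSON object per event, ready for a JSONL log
```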
Collecting metadata: collect_fs_meta() and collect_fs_meta_with_source()
The function collect_fs_meta(p, is_dir_hint=None) calls safe_stat(), a wrapper around os.stat() that returns the stat result or an error code instead of raising. collect_fs_meta() then returns a dict with the keys is_dir, size, mtime, inode, uid, and hash_sha256. If the object is a directory, the size is irrelevant and set to None.
If hashing is enabled, compute_sha256_limited() is called; it reads the first HASH_MAX_BYTES of data and updates a hashlib.sha256() object via update()[6]; finally, hexdigest() is taken to get the string.
The second function, collect_fs_meta_with_source(), chooses the metadata source. It first tries stat: if at least one attribute (e.g., mtime or inode) is filled, the metadata are taken from the filesystem and the source equals stat. If stat returns nothing, the function looks for a record in the LRU cache last_known: then the data are taken from the cache and the source is cache. Otherwise, a minimal form is returned where only is_dir is filled and the source is none. This flag is written to the meta_source field inside the event’s extra.
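The decision order can be sketched in a simplified, self-contained form (this is not the real collect_fs_meta_with_source(), just the same stat → cache → none fallback):

```python
import os
from typing import Tuple

def meta_with_source(path: str, cache: dict) -> Tuple[dict, str]:
    """Simplified selection order: live stat -> LRU cache -> nothing known."""
    try:
        st = os.stat(path, follow_symlinks=False)
        return {"size": st.st_size, "mtime": st.st_mtime, "inode": st.st_ino}, "stat"
    except OSError:
        pass
    if path in cache:
        return cache[path], "cache"   # last known metadata for a vanished object
    return {}, "none"
```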
LRU metadata cache
A file can be deleted or moved, and after that os.stat() will raise an error. To still know its size and inode, PyWatch stores the last known metadata in the last_known structure. This is an OrderedDict working on a “least recently used” (LRU) basis. As a caching tutorial notes, an LRU cache keeps elements ordered by use, and each time a record is read it’s moved to the front; therefore it’s easy to find an element that hasn’t been used for a long time and remove it[7].
The cache_put() and cache_get() functions add and retrieve records from last_known. When adding, _lru_touch() moves the key to the end, updating the order. If the number of records exceeds LAST_KNOWN_MAX, the oldest record is removed (popitem(last=False)).
The cache_delete(), cache_delete_prefix(), and cache_move_prefix() functions delete data for a specific path, for an entire prefix, or move keys when a directory is moved. Such a cache is needed for two tasks: returning metadata on deletion (when stat is unavailable) and avoiding extra system calls for frequently changing files.
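The same LRU discipline, stripped of the PyWatch specifics, fits in a few lines:

```python
from collections import OrderedDict

MAX_ENTRIES = 3
cache: "OrderedDict[str, dict]" = OrderedDict()

def put(key: str, value: dict) -> None:
    cache[key] = value
    cache.move_to_end(key)              # mark as most recently used
    while len(cache) > MAX_ENTRIES:
        cache.popitem(last=False)       # evict the least recently used entry

def get(key: str):
    value = cache.get(key)
    if value is not None:
        cache.move_to_end(key)          # a read also counts as "use"
    return value
```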
Unified output: emit_event() and JSON/JSONL
To store events in a clear format and be able to analyze logs, we created a single output path. The emit_event(ev: FsEvent) function performs three actions:
- Converts the FsEvent instance to a dict (asdict(ev)), adding _stamp — a human-readable date/time string produced by stamp();
- Prints the JSON string to stdout (you can read it via watchtail);
- Passes the data to write_jsonl(), which appends the line to the pywatch_events.jsonl file next to the script.
The _maybe_rotate_log() function monitors the file size and, if it exceeds LOG_MAX_BYTES, renames the current file to *.1 and starts a new one. This approach is similar to the behavior of RotatingFileHandler from the standard logging module: the documentation states that when the size limit is reached, the file is closed and renamed to app.log.1, app.log.2, etc., while the new file continues collecting records[8].
In our simple case we keep only one backup (.1), but this can be extended if needed.
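For comparison, plain logging gives the same rotation behavior out of the box; a setup with the standard handler would look roughly like this (not what PyWatch does, shown only as a reference point):

```python
import logging
from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler("app.log", maxBytes=20 * 1024 * 1024, backupCount=1)
logger = logging.getLogger("pywatch")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("rotation to app.log.1 happens automatically once the size limit is reached")
```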
A JSONL (JSON Lines) log is convenient because each event is written on a single line: it’s easy to parse, filter, and load into analysis systems. Additional fields like batch_id and seq let you reconstruct order and event grouping for a specific scan.
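Reading the log back is one json.loads() per line; for example, to pull out all events of a particular scan by batch_id (the file name matches LOG_JSONL_PATH from the final code, the batch id is an example value):

```python
import json

wanted_batch = "d5aef09f9a284d359cb5df13e4167b41"  # example batch_id
with open("pywatch_events.jsonl", encoding="utf-8") as f:
    for line in f:
        event = json.loads(line)
        if event.get("batch_id") == wanted_batch:
            print(event["seq"], event["action"], event["path"])
```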
Debounce: suppressing redundant events
The operating system and code editors can generate multiple events for a single change. A Dev.to article on debounce explains that when saving a file, an editor often creates several events in a row; without debouncing, the handler may run multiple times even though the change was singular[9].
To avoid this, PyWatch uses the _last_mod_times dictionary: on each on_modified() call we check when this path was last processed, and if less than MODIFIED_DEBOUNCE_MS milliseconds have passed (300 ms by default), we simply ignore the event. This reduces noise, especially when a file is saved via multiple operations (temporary file creation, write, rename).
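The check itself is tiny; a standalone sketch of the same debounce logic (function name is illustrative):

```python
import time

DEBOUNCE_MS = 300
_last_seen: dict[str, float] = {}

def should_handle(path: str) -> bool:
    """Return False if the same path fired less than DEBOUNCE_MS ago."""
    now = time.time()
    if (now - _last_seen.get(path, 0.0)) * 1000 < DEBOUNCE_MS:
        return False          # too soon: treat it as a duplicate event
    _last_seen[path] = now
    return True
```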
Changes in event handlers
Indexing: seed_index()
When the program starts, seed_index(root) is called. It assigns a new batch_id, adds the root path to the live_paths index, and recursively walks via os.walk(). For each directory and file:
- Normalizes the path to an absolute one (norm()).
- Adds it to live_paths.
- Fetches metadata via collect_fs_meta_with_source().
- Puts them into the LRU cache last_known (cache_put()).
- Creates an FsEvent(action="created", source="resync") with populated fields and calls emit_event().
This stage forms a baseline snapshot of the filesystem and records it into the JSONL log so that subsequent changes can be correlated with the previous state.
Creation: on_created(event)
When an object is created, the handler:
- Normalizes the path and adds it to live_paths.
- Prints a [Created] message to the console.
- Collects metadata (collect_fs_meta_with_source()), stores them in the cache, and sends the event via emit_event(). The extra field records where the metadata came from (meta_source).
Modification: on_modified(event)
The modification handler now includes debounce: if the event repeats too quickly, it is ignored. Otherwise the procedure resembles on_created() — we update live_paths, print a log line, collect metadata, put them into the cache, and create an FsEvent(action="modified") with populated fields.
Move: on_moved(event)
Moving is the most complex event. It is handled separately for directories and files:
- Directory: pending_dir_moves remembers a pair (time, new name), and pending_move_counts stores how many children were moved. All entries under the old prefix in live_paths are updated to the new path. Then cache_move_prefix() is called to move cached metadata to new keys. An FsEvent(action="moved", new_path=dst, is_dir=True) is created with up-to-date metadata, and extra contains the number of children. Finally, _arm_move_summary_timer() is started, which after COALESCE_MS milliseconds will emit a summary of the move and clear the state.
- File: if the parent folder is marked as being moved, the event isn’t printed immediately and is instead counted in pending_move_counts; otherwise a [Moved] log line is printed, new metadata are collected, the cache is updated (cache_put() for the new path and cache_delete() for the old), and an FsEvent(action="moved", new_path=dst, is_dir=False) is sent.
Deletion: on_deleted(event)
When a directory is deleted, a “graveyard” entry is created in pending_dir_deletes, all children are enumerated and removed from live_paths, and the counter is filled in. A [Deleted] log line is printed and an FsEvent(action="deleted", is_dir=True) is emitted immediately, while the actual summary (FsEvent(action="summary")) appears only after the coalescing window expires. The cache is cleared by calling cache_delete_prefix().
If a file is deleted, its path is added to recent_file_deletes.
If the parent folder is being deleted, the event is counted in its counter. Otherwise, cache_get() returns the last metadata (if available), extra gets meta_source="cache" or "none", the path is removed from live_paths and the cache, and the FsEvent(action="deleted") event is sent.
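The coalescing trick behind the summary events is a restartable threading.Timer; in isolation it looks roughly like this (names are illustrative, the real functions are _arm_summary_timer() and _emit_summary() below):

```python
import threading

COALESCE_MS = 300
_timers: dict[str, threading.Timer] = {}
_counts: dict[str, int] = {}

def note_child_deleted(dir_key: str) -> None:
    """Count one more deleted child and push the summary further into the future."""
    _counts[dir_key] = _counts.get(dir_key, 0) + 1
    old = _timers.get(dir_key)
    if old:
        old.cancel()                      # restart the window on every new event
    t = threading.Timer(COALESCE_MS / 1000.0, _emit, args=(dir_key,))
    _timers[dir_key] = t
    t.start()

def _emit(dir_key: str) -> None:
    count = _counts.pop(dir_key, 0)
    _timers.pop(dir_key, None)
    print(f"[Summary] {dir_key}: {count} objects deleted")
```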
Diagrams
on_modified()
│
▼
Path normalization, add current time
norm(), time()
│
┌──────────────────────────┴──────────────────────────┐
│ │
▼ ▼
Insufficient time since the last run Enough time since last run
│ │
▼ ▼
Return (debounce hit) Write to _last_mod_times
and to live paths live_paths
│
▼
Metadata
│ │
▼ ▼
From stat From cache
collect_fs_meta_with_source() collect_fs_meta()
└────┬────┘
│
▼
Save cache: cache_put()
Build event: FsEvent
Print logs: emit_event()
Append JSON: write_jsonl()
▼
Output
on_moved()
│
▼
Path normalization, add current time
norm(), time()
│
┌─────────────────────────┴─────────────────────────┐
│ │
▼ ▼
Directory File
│ │
▼ ▼
Iterate files inside the directory Check if the file belongs to a
and count them directory currently being moved
│ ┌───────────────────┴───────────────────┐
▼ ▼ ▼
Metadata Yes No
│ │ │ │
▼ ▼ ▼ ▼
From stat From cache Counter +1 Create event
collect_fs_meta_with_source() collect_fs_meta() │ FsEvent
└───────┬────────┘ ▼ from cache and stat
│ (Re)start timer │
▼ │ ▼
Move cache by ▼ Output
old prefix to the new one Metadata
cache_move_prefix() │ │
Save cache under new path ▼ ▼
cache_put() From stat From cache
Create FsEvent └────┬───┘
│ ▼
▼ Delete old cache
Start timer and output Save new cache
│
▼
FsEvent and output
on_deleted() - same process as in on_moved()
on_created() - nothing new, just write metadata
Quick Reference
Below is a list of key variables and functions with brief descriptions. The table is meant as a quick reminder and does not include details.
| Name | What it does |
|---|---|
| COALESCE_MS | Timeout (ms) for coalescing deletions/moves |
| MODIFIED_DEBOUNCE_MS | Debounce interval for rapid on_modified() |
| LOG_JSONL_PATH | Path to the JSONL log file |
| LOG_MAX_BYTES | Max log size after which rotation occurs |
| LAST_KNOWN_MAX | Max number of entries in the metadata LRU cache |
| HASH_MAX_BYTES | How many bytes of a file to hash (0 — disable) |
| _current_batch_id | Unique identifier of the current scan (rescan) |
| _seq | Monotonic event counter |
| last_known | LRU metadata cache (OrderedDict[path, LastMeta]) |
| MetaSource | Type: "stat", "cache", or "none" |
| LastMeta | Dataclass: is_dir, size, mtime, inode, uid, hash_sha256 |
| new_batch_id() | Generate a new UUID for batch_id |
| next_seq() | Increment the global counter and return the number |
| _lru_touch(key) | Move the entry to the end of the LRU cache |
| cache_put(path, meta) | Put metadata into the cache and evict old entries |
| cache_get(path) | Get metadata from the cache, updating order |
| cache_delete(path) | Delete the entry for a path from the cache |
| cache_delete_prefix(prefix) | Delete all entries under a prefix |
| cache_move_prefix(src_dir, dst_dir) | Move entries when a directory is moved |
| FsEvent | Event dataclass with metadata and service fields |
| stamp() | Return the current date/time as a string |
| norm(path) | Normalize path to absolute (resolving symlinks) |
| is_under(child, parent) | Check whether the child path is under the parent path |
| safe_stat(p) | Call os.stat() and return the result or an error |
| compute_sha256_limited(p, limit) | Compute a partial SHA-256 for a file |
| collect_fs_meta(p, is_dir_hint) | Collect size/mtime/inode/uid/is_dir and hash |
| collect_fs_meta_with_source(p, is_dir_hint) | Same, but with source (stat/cache/none) |
| _maybe_rotate_log(path) | Check log size and rotate |
| write_jsonl(payload) | Append an event to the JSONL file |
| emit_event(ev) | Unified event output: print + JSONL write |
| seed_index(root) | Initial scan: populate live_paths, cache, and log |
| _emit_summary(dir_key) | Produce a deletion summary for a directory |
| _arm_summary_timer(dir_key) | Start/extend the deletion summary timer |
| _emit_move_summary(src_dir) | Produce a move summary for a directory |
| _arm_move_summary_timer(src_dir) | Start/extend the move summary timer |
| emit_move_event(old_path, new_path, is_dir) | Create a move event for a file/directory |
| MyHandler.on_created() | Handler for file/directory creation |
| MyHandler.on_modified() | Modification handler with debounce |
| MyHandler.on_moved() | Move handler with coalescing and cache |
| MyHandler.on_deleted() | Deletion handler with coalescing and cache |
Final code
All the listed functions and structures are assembled in a single module.
The end of the article provides the full code with comments.
import time
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque, OrderedDict
import os
import json
import hashlib
import uuid
import sys
from dataclasses import dataclass, asdict, field
from typing import Optional, Literal, Any, Tuple
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# ============================================================================
# SETTINGS
# ============================================================================
COALESCE_MS = 300
# Debounce for on_modified (ms)
MODIFIED_DEBOUNCE_MS = 300
# JSONL log limits
LOG_JSONL_PATH = str(Path(__file__).with_name("pywatch_events.jsonl"))
LOG_MAX_BYTES = 20 * 1024 * 1024 # 20 MiB
# Metadata cache limit (LRU)
LAST_KNOWN_MAX = 100_000
# Hash toggle/limit: if 0 -> hashing is disabled; otherwise hash up to `limit` bytes
HASH_MAX_BYTES = 2 * 1024 * 1024 # 2 MiB by default
# ============================================================================
# STATE
# ============================================================================
# --- Deletion coalescing ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts: dict[str, int] = {}
pending_dir_timers: dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()
# --- Move coalescing ---
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()
live_paths: set[str] = set()
lock = threading.Lock()
_current_batch_id = uuid.uuid4().hex
'''
When we run a rescan, all events get a unique value.
uuid.uuid4() - Generate a random 128-bit UUID // d5aef09f-9a28-4d35-9cb5-df13e4167b41
.hex - Hex string without dashes (32 chars) // d5aef09f9a284d359cb5df13e4167b41
'''
_seq = 0
'''
Counter that holds the last event number; incremented via next_seq() when creating FsEvent.
'''
MetaSource = Literal["stat", "cache", "none"]
'''
Cache of last known metadata (LRU).
Literal — an immutable set of string values for metadata source: "stat" from os.stat(), "cache" from `last_known`.
'''
@dataclass
class LastMeta:
is_dir: bool
size: Optional[int]
mtime: Optional[float]
inode: Optional[int]
uid: Optional[int]
hash_sha256: Optional[str]
'''
LastMeta dataclass.
@dataclass — decorator that auto-generates boilerplate for data classes.
Optional — value or None.
'''
last_known: "OrderedDict[str, LastMeta]" = OrderedDict()
'''
LRU cache capacity support. Add new and evict old entries.
OrderedDict — dict subclass that preserves order and adds a few methods.
Keys are string paths; values are LastMeta.
'''
# for on_modified debounce
_last_mod_times: dict[str, float] = {}
# log errors (to avoid spamming)
_log_err_once = False # print log write errors only once
def new_batch_id() -> str:
return uuid.uuid4().hex
''' Generate a new batch_id (may be used for rescans or external groupings). '''
def next_seq() -> int:
global _seq
with lock:
_seq += 1
return _seq
''' Event counter '''
def _lru_touch(key: str):
if key in last_known:
last_known.move_to_end(key)
''' Move an element to the end (most recent). '''
def cache_put(path: str, meta: dict):
with lock:
last_known[path] = LastMeta(
is_dir=bool(meta.get("is_dir")),
size=meta.get("size"),
mtime=meta.get("mtime"),
inode=meta.get("inode"),
uid=meta.get("uid"),
hash_sha256=meta.get("hash_sha256"),
)
_lru_touch(path)
while len(last_known) > LAST_KNOWN_MAX:
last_known.popitem(last=False)
''' Put metadata into cache, compare size against limits, and evict old data (LRU — Least Recently Used). '''
def cache_get(path: str) -> Optional[LastMeta]:
with lock:
lm = last_known.get(path)
if lm:
_lru_touch(path)
return lm
''' Read LastMeta from last_known cache '''
def cache_delete(path: str):
with lock:
last_known.pop(path, None)
''' Delete LastMeta from last_known '''
def cache_delete_prefix(prefix: str) -> int:
root = norm(prefix).rstrip(os.sep)
deleted = 0
with lock:
keys = list(last_known.keys())
for k in keys:
if k == root or is_under(k, root):
last_known.pop(k, None)
deleted += 1
return deleted
'''
Build a list of keys by prefix for deletion; delete and count removed entries.
last_known.keys() — take a snapshot of keys; you cannot iterate and mutate an OrderedDict simultaneously.
'''
def cache_move_prefix(src_dir: str, dst_dir: str):
with lock:
# Collect affected keys beforehand (cannot mutate OrderedDict during iteration)
affected = [k for k in list(last_known.keys()) if k == src_dir or k.startswith(src_dir + os.sep)]
for old_key in affected:
rel = os.path.relpath(old_key, src_dir)
if rel == ".":
new_key = dst_dir
else:
new_key = norm(os.path.join(dst_dir, rel))
meta = last_known.pop(old_key)
last_known[new_key] = meta
_lru_touch(new_key)
'''
Move cache keys when a directory is moved.
'''
# ============================================================================
# EVENT DATACLASS
# ============================================================================
Action = Literal["created", "deleted", "modified", "moved", "summary"]
Source = Literal["event", "resync"]
'''
Reminder: Literal — immutable set of string values.
'''
@dataclass
class FsEvent:
ts: float
action: Action
path: str
is_dir: bool
# metadata
size: Optional[int] = None
mtime: Optional[float] = None
inode: Optional[int] = None
uid: Optional[int] = None
hash_sha256: Optional[str] = None
# for moved/summary
new_path: Optional[str] = None
summary_count: Optional[int] = None # summaries (how many objects affected)
# service fields
source: Source = "event"
batch_id: str = field(default_factory=lambda: _current_batch_id)
seq: int = field(default_factory=next_seq)
extra: dict[str, Any] = field(default_factory=dict)
'''
Comprehensive event class including all possible data.
'''
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def stamp() -> str:
return dt.now().strftime("%Y-%m-%d %H:%M:%S")
def norm(path: str) -> str:
return str(Path(path).resolve())
def is_under(child: str, parent: str) -> bool:
try:
Path(child).resolve().relative_to(Path(parent).resolve())
return True
except Exception:
return False
def safe_stat(p: str) -> Tuple[Optional[os.stat_result], Optional[int]]:
try:
st = os.stat(p, follow_symlinks=False)
return st, None
except OSError as e:
return None, e.errno
except Exception:
return None, -1
'''
Provide size, permissions, and mtime via os.stat, returning an error code if the object is problematic.
'''
def compute_sha256_limited(p: str, limit: int) -> Optional[str]:
if limit is None or limit <= 0:
return None
try:
if not os.path.isfile(p):
return None
h = hashlib.sha256()
remaining = limit
# Read in fixed-size chunks but not beyond `limit`
with open(p, "rb") as f:
chunk_size = 1024 * 1024 # 1 MiB
while remaining > 0:
to_read = min(chunk_size, remaining)
data = f.read(to_read)
if not data:
break
h.update(data)
remaining -= len(data)
return h.hexdigest()
except Exception:
return None
'''
If it's a file, open in binary mode and scan the first N bytes (up to limit). Hash those bytes.
'''
def collect_fs_meta(p: str, is_dir_hint: Optional[bool] = None) -> dict:
    st, _err = safe_stat(p)  # safe_stat() returns a (stat_result | None, errno | None) tuple
is_dir = False
size = None
mtime = None
inode = None
uid = None
if st:
is_dir = is_dir_hint if is_dir_hint is not None else (os.path.isdir(p))
# On some FSes directories have a size, but that’s usually not what we want
size = None if is_dir else getattr(st, "st_size", None)
mtime = getattr(st, "st_mtime", None)
inode = getattr(st, "st_ino", None)
# st_uid isn’t everywhere (Windows) — be careful
uid = getattr(st, "st_uid", None)
else:
# If stat failed but the event tells us it's a directory — set it
if is_dir_hint is not None:
is_dir = is_dir_hint
hash_value = None
if not is_dir and size is not None and HASH_MAX_BYTES and HASH_MAX_BYTES > 0:
hash_value = compute_sha256_limited(p, HASH_MAX_BYTES)
return dict(
is_dir=is_dir,
size=size,
mtime=mtime,
inode=inode,
uid=uid,
hash_sha256=hash_value,
)
'''
Collect size/mtime/inode/uid/is_dir and optional hash via safe_stat(); return a dict of these values.
'''
def collect_fs_meta_with_source(p: str, is_dir_hint: Optional[bool] = None) -> tuple[dict, MetaSource]:
meta = collect_fs_meta(p, is_dir_hint=is_dir_hint)
# If stat succeeded, at least mtime/inode/uid will be non-None; for directories is_dir=True
has_stat = meta["mtime"] is not None or meta["inode"] is not None or meta["uid"] is not None
if meta["is_dir"] is True:
has_stat = True # directory confirmed
if has_stat:
src: MetaSource = "stat"
return meta, src
lm = cache_get(p)
if lm:
meta_from_cache = dict(
is_dir=lm.is_dir if is_dir_hint is None else bool(is_dir_hint),
size=lm.size,
mtime=lm.mtime,
inode=lm.inode,
uid=lm.uid,
hash_sha256=lm.hash_sha256,
)
return meta_from_cache, "cache"
# We know nothing — minimal form
meta_blank = dict(
is_dir=bool(is_dir_hint),
size=None, mtime=None, inode=None, uid=None, hash_sha256=None
)
return meta_blank, "none"
'''
Collect metadata as in collect_fs_meta, but if stat is unavailable — fall back to cache.
'''
def _maybe_rotate_log(path: str):
try:
st = os.stat(path)
if st.st_size <= LOG_MAX_BYTES:
return
rotated = path + ".1"
if os.path.exists(rotated):
os.remove(rotated)
os.replace(path, rotated)
except FileNotFoundError:
return
except Exception as e:
global _log_err_once
if not _log_err_once:
print(f"{stamp()} [LOG_ROTATE_ERROR] {e}", file=sys.stderr)
_log_err_once = True
'''
Create new log files when the limit is exceeded.
'''
def write_jsonl(payload: dict):
try:
log_path = Path(LOG_JSONL_PATH)
log_path.parent.mkdir(parents=True, exist_ok=True)
_maybe_rotate_log(str(log_path))
with open(log_path, "a", encoding="utf-8") as f:
f.write(json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + "\n")
except Exception as e:
global _log_err_once
if not _log_err_once:
print(f"{stamp()} [LOG_WRITE_ERROR] {e}", file=sys.stderr)
_log_err_once = True
'''
Append an event to the JSONL file with simple rotation.
'''
def emit_event(ev: FsEvent):
try:
payload = asdict(ev)
payload["_stamp"] = stamp()
line = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
print(line)
write_jsonl(payload)
except Exception as e:
print(f"{stamp()} [EMIT_ERROR] {e} for event {ev}")
'''
Unified logging point: print single-line JSON (easy to parse) and write JSONL to disk.
'''
def seed_index(root: str):
root_abs = norm(root)
batch_id = new_batch_id()
with lock:
live_paths.add(root_abs)
# Mark the root itself
meta, meta_src = collect_fs_meta_with_source(root_abs, is_dir_hint=True)
cache_put(root_abs, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=root_abs,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
for dirpath, dirnames, filenames in os.walk(root_abs):
dp = norm(dirpath)
with lock:
live_paths.add(dp)
# Directories
for name in dirnames:
p = norm(os.path.join(dp, name))
with lock:
live_paths.add(p)
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=True)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
# Files
for name in filenames:
p = norm(os.path.join(dp, name))
with lock:
live_paths.add(p)
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=False)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="resync",
batch_id=batch_id,
extra={"meta_source": meta_src},
))
'''
Populate FsEvent with folder/file metadata.
'''
# --------------------------- DELETIONS ----------------------------
def _emit_summary(dir_key: str):
with lock:
count = pending_dir_counts.pop(dir_key, 0)
t = pending_dir_timers.pop(dir_key, None)
pending_dir_deletes.pop(dir_key, None)
if t:
t.cancel()
print(f"{stamp()} [Summary] Directory deleted {dir_key}; {count} objects deleted along with it")
ev = FsEvent(
ts=time.time(),
action="summary",
path=dir_key,
is_dir=True,
summary_count=count,
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
cache_delete_prefix(dir_key)
def _arm_summary_timer(dir_key: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_dir_timers.get(dir_key)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
pending_dir_timers[dir_key] = t
t.start()
# ----------------------------- MOVES ------------------------------
def _emit_move_summary(src_dir: str):
with lock:
count = pending_move_counts.pop(src_dir, 0)
ts_dst = pending_dir_moves.pop(src_dir, None)
dst_dir = ts_dst[1] if ts_dst else None
t = pending_move_timers.pop(src_dir, None)
if t:
t.cancel()
if dst_dir:
print(f"{stamp()} [Summary] Directory moved {src_dir} -> {dst_dir}; {count} objects moved along with it")
else:
print(f"{stamp()} [Summary] Directory moved {src_dir}; {count} objects moved along with it")
ev = FsEvent(
ts=time.time(),
action="summary",
path=src_dir,
is_dir=True,
new_path=dst_dir,
summary_count=count,
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
if dst_dir:
cache_move_prefix(src_dir, dst_dir)
def _arm_move_summary_timer(src_dir: str):
ttl = COALESCE_MS / 1000.0
with lock:
old = pending_move_timers.get(src_dir)
if old:
old.cancel()
t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
pending_move_timers[src_dir] = t
t.start()
def emit_move_event(old_path: str, new_path: str, is_dir: bool):
kind = 'Directory' if is_dir else "File"
with lock:
live_paths.discard(old_path)
live_paths.add(new_path)
print(f"{stamp()} [Moved] {kind}: {old_path} -> {new_path}")
meta_new = collect_fs_meta(new_path, is_dir_hint=is_dir)
ev = FsEvent(
ts=time.time(),
action="moved",
path=old_path,
new_path=new_path,
is_dir=is_dir,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
)
emit_event(ev)
cache_put(new_path, meta_new)
cache_delete(old_path)
# ============================================================================
# EVENT HANDLER
# ============================================================================
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
p = norm(event.src_path)
now = time.time()
# debounce
last = _last_mod_times.get(p, 0.0)
if (now - last) * 1000 < MODIFIED_DEBOUNCE_MS:
return
_last_mod_times[p] = now
kind = 'Directory' if event.is_directory else "File"
with lock:
live_paths.add(p)
print(f"{stamp()} [Modified] {kind}: {p}")
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
# update cache (object exists)
cache_put(p, meta)
ev = FsEvent(
ts=now,
action="modified",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
)
emit_event(ev)
def on_moved(self, event):
src = norm(event.src_path)
dst = norm(event.dest_path)
now = time.time()
if event.is_directory:
kind = 'Directory'
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_moves[src] = (now, dst)
pending_move_counts[src] = 0
global recent_file_moves
newbuf = deque()
for fp, ts in recent_file_moves:
if ts >= cutoff and is_under(fp, src):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_moves = newbuf
descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
count_index = len(descendants)
for lp in descendants:
rel = os.path.relpath(lp, src)
new_lp = norm(os.path.join(dst, rel))
live_paths.discard(lp)
live_paths.add(new_lp)
live_paths.discard(src)
live_paths.add(dst)
pending_move_counts[src] = count_index + absorbed_from_buffer
print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
# Structural moved-event for the directory (single); summary will come via timer
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=True)
# move cache under the directory
cache_move_prefix(src, dst)
cache_put(dst, meta_new)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=True,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={
"descendants_index_count": count_index,
"absorbed_from_buffer": absorbed_from_buffer,
"meta_source": meta_src
},
))
_arm_move_summary_timer(src)
return
# File
kind = 'File'
with lock:
recent_file_moves.append((src, now))
was_present = src in live_paths
live_paths.discard(src)
live_paths.add(dst)
parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1
if parent:
# don't spam — event will be counted in the summary
_arm_move_summary_timer(parent)
# But keep a structural trace about the file move (useful for the journal)
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
cache_put(dst, meta_new)
cache_delete(src)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=False,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"coalesced_under": parent, "meta_source": meta_src},
))
return
print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
cache_put(dst, meta_new)
cache_delete(src)
emit_event(FsEvent(
ts=time.time(),
action="moved",
path=src,
new_path=dst,
is_dir=False,
size=meta_new["size"],
mtime=meta_new["mtime"],
inode=meta_new["inode"],
uid=meta_new["uid"],
hash_sha256=meta_new["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
))
def on_created(self, event):
p = norm(event.src_path)
kind = 'Directory' if event.is_directory else "File"
with lock:
live_paths.add(p)
print(f"{stamp()} [Created] {kind}: {p}")
meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
cache_put(p, meta)
emit_event(FsEvent(
ts=time.time(),
action="created",
path=p,
is_dir=meta["is_dir"],
size=meta["size"],
mtime=meta["mtime"],
inode=meta["inode"],
uid=meta["uid"],
hash_sha256=meta["hash_sha256"],
source="event",
batch_id=_current_batch_id,
extra={"meta_source": meta_src},
))
def on_deleted(self, event):
p = norm(event.src_path)
if event.is_directory:
kind = 'Directory'
now = time.time()
cutoff = now - COALESCE_MS / 1000.0
absorbed_from_buffer = 0
with lock:
pending_dir_deletes[p] = now
pending_dir_counts[p] = 0
global recent_file_deletes
newbuf = deque()
for fp, ts in recent_file_deletes:
if ts >= cutoff and is_under(fp, p):
absorbed_from_buffer += 1
else:
newbuf.append((fp, ts))
recent_file_deletes = newbuf
descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
count_index = len(descendants)
for lp in descendants:
live_paths.discard(lp)
live_paths.discard(p)
pending_dir_counts[p] = count_index + absorbed_from_buffer
print(f"{stamp()} [Deleted] {kind}: {p}")
# Mark the fact of directory deletion
emit_event(FsEvent(
ts=time.time(),
action="deleted",
path=p,
is_dir=True,
source="event",
batch_id=_current_batch_id,
extra={"descendants_index_count": count_index, "absorbed_from_buffer": absorbed_from_buffer},
))
# clean cache under the deleted directory
cache_delete_prefix(p)
_arm_summary_timer(p)
else:
kind = 'File'
abs_path = p
now = time.time()
with lock:
recent_file_deletes.append((abs_path, now))
was_present = abs_path in live_paths
live_paths.discard(abs_path)
parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
parent = max(parents, key=len) if parents else None
if parent and was_present:
pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1
# try to fetch metadata from cache
lm = cache_get(abs_path)
extra = {"meta_source": "cache" if lm else "none"}
ev_kwargs = dict(
ts=time.time(),
action="deleted",
path=abs_path,
is_dir=False,
source="event",
batch_id=_current_batch_id,
extra=extra,
)
if lm:
ev_kwargs.update(dict(size=lm.size, mtime=lm.mtime, inode=lm.inode, uid=lm.uid, hash_sha256=lm.hash_sha256))
if parent:
_arm_summary_timer(parent)
ev_kwargs["extra"]["coalesced_under"] = parent
emit_event(FsEvent(**ev_kwargs))
cache_delete(abs_path)
return
print(f"{stamp()} [Deleted] {kind}: {abs_path}")
emit_event(FsEvent(**ev_kwargs))
cache_delete(abs_path)
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == "__main__":
path = "."
seed_index(path)
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
print(f"Monitoring {Path(path).resolve()} started. Press Ctrl+C to exit.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()