PyWatch — metadata, hashing and cache
In the previous posts, we explored the foundation of the PyWatch project — basic filesystem monitoring with the watchdog library and the first approaches to coalescing. We learned how to correctly count deleted objects when removing folders and how to handle moves so that the log doesn’t get cluttered with individual events. The next step in PyWatch’s development is to enrich events with metadata, obtain more reliable file identifiers, add checksum calculation, define a unified output format, and implement a simple log rotation mechanism. It is also necessary to reduce noise from editors and the operating system by using debouncing, and to remember the latest metadata with an LRU cache.

Introduction

 

The watchdog library makes directory monitoring easier: in the minimal case, it’s enough to attach a handler to a path, start the observer, and receive events[1]. However, for real applications this isn’t enough — editors can trigger several events when saving a single file, and system calls don’t always return complete information. Below is a set of improvements that will help build a reliable monitoring layer.

 

Why metadata matters

 

When receiving a notification about creating, modifying, or deleting an object, it’s important to know which file it was. The path alone isn’t sufficient: the name may change, the file may be overwritten, or it may disappear before we get a chance to inspect it.


The system call os.stat() lets you fetch a lot of useful attributes: file size (st_size), last modification time (st_mtime), inode number (st_ino), owner ID (st_uid), and more. The stat module docs clarify that ST_INO contains the “inode number”[2], ST_UID is the “user ID”[3], ST_SIZE is the “size in bytes”[4], and ST_MTIME records the last modification time[5].
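
For illustration, a minimal snippet that pulls these attributes (the file name here is hypothetical; in PyWatch these calls are wrapped by safe_stat() and collect_fs_meta(), shown at the end of the post):

import os

st = os.stat("example.txt", follow_symlinks=False)  # hypothetical path
print(st.st_size)   # size in bytes
print(st.st_mtime)  # last modification time, seconds since the epoch
print(st.st_ino)    # inode number
print(st.st_uid)    # owner's user ID (not meaningful on Windows)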

 

These four values help uniquely identify a file and detect changes even when the name doesn’t change.
An additional layer is a partial SHA-256. The standard hashlib example shows how to create a sha256 object, feed it bytes, and get a hex digest[6].


In our case, hashing entire large files isn’t practical, so we limit it with HASH_MAX_BYTES: we read the first few megabytes (2 MiB by default) and compute their hash. If the limit is zero, hashing is disabled. This is sufficient to detect content changes at the “first bytes changed” level.
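
A simplified sketch of the idea (the production version, compute_sha256_limited() at the end of the post, also reads in chunks and swallows I/O errors):

import hashlib
from typing import Optional

def partial_sha256(path: str, limit: int = 2 * 1024 * 1024) -> Optional[str]:
    # Hash only the first `limit` bytes; 0 disables hashing
    if limit <= 0:
        return None
    h = hashlib.sha256()
    with open(path, "rb") as f:
        h.update(f.read(limit))
    return h.hexdigest()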


The FsEvent dataclass and new fields

 

To conveniently pass and log event information, we’ve introduced the FsEvent dataclass. It stores:

 

  • ts — event time in seconds since the epoch;
  • action — action type: created, deleted, modified, moved, or summary;
  • path — path to the object;
  • is_dir — a flag indicating whether it’s a directory;
  • metadata: size, mtime, inode, uid, hash_sha256;
  • service fields: new_path (for moves), summary_count (for summaries), source (event or resync), batch_id, seq, extra.

 

The batch_id field lets you group events obtained during a single “rescan” (e.g., initial indexing).

 

The new_batch_id() function returns a new UUID string, and the global variable _current_batch_id stores the current batch id.
 

The monotonic counter _seq increments with every event created via next_seq(); this guarantees strict event order even if the clock is off.
 

The source field shows where the information came from: event — a real filesystem event, resync — an event created during the initial walk (seed_index()), e.g., at program startup.
 

Finally, extra is an arbitrary dictionary for additional details (for example, meta_source — where the metadata came from).

 

Such an object is easy to serialize with asdict() and then print or write to a file — that’s exactly what our new logging layer does.
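
For example (a hypothetical event; this assumes the FsEvent dataclass and its helpers defined at the end of the post):

import json
import time
from dataclasses import asdict

ev = FsEvent(ts=time.time(), action="created", path="/tmp/example.txt", is_dir=False)
print(json.dumps(asdict(ev), ensure_ascii=False))
# -> {"ts": ..., "action": "created", "path": "/tmp/example.txt", "is_dir": false, ...}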

 

Collecting metadata: collect_fs_meta() and collect_fs_meta_with_source()

 

The function collect_fs_meta(p, is_dir_hint=None) calls safe_stat(), which wraps os.stat() and handles errors gracefully. It returns a dict with keys is_dir, size, mtime, inode, uid and hash_sha256. If the object is a directory, the size is irrelevant and set to None.
 

If hashing is enabled, compute_sha256_limited() is called; it reads the first HASH_MAX_BYTES of data and updates a hashlib.sha256() object via update()[6]; finally, hexdigest() is taken to get the string.

 

The second function, collect_fs_meta_with_source(), chooses the metadata source. It first tries stat: if at least one attribute (e.g., mtime or inode) is filled, the metadata are taken from the filesystem and the source equals stat. If stat returns nothing, the function looks for a record in the LRU cache last_known: then the data are taken from the cache and the source is cache. Otherwise, a minimal form is returned where only is_dir is filled and the source is none. This flag is written to the meta_source field inside the event’s extra.
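
The decision order inside collect_fs_meta_with_source(), roughly (a condensed sketch of the real function shown in full at the end of the post):

meta = collect_fs_meta(p, is_dir_hint=is_dir_hint)
if meta["mtime"] is not None or meta["inode"] is not None or meta["is_dir"]:
    source = "stat"    # the filesystem answered
elif cache_get(p):
    source = "cache"   # stat failed, but the LRU cache still remembers the object
else:
    source = "none"    # nothing is known beyond the hint from the event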
 

LRU metadata cache

 

A file can be deleted or moved, and after that os.stat() will raise an error. To still know its size and inode, PyWatch stores the last known metadata in the last_known structure. This is an OrderedDict working on a “least recently used” (LRU) basis. As a caching tutorial notes, an LRU cache keeps elements ordered by use: every time a record is read it is re-marked as the most recently used, so the element that hasn’t been used for the longest time is easy to find and evict[7].

 

The cache_put() and cache_get() functions add and retrieve records from last_known. When adding, _lru_touch() moves the key to the end, updating the order. If the number of records exceeds LAST_KNOWN_MAX, the oldest record is removed (popitem(last=False)). 
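
The underlying pattern in isolation looks like this (a minimal sketch; the real cache_put()/cache_get() also take a lock and store LastMeta objects):

from collections import OrderedDict

LAST_KNOWN_MAX = 3            # tiny limit, just for the illustration
last_known = OrderedDict()

def cache_put(path, meta):
    last_known[path] = meta
    last_known.move_to_end(path)          # mark as most recently used
    while len(last_known) > LAST_KNOWN_MAX:
        last_known.popitem(last=False)    # evict the least recently used entry

def cache_get(path):
    meta = last_known.get(path)
    if meta is not None:
        last_known.move_to_end(path)      # a read also refreshes the entry
    return meta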

 

The cache_delete(), cache_delete_prefix(), and cache_move_prefix() functions delete data for a specific path, for an entire prefix, or move keys when a directory is moved. Such a cache is needed for two tasks: returning metadata on deletion (when stat is unavailable) and avoiding extra system calls for frequently changing files.

 

Unified output: emit_event() and JSON/JSONL

 

To store events in a clear format and be able to analyze logs, we created a single output path. The emit_event(ev: FsEvent) function performs three actions:

 

  1. Converts the FsEvent instance to a dict (asdict(ev)), adding _stamp — a string date/time via stamp() (for convenience);
  2. Prints the JSON string to stdout (you can read it via watchtail);
  3. Passes the data to write_jsonl(), which appends the line to the pywatch_events.jsonl file next to the script.

 

The _maybe_rotate_log() function monitors the file size and, if it exceeds LOG_MAX_BYTES, renames the current file to *.1 and starts a new one. This approach is similar to the behavior of RotatingFileHandler from the standard logging module: the documentation states that when the size limit is reached, the file is closed and renamed to app.log.1, app.log.2, etc., while the new file continues collecting records[8].
 

In our simple case we keep only one backup (.1), but this can be extended if needed.
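
The core of that rotation, stripped of error handling (the full _maybe_rotate_log() below also tolerates a missing file and reports failures only once):

import os

def rotate_if_needed(path: str, max_bytes: int) -> None:
    if os.path.getsize(path) <= max_bytes:
        return
    backup = path + ".1"
    if os.path.exists(backup):
        os.remove(backup)        # keep only a single backup
    os.replace(path, backup)     # rename; a fresh log starts with the next write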

A JSONL (JSON Lines) log is convenient because each event is written on a single line: it’s easy to parse, filter, and load into analysis systems. Additional fields like batch_id and seq let you reconstruct order and event grouping for a specific scan.
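
For illustration, a single record might look like this (all values are invented):

{"ts":1700000000.12,"action":"modified","path":"/data/report.txt","is_dir":false,"size":2048,"mtime":1700000000.1,"inode":131072,"uid":1000,"hash_sha256":"9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08","new_path":null,"summary_count":null,"source":"event","batch_id":"d5aef09f9a284d359cb5df13e4167b41","seq":42,"extra":{"meta_source":"stat"},"_stamp":"2023-11-14 22:13:20"}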

 

Debounce: suppressing redundant events

 

The operating system and code editors can generate multiple events for a single change. A Dev.to article on debounce explains that when saving a file, an editor often creates several events in a row; without debouncing, the handler may run multiple times even though the change was singular[9].
 

To avoid this, PyWatch uses the _last_mod_times dictionary: on each on_modified() call we check when this path was last processed, and if less than MODIFIED_DEBOUNCE_MS milliseconds have passed (300 ms by default), we simply ignore the event. This reduces noise, especially when a file is saved via multiple operations (temporary file creation, write, rename).
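
The check itself is only a few lines (a sketch; the real on_modified() below runs it before any metadata work):

import time

MODIFIED_DEBOUNCE_MS = 300
_last_mod_times: dict[str, float] = {}

def should_process(path: str) -> bool:
    now = time.time()
    last = _last_mod_times.get(path, 0.0)
    if (now - last) * 1000 < MODIFIED_DEBOUNCE_MS:
        return False              # too soon after the previous event, ignore it
    _last_mod_times[path] = now
    return True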

 

Changes in event handlers

 

Indexing: seed_index()

 

When the program starts, seed_index(root) is called. It assigns a new batch_id, adds the root path to the live_paths index, and recursively walks via os.walk(). For each directory and file:

 

  1. Normalizes the path to an absolute one (norm()).
  2. Adds it to live_paths.
  3. Fetches metadata via collect_fs_meta_with_source().
  4. Puts them into the LRU cache last_known (cache_put()).
  5. Creates an event FsEvent(action="created", source="resync") with populated fields and calls emit_event().

 

This stage forms a baseline snapshot of the filesystem and records it into the JSONL log so that subsequent changes can be correlated with the previous state.

 

Creation: on_created(event)

 

When an object is created, the handler:

 

  • Normalizes the path and adds it to live_paths.
  • Prints a [Created] message to the console.
  • Collects metadata (collect_fs_meta_with_source()), stores them in the cache, and sends the event via emit_event(). The extra field records where the metadata came from (meta_source).

 

Modification: on_modified(event)

 

The modification handler now includes debounce: if the event repeats too quickly, it is ignored. Otherwise the procedure resembles on_created() — we update live_paths, print a log line, collect metadata, put them into the cache, and create an FsEvent(action="modified") with populated fields.

 

Move: on_moved(event)

 

Moving is the most complex event. It is handled separately for directories and files:

  • Directory: pending_dir_moves remembers the pair (time, new name), and pending_move_counts tracks how many children were moved. All entries under the old prefix in live_paths are rewritten to the new path, and cache_move_prefix() moves the cached metadata to the new keys. An event FsEvent(action="moved", new_path=dst, is_dir=True) is created with up-to-date metadata, and extra contains the number of children. Finally, _arm_move_summary_timer() is started, which after COALESCE_MS milliseconds forms a summary of the move and clears the state.
  • File: if the parent folder is marked as being moved, the [Moved] line isn’t printed and the move is counted in pending_move_counts (a structural moved event with coalesced_under in extra is still emitted); otherwise a [Moved] line is printed, new metadata are collected, the cache is updated (cache_put() for the new path and cache_delete() for the old), and an event FsEvent(action="moved", new_path=dst, is_dir=False) is sent.

 

Deletion: on_deleted(event)

 

When a directory is deleted, a “graveyard” entry is created in pending_dir_deletes, all of its children are enumerated and removed from live_paths, and the counter is filled in. A [Deleted] line is printed and a deleted event with is_dir=True is emitted immediately, while the actual summary (FsEvent(action="summary")) appears after the coalescing window expires. The cache under the directory is cleared with cache_delete_prefix().

If a file is deleted, its path is added to recent_file_deletes.
If the parent folder is being deleted, the event is counted in its counter. Otherwise, cache_get() returns the last metadata (if available), extra gets meta_source="cache" or "none", the path is removed from live_paths and the cache, and the FsEvent(action="deleted") event is sent.
 

Diagrams

 

                                    on_modified()
                                           │
                                           ▼
                    Path normalization, add current time
                                   norm(), time()
                                           │
                ┌──────────────────────────┴──────────────────────────┐
                │                                                     │
                ▼                                                     ▼
     Insufficient time since the last run               Enough time since last run
                │                                                     │
                ▼                                                     ▼
         Return (debounce hit)                         Write to _last_mod_times
                                                       and to live paths live_paths
                                                                  │
                                                                  ▼
                                                               Metadata
                                                              │        │
                                                              ▼        ▼
                                                       From stat   From cache
                                   collect_fs_meta_with_source()   collect_fs_meta()
                                                              └────┬────┘
                                                                   │
                                                                   ▼
                                                          Save cache: cache_put()
                                                          Build event: FsEvent
                                                          Print logs: emit_event()
                                                          Append JSON: write_jsonl()
                                                                   ▼
                                                                Output

 

                                              on_moved()
                                                  │
                                                  ▼
                               Path normalization, add current time
                                            norm(), time()
                                                  │
                         ┌─────────────────────────┴─────────────────────────┐
                         │                                                   │
                         ▼                                                   ▼
                      Directory                                            File
                         │                                                   │
                         ▼                                                   ▼
       Iterate files inside the directory                   Check if the file belongs to a
       and count them                                       directory currently being moved
                         │                                 ┌───────────────────┴───────────────────┐
                         ▼                                 ▼                                       ▼
                     Metadata                             Yes                                      No
                    │        │                             │                                       │
                    ▼        ▼                             ▼                                       ▼
                From stat       From cache              Counter +1                          Create event
collect_fs_meta_with_source()   collect_fs_meta()          │                                   FsEvent
                    └───────┬────────┘                     ▼                          from cache and stat
                            │                        (Re)start timer                               │
                            ▼                              │                                       ▼
                      Move cache by                        ▼                                     Output
               old prefix to the new one                Metadata
                    cache_move_prefix()                │        │
               Save cache under new path               ▼        ▼
                        cache_put()                From stat  From cache
                      Create FsEvent                   └────┬───┘
                            │                               ▼
                            ▼                         Delete old cache
                  Start timer and output              Save new cache
                                                            │
                                                            ▼
                                                     FsEvent and output

 

on_deleted() - same process as in on_moved()
on_created() - nothing new, just write metadata

 


Quick Reference

Below is a list of key variables and functions with brief descriptions. The table is meant as a quick reminder and does not include details.

Name — What it does
COALESCE_MS — Timeout (ms) for coalescing deletions/moves
MODIFIED_DEBOUNCE_MS — Debounce interval for rapid on_modified()
LOG_JSONL_PATH — Path to the JSONL log file
LOG_MAX_BYTES — Max log size after which rotation occurs
LAST_KNOWN_MAX — Max number of entries in the metadata LRU cache
HASH_MAX_BYTES — How many bytes of a file to hash (0 disables hashing)
_current_batch_id — Unique identifier of the current scan (rescan)
_seq — Monotonic event counter
last_known — LRU metadata cache (OrderedDict[path, LastMeta])
MetaSource — Type: "stat", "cache", or "none"
LastMeta — Dataclass: is_dir, size, mtime, inode, uid, hash_sha256
new_batch_id() — Generate a new UUID for batch_id
next_seq() — Increment the global counter and return the number
_lru_touch(key) — Move the entry to the end of the LRU cache
cache_put(path, meta) — Put metadata into the cache and evict old entries
cache_get(path) — Get metadata from the cache, updating order
cache_delete(path) — Delete the entry for a path from the cache
cache_delete_prefix(prefix) — Delete all entries under a prefix
cache_move_prefix(src_dir, dst_dir) — Move entries when a directory is moved
FsEvent — Event dataclass with metadata and service fields
stamp() — Return the current date/time as a string
norm(path) — Normalize path to absolute (resolving symlinks)
is_under(child, parent) — Check whether the child path is under the parent path
safe_stat(p) — Call os.stat() and return the result or an error
compute_sha256_limited(p, limit) — Compute a partial SHA-256 for a file
collect_fs_meta(p, is_dir_hint) — Collect size/mtime/inode/uid/is_dir and hash
collect_fs_meta_with_source(p, is_dir_hint) — Same, but with source (stat/cache/none)
_maybe_rotate_log(path) — Check log size and rotate
write_jsonl(payload) — Append an event to the JSONL file
emit_event(ev) — Unified event output: print + JSONL write
seed_index(root) — Initial scan: populate live_paths, cache, and log
_emit_summary(dir_key) — Produce a deletion summary for a directory
_arm_summary_timer(dir_key) — Start/extend the deletion summary timer
_emit_move_summary(src_dir) — Produce a move summary for a directory
_arm_move_summary_timer(src_dir) — Start/extend the move summary timer
emit_move_event(old_path, new_path, is_dir) — Create a move event for a file/directory
MyHandler.on_created() — Handler for file/directory creation
MyHandler.on_modified() — Modification handler with debounce
MyHandler.on_moved() — Move handler with coalescing and cache
MyHandler.on_deleted() — Deletion handler with coalescing and cache

 

Final code

 

All the functions and structures described above are assembled into a single module.

The full code, with comments, is given below.

 

import time
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque, OrderedDict
import os
import json
import hashlib
import uuid
import sys
from dataclasses import dataclass, asdict, field
from typing import Optional, Literal, Any, Tuple
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ============================================================================
#                                SETTINGS
# ============================================================================

COALESCE_MS = 300
# Debounce for on_modified (ms)
MODIFIED_DEBOUNCE_MS = 300
# JSONL log limits
LOG_JSONL_PATH = str(Path(__file__).with_name("pywatch_events.jsonl"))
LOG_MAX_BYTES = 20 * 1024 * 1024  # 20 MiB
# Metadata cache limit (LRU)
LAST_KNOWN_MAX = 100_000
# Hash toggle/limit: if 0 -> hashing is disabled; otherwise hash up to `limit` bytes
HASH_MAX_BYTES = 2 * 1024 * 1024  # 2 MiB by default

# ============================================================================
#                                 STATE
# ============================================================================

# --- Deletion coalescing ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts: dict[str, int] = {}
pending_dir_timers: dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()

# --- Move coalescing ---
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()

live_paths: set[str] = set()
lock = threading.Lock()

_current_batch_id = uuid.uuid4().hex
'''
    When we run a rescan, all events get a unique value.
    uuid.uuid4()  - Generate a random 128-bit UUID       // d5aef09f-9a28-4d35-9cb5-df13e4167b41
    .hex          - Hex string without dashes (32 chars)  // d5aef09f9a284d359cb5df13e4167b41
'''

_seq = 0
'''
    Counter that holds the last event number; incremented via next_seq() when creating FsEvent.
'''

MetaSource = Literal["stat", "cache", "none"]
'''
    Where the metadata came from: "stat" from os.stat(), "cache" from `last_known`, "none" when nothing is known.
    Literal — an immutable set of allowed string values.
'''

@dataclass
class LastMeta:
    is_dir: bool
    size: Optional[int]
    mtime: Optional[float]
    inode: Optional[int]
    uid: Optional[int]
    hash_sha256: Optional[str]
'''
    LastMeta dataclass.
    @dataclass — decorator that auto-generates boilerplate for data classes.
    Optional — value or None.
'''

last_known: "OrderedDict[str, LastMeta]" = OrderedDict()
'''
    LRU cache of the last known metadata: new entries go to the end, the oldest are evicted.
    OrderedDict — dict subclass that preserves insertion order and adds a few reordering methods.
    Keys are string paths; values are LastMeta.
'''

# for on_modified debounce
_last_mod_times: dict[str, float] = {}

# log errors (to avoid spamming)
_log_err_once = False  # print log write errors only once


def new_batch_id() -> str:
    return uuid.uuid4().hex
''' Generate a new batch_id (may be used for rescans or external groupings). '''


def next_seq() -> int:
    global _seq
    with lock:
        _seq += 1
        return _seq
''' Event counter '''


def _lru_touch(key: str):
    if key in last_known:
        last_known.move_to_end(key)
''' Move an element to the end (most recent). '''


def cache_put(path: str, meta: dict):
    with lock:
        last_known[path] = LastMeta(
            is_dir=bool(meta.get("is_dir")),
            size=meta.get("size"),
            mtime=meta.get("mtime"),
            inode=meta.get("inode"),
            uid=meta.get("uid"),
            hash_sha256=meta.get("hash_sha256"),
        )
        _lru_touch(path)

        while len(last_known) > LAST_KNOWN_MAX:
            last_known.popitem(last=False)
''' Put metadata into cache, compare size against limits, and evict old data (LRU — Least Recently Used). '''


def cache_get(path: str) -> Optional[LastMeta]:
    with lock:
        lm = last_known.get(path)
        if lm:
            _lru_touch(path)
        return lm
''' Read LastMeta from last_known cache '''


def cache_delete(path: str):
    with lock:
        last_known.pop(path, None)
''' Delete LastMeta from last_known '''


def cache_delete_prefix(prefix: str) -> int:
    root = norm(prefix).rstrip(os.sep)
    deleted = 0
    with lock:
        keys = list(last_known.keys())
        for k in keys:
            if k == root or is_under(k, root):
                last_known.pop(k, None)
                deleted += 1
    return deleted
'''
    Build a list of keys by prefix for deletion; delete and count removed entries.
    last_known.keys() — take a snapshot of keys; you cannot iterate and mutate an OrderedDict simultaneously.
'''


def cache_move_prefix(src_dir: str, dst_dir: str):
    with lock:
        # Collect affected keys beforehand (cannot mutate OrderedDict during iteration)
        affected = [k for k in list(last_known.keys()) if k == src_dir or k.startswith(src_dir + os.sep)]
        for old_key in affected:
            rel = os.path.relpath(old_key, src_dir)
            if rel == ".":
                new_key = dst_dir
            else:
                new_key = norm(os.path.join(dst_dir, rel))
            meta = last_known.pop(old_key)
            last_known[new_key] = meta
            _lru_touch(new_key)
'''
    Move cache keys when a directory is moved.
'''

# ============================================================================
#                              EVENT DATACLASS
# ============================================================================

Action = Literal["created", "deleted", "modified", "moved", "summary"]
Source = Literal["event", "resync"]
'''
    Reminder: Literal — immutable set of string values.
'''

@dataclass
class FsEvent:
    ts: float
    action: Action
    path: str
    is_dir: bool
    # metadata
    size: Optional[int] = None
    mtime: Optional[float] = None
    inode: Optional[int] = None
    uid: Optional[int] = None
    hash_sha256: Optional[str] = None
    # for moved/summary
    new_path: Optional[str] = None
    summary_count: Optional[int] = None  # summaries (how many objects affected)
    # service fields
    source: Source = "event"
    batch_id: str = field(default_factory=lambda: _current_batch_id)
    seq: int = field(default_factory=next_seq)
    extra: dict[str, Any] = field(default_factory=dict)
'''
    Comprehensive event class including all possible data.
'''

# ============================================================================
#                             HELPER FUNCTIONS
# ============================================================================

def stamp() -> str:
    return dt.now().strftime("%Y-%m-%d %H:%M:%S")


def norm(path: str) -> str:
    return str(Path(path).resolve())


def is_under(child: str, parent: str) -> bool:
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
        return True
    except Exception:
        return False


def safe_stat(p: str) -> Tuple[Optional[os.stat_result], Optional[int]]:
    try:
        st = os.stat(p, follow_symlinks=False)
        return st, None
    except OSError as e:
        return None, e.errno
    except Exception:
        return None, -1
'''
    Wrap os.stat() and return (stat_result, None) on success, or (None, errno) when the object can't be stat'ed.
'''


def compute_sha256_limited(p: str, limit: int) -> Optional[str]:
    if limit is None or limit <= 0:
        return None
    try:
        if not os.path.isfile(p):
            return None
        h = hashlib.sha256()
        remaining = limit
        # Read in fixed-size chunks but not beyond `limit`
        with open(p, "rb") as f:
            chunk_size = 1024 * 1024  # 1 MiB
            while remaining > 0:
                to_read = min(chunk_size, remaining)
                data = f.read(to_read)
                if not data:
                    break
                h.update(data)
                remaining -= len(data)
        return h.hexdigest()
    except Exception:
        return None
'''
    If it's a file, open in binary mode and scan the first N bytes (up to limit). Hash those bytes.
'''


def collect_fs_meta(p: str, is_dir_hint: Optional[bool] = None) -> dict:
    st, _err = safe_stat(p)
    is_dir = False
    size = None
    mtime = None
    inode = None
    uid = None
    if st:
        is_dir = is_dir_hint if is_dir_hint is not None else (os.path.isdir(p))
        # On some FSes directories have a size, but that’s usually not what we want
        size = None if is_dir else getattr(st, "st_size", None)
        mtime = getattr(st, "st_mtime", None)
        inode = getattr(st, "st_ino", None)
        # st_uid isn’t everywhere (Windows) — be careful
        uid = getattr(st, "st_uid", None)
    else:
        # If stat failed but the event tells us it's a directory — set it
        if is_dir_hint is not None:
            is_dir = is_dir_hint
    hash_value = None
    if not is_dir and size is not None and HASH_MAX_BYTES and HASH_MAX_BYTES > 0:
        hash_value = compute_sha256_limited(p, HASH_MAX_BYTES)
    return dict(
        is_dir=is_dir,
        size=size,
        mtime=mtime,
        inode=inode,
        uid=uid,
        hash_sha256=hash_value,
    )
'''
    Collect size/mtime/inode/uid/is_dir and optional hash via safe_stat(); return a dict of these values.
'''


def collect_fs_meta_with_source(p: str, is_dir_hint: Optional[bool] = None) -> tuple[dict, MetaSource]:
    meta = collect_fs_meta(p, is_dir_hint=is_dir_hint)
    # If stat succeeded, at least mtime/inode/uid will be non-None; for directories is_dir=True
    has_stat = meta["mtime"] is not None or meta["inode"] is not None or meta["uid"] is not None
    if meta["is_dir"] is True:
        has_stat = True  # directory confirmed
    if has_stat:
        src: MetaSource = "stat"
        return meta, src
    lm = cache_get(p)
    if lm:
        meta_from_cache = dict(
            is_dir=lm.is_dir if is_dir_hint is None else bool(is_dir_hint),
            size=lm.size,
            mtime=lm.mtime,
            inode=lm.inode,
            uid=lm.uid,
            hash_sha256=lm.hash_sha256,
        )
        return meta_from_cache, "cache"
    # We know nothing — minimal form
    meta_blank = dict(
        is_dir=bool(is_dir_hint),
        size=None, mtime=None, inode=None, uid=None, hash_sha256=None
    )
    return meta_blank, "none"
'''
    Collect metadata as in collect_fs_meta, but if stat is unavailable — fall back to cache.
'''


def _maybe_rotate_log(path: str):
    try:
        st = os.stat(path)
        if st.st_size <= LOG_MAX_BYTES:
            return
        rotated = path + ".1"
        if os.path.exists(rotated):
            os.remove(rotated)
        os.replace(path, rotated)
    except FileNotFoundError:
        return
    except Exception as e:
        global _log_err_once
        if not _log_err_once:
            print(f"{stamp()} [LOG_ROTATE_ERROR] {e}", file=sys.stderr)
            _log_err_once = True
'''
    Rotate the log: when the size limit is exceeded, rename the current file to *.1 so a fresh one starts.
'''


def write_jsonl(payload: dict):
    try:
        log_path = Path(LOG_JSONL_PATH)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        _maybe_rotate_log(str(log_path))
        with open(log_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + "\n")
    except Exception as e:
        global _log_err_once
        if not _log_err_once:
            print(f"{stamp()} [LOG_WRITE_ERROR] {e}", file=sys.stderr)
            _log_err_once = True
'''
    Append an event to the JSONL file with simple rotation.
'''


def emit_event(ev: FsEvent):
    try:
        payload = asdict(ev)
        payload["_stamp"] = stamp()
        line = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
        print(line)
        write_jsonl(payload)
    except Exception as e:
        print(f"{stamp()} [EMIT_ERROR] {e} for event {ev}")
'''
    Unified logging point: print single-line JSON (easy to parse) and write JSONL to disk.
'''


def seed_index(root: str):
    root_abs = norm(root)
    batch_id = new_batch_id()
    with lock:
        live_paths.add(root_abs)
    # Mark the root itself
    meta, meta_src = collect_fs_meta_with_source(root_abs, is_dir_hint=True)
    cache_put(root_abs, meta)
    emit_event(FsEvent(
        ts=time.time(),
        action="created",
        path=root_abs,
        is_dir=meta["is_dir"],
        size=meta["size"],
        mtime=meta["mtime"],
        inode=meta["inode"],
        uid=meta["uid"],
        hash_sha256=meta["hash_sha256"],
        source="resync",
        batch_id=batch_id,
        extra={"meta_source": meta_src},
    ))
    for dirpath, dirnames, filenames in os.walk(root_abs):
        dp = norm(dirpath)
        with lock:
            live_paths.add(dp)
        # Directories
        for name in dirnames:
            p = norm(os.path.join(dp, name))
            with lock:
                live_paths.add(p)
            meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=True)
            cache_put(p, meta)
            emit_event(FsEvent(
                ts=time.time(),
                action="created",
                path=p,
                is_dir=meta["is_dir"],
                size=meta["size"],
                mtime=meta["mtime"],
                inode=meta["inode"],
                uid=meta["uid"],
                hash_sha256=meta["hash_sha256"],
                source="resync",
                batch_id=batch_id,
                extra={"meta_source": meta_src},
            ))
        # Files
        for name in filenames:
            p = norm(os.path.join(dp, name))
            with lock:
                live_paths.add(p)
            meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=False)
            cache_put(p, meta)
            emit_event(FsEvent(
                ts=time.time(),
                action="created",
                path=p,
                is_dir=meta["is_dir"],
                size=meta["size"],
                mtime=meta["mtime"],
                inode=meta["inode"],
                uid=meta["uid"],
                hash_sha256=meta["hash_sha256"],
                source="resync",
                batch_id=batch_id,
                extra={"meta_source": meta_src},
            ))
'''
    Initial scan: record the root and every nested object in live_paths, the cache, and the JSONL log.
'''

# --------------------------- DELETIONS ----------------------------
def _emit_summary(dir_key: str):
    with lock:
        count = pending_dir_counts.pop(dir_key, 0)
        t = pending_dir_timers.pop(dir_key, None)
        pending_dir_deletes.pop(dir_key, None)
        if t:
            t.cancel()
    print(f"{stamp()} [Summary] Directory deleted {dir_key}; {count} objects deleted along with it")
    ev = FsEvent(
        ts=time.time(),
        action="summary",
        path=dir_key,
        is_dir=True,
        summary_count=count,
        source="event",
        batch_id=_current_batch_id,
    )
    emit_event(ev)
    cache_delete_prefix(dir_key)


def _arm_summary_timer(dir_key: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_dir_timers.get(dir_key)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
        pending_dir_timers[dir_key] = t
        t.start()

# ----------------------------- MOVES ------------------------------
def _emit_move_summary(src_dir: str):
    with lock:
        count = pending_move_counts.pop(src_dir, 0)
        ts_dst = pending_dir_moves.pop(src_dir, None)
        dst_dir = ts_dst[1] if ts_dst else None
        t = pending_move_timers.pop(src_dir, None)
        if t:
            t.cancel()
    if dst_dir:
        print(f"{stamp()} [Summary] Directory moved {src_dir} -> {dst_dir}; {count} objects moved along with it")
    else:
        print(f"{stamp()} [Summary] Directory moved {src_dir}; {count} objects moved along with it")
    ev = FsEvent(
        ts=time.time(),
        action="summary",
        path=src_dir,
        is_dir=True,
        new_path=dst_dir,
        summary_count=count,
        source="event",
        batch_id=_current_batch_id,
    )
    emit_event(ev)

    if dst_dir:
        cache_move_prefix(src_dir, dst_dir)


def _arm_move_summary_timer(src_dir: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_move_timers.get(src_dir)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = t
        t.start()


def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    kind = 'Directory' if is_dir else "File"
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    print(f"{stamp()} [Moved] {kind}: {old_path} -> {new_path}")
    meta_new = collect_fs_meta(new_path, is_dir_hint=is_dir)
    ev = FsEvent(
        ts=time.time(),
        action="moved",
        path=old_path,
        new_path=new_path,
        is_dir=is_dir,
        size=meta_new["size"],
        mtime=meta_new["mtime"],
        inode=meta_new["inode"],
        uid=meta_new["uid"],
        hash_sha256=meta_new["hash_sha256"],
        source="event",
        batch_id=_current_batch_id,
    )
    emit_event(ev)

    cache_put(new_path, meta_new)
    cache_delete(old_path)

# ============================================================================
#                              EVENT HANDLER
# ============================================================================

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        p = norm(event.src_path)
        now = time.time()
        # debounce
        last = _last_mod_times.get(p, 0.0)
        if (now - last) * 1000 < MODIFIED_DEBOUNCE_MS:
            return
        _last_mod_times[p] = now
        kind = 'Directory' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Modified] {kind}: {p}")
        meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
        # update cache (object exists)
        cache_put(p, meta)
        ev = FsEvent(
            ts=now,
            action="modified",
            path=p,
            is_dir=meta["is_dir"],
            size=meta["size"],
            mtime=meta["mtime"],
            inode=meta["inode"],
            uid=meta["uid"],
            hash_sha256=meta["hash_sha256"],
            source="event",
            batch_id=_current_batch_id,
            extra={"meta_source": meta_src},
        )
        emit_event(ev)

    def on_moved(self, event):
        src = norm(event.src_path)
        dst = norm(event.dest_path)
        now = time.time()
        if event.is_directory:
            kind = 'Directory'
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0
            with lock:
                pending_dir_moves[src] = (now, dst)
                pending_move_counts[src] = 0
                global recent_file_moves
                newbuf = deque()
                for fp, ts in recent_file_moves:
                    if ts >= cutoff and is_under(fp, src):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_moves = newbuf
                descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
                count_index = len(descendants)
                for lp in descendants:
                    rel = os.path.relpath(lp, src)
                    new_lp = norm(os.path.join(dst, rel))
                    live_paths.discard(lp)
                    live_paths.add(new_lp)
                live_paths.discard(src)
                live_paths.add(dst)
                pending_move_counts[src] = count_index + absorbed_from_buffer
            print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
            # Structural moved-event for the directory (single); summary will come via timer
            meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=True)
            # move cache under the directory
            cache_move_prefix(src, dst)
            cache_put(dst, meta_new)
            emit_event(FsEvent(
                ts=time.time(),
                action="moved",
                path=src,
                new_path=dst,
                is_dir=True,
                size=meta_new["size"],
                mtime=meta_new["mtime"],
                inode=meta_new["inode"],
                uid=meta_new["uid"],
                hash_sha256=meta_new["hash_sha256"],
                source="event",
                batch_id=_current_batch_id,
                extra={
                    "descendants_index_count": count_index,
                    "absorbed_from_buffer": absorbed_from_buffer,
                    "meta_source": meta_src
                },
            ))
            _arm_move_summary_timer(src)
            return
        # File
        kind = 'File'
        with lock:
            recent_file_moves.append((src, now))
            was_present = src in live_paths
            live_paths.discard(src)
            live_paths.add(dst)
            parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
            parent = max(parents, key=len) if parents else None
            if parent and was_present:
                pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1
        if parent:
            # don't spam — event will be counted in the summary
            _arm_move_summary_timer(parent)
            # But keep a structural trace about the file move (useful for the journal)
            meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
            cache_put(dst, meta_new)
            cache_delete(src)
            emit_event(FsEvent(
                ts=time.time(),
                action="moved",
                path=src,
                new_path=dst,
                is_dir=False,
                size=meta_new["size"],
                mtime=meta_new["mtime"],
                inode=meta_new["inode"],
                uid=meta_new["uid"],
                hash_sha256=meta_new["hash_sha256"],
                source="event",
                batch_id=_current_batch_id,
                extra={"coalesced_under": parent, "meta_source": meta_src},
            ))
            return
        print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
        meta_new, meta_src = collect_fs_meta_with_source(dst, is_dir_hint=False)
        cache_put(dst, meta_new)
        cache_delete(src)
        emit_event(FsEvent(
            ts=time.time(),
            action="moved",
            path=src,
            new_path=dst,
            is_dir=False,
            size=meta_new["size"],
            mtime=meta_new["mtime"],
            inode=meta_new["inode"],
            uid=meta_new["uid"],
            hash_sha256=meta_new["hash_sha256"],
            source="event",
            batch_id=_current_batch_id,
            extra={"meta_source": meta_src},
        ))

    def on_created(self, event):
        p = norm(event.src_path)
        kind = 'Directory' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Created] {kind}: {p}")
        meta, meta_src = collect_fs_meta_with_source(p, is_dir_hint=event.is_directory)
        cache_put(p, meta)
        emit_event(FsEvent(
            ts=time.time(),
            action="created",
            path=p,
            is_dir=meta["is_dir"],
            size=meta["size"],
            mtime=meta["mtime"],
            inode=meta["inode"],
            uid=meta["uid"],
            hash_sha256=meta["hash_sha256"],
            source="event",
            batch_id=_current_batch_id,
            extra={"meta_source": meta_src},
        ))

    def on_deleted(self, event):
        p = norm(event.src_path)
        if event.is_directory:
            kind = 'Directory'
            now = time.time()
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0
            with lock:
                pending_dir_deletes[p] = now
                pending_dir_counts[p] = 0
                global recent_file_deletes
                newbuf = deque()
                for fp, ts in recent_file_deletes:
                    if ts >= cutoff and is_under(fp, p):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_deletes = newbuf
                descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
                count_index = len(descendants)
                for lp in descendants:
                    live_paths.discard(lp)
                live_paths.discard(p)
                pending_dir_counts[p] = count_index + absorbed_from_buffer
            print(f"{stamp()} [Deleted] {kind}: {p}")
            # Mark the fact of directory deletion
            emit_event(FsEvent(
                ts=time.time(),
                action="deleted",
                path=p,
                is_dir=True,
                source="event",
                batch_id=_current_batch_id,
                extra={"descendants_index_count": count_index, "absorbed_from_buffer": absorbed_from_buffer},
            ))
            # clean cache under the deleted directory
            cache_delete_prefix(p)
            _arm_summary_timer(p)
        else:
            kind = 'File'
            abs_path = p
            now = time.time()
            with lock:
                recent_file_deletes.append((abs_path, now))
                was_present = abs_path in live_paths
                live_paths.discard(abs_path)
                parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
                parent = max(parents, key=len) if parents else None
                if parent and was_present:
                    pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1
            # try to fetch metadata from cache
            lm = cache_get(abs_path)
            extra = {"meta_source": "cache" if lm else "none"}
            ev_kwargs = dict(
                ts=time.time(),
                action="deleted",
                path=abs_path,
                is_dir=False,
                source="event",
                batch_id=_current_batch_id,
                extra=extra,
            )
            if lm:
                ev_kwargs.update(dict(size=lm.size, mtime=lm.mtime, inode=lm.inode, uid=lm.uid, hash_sha256=lm.hash_sha256))
            if parent:
                _arm_summary_timer(parent)
                ev_kwargs["extra"]["coalesced_under"] = parent
                emit_event(FsEvent(**ev_kwargs))
                cache_delete(abs_path)
                return
            print(f"{stamp()} [Deleted] {kind}: {abs_path}")
            emit_event(FsEvent(**ev_kwargs))
            cache_delete(abs_path)

# ============================================================================
#                                 ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    path = "."
    seed_index(path)
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    print(f"Monitoring {Path(path).resolve()} started. Press Ctrl+C to exit.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()