PyWatch — Folder Deletion and Event Coalescing

                
This article walks through the on_deleted() handler and shows how PyWatch counts the objects removed when an entire folder is deleted. The focus is macOS and its FSEvents mechanism. Function descriptions are kept brief; the line-by-line details are in the comments of the full listing at the end.

Key Functions

 

  • seed_index(root)
    At startup, fills live_paths with absolute paths of all files and directories under the root. This is the baseline from which we count removed objects.
  • norm(path)
    Converts a path to an absolute one via .resolve() (collapses . and .. segments and follows symlinks). Needed for consistent keys.
  • is_under(child, parent)
    Checks whether child lies inside parent. Used to filter descendants.
  • on_deleted(event)
    The main function.
    • If a directory was deleted: we mark a “tombstone”, remove its descendants from live_paths, count them, open the coalescing window, and print the final summary via a timer.
    • If a file was deleted: we add a record to the buffer of “early” deletions and, if the file is inside a folder with a tombstone, we increment that folder’s counter (only when the file was still in live_paths) and suppress the standalone log line.
  • _arm_summary_timer(dir_key) / _emit_summary(dir_key)
    Restart/execute the summary timer. When the coalescing window expires, they print the total and clear the state.
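
To make the role of is_under concrete, here is a minimal sketch of a purely lexical variant. It assumes both paths are already absolute and normalized (for example by norm), so it uses PurePosixPath and never touches the disk. The name is_under_lexical is hypothetical and not part of the listing.

```python
from pathlib import PurePosixPath


def is_under_lexical(child: str, parent: str) -> bool:
    """Return True if child equals parent or lies below it (no disk access)."""
    try:
        # relative_to raises ValueError when parent is not a path prefix of child
        PurePosixPath(child).relative_to(PurePosixPath(parent))
        return True
    except ValueError:
        return False


print(is_under_lexical("/A/sub/f1.txt", "/A"))  # True
print(is_under_lexical("/AB/f1.txt", "/A"))     # False: "/AB" is a sibling, not a child
print(is_under_lexical("/A", "/A"))             # True: a path is "under" itself
```

Comparing path components rather than string prefixes is what keeps /AB from matching /A. Because a path counts as being under itself, the listing excludes the directory explicitly with lp != p when it collects descendants.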

 

Theory: macOS FSEvents, queues, and coalescing

 

How FSEvents delivers events

 

On macOS, the FSEvents service tracks changes at directory granularity. An application (here, through watchdog) subscribes to directory trees and receives events in batches, with paths as reported by the system. Important details:

  • Batching and OS-level coalescing: during fast cascade deletions (many files, then their folder), the system may not send every child event separately. Sometimes you will see only “directory deleted”.
  • Delivery order is not strictly guaranteed: individual child events can arrive before or after the directory deletion event, tens or hundreds of milliseconds apart.
  • Threads: watchdog invokes handlers from the observer’s worker thread; our timers live in their own threads. Synchronization is required.
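
The threading point can be checked with the standard library alone. This sketch uses no watchdog code; it only shows that a threading.Timer callback runs on a thread other than the one that armed it. The names seen and record_thread are illustrative.

```python
import threading

seen = []  # thread names recorded by the callback


def record_thread():
    # Runs on the Timer's own thread, not on the thread that called start()
    seen.append(threading.current_thread().name)


timer = threading.Timer(0.01, record_thread)
timer.start()
timer.join()  # Timer is a Thread subclass, so join() waits for the callback

caller = threading.current_thread().name
print(f"armed from {caller}, callback ran on {seen[0]}")
```

Since the summary callback and the event handlers run on different threads in the same way, any state they share needs a lock.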

 

Why, without coalescing, losses/miscounts are possible

 

If you print logs immediately without a “waiting window”, then:

  • you can miss some child deletions (they arrive after “directory deleted”);
  • or get double counting: you counted descendants by index first, then “late” file deletions arrived and increased the number again.
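
The double-counting case can be reproduced without any filesystem. The sketch below replays a fixed event list against a small index. naive_count and guarded_count are hypothetical helpers, the prefix test with startswith is a simplification, and the guard mirrors the was_present check in the listing.

```python
def naive_count(index: set[str], events: list[tuple[str, str]]) -> int:
    """Count descendants on the directory event, then add every late file event."""
    total = 0
    for kind, path in events:
        if kind == "dir":
            total += sum(1 for p in index if p.startswith(path + "/"))
        else:
            total += 1  # late child DELETE is counted a second time
    return total


def guarded_count(index: set[str], events: list[tuple[str, str]]) -> int:
    """Same replay, but a path is counted only while it is still in the index."""
    live = set(index)  # work on a copy so the caller's index is untouched
    total = 0
    for kind, path in events:
        if kind == "dir":
            gone = {p for p in live if p.startswith(path + "/")}
            total += len(gone)
            live -= gone
        elif path in live:
            total += 1
            live.discard(path)
    return total


index = {"/A", "/A/f1", "/A/f2"}
events = [("dir", "/A"), ("file", "/A/f1"), ("file", "/A/f2")]
print(naive_count(index, events))    # 4
print(guarded_count(index, events))  # 2
```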

 

What our coalescing does

 

We added a time window COALESCE_MS. If a directory DELETE arrives, we:

  1. Count its descendants from our live_paths index (built by seed_index and updated in on_created/on_modified).
  2. Remove them from live_paths and mark a tombstone pending_dir_deletes[p].
  3. Start the summary timer for COALESCE_MS.
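  4. All “late” file deletions within this folder that arrive within the window are attributed to the tombstone: they are not logged separately, they restart the timer, and they increment pending_dir_counts[p] only if the path was still in live_paths.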
  5. When the window closes, we print one summary and clear the state.

 

This protects against “holes” in statistics and against double counting.
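
The restartable window is a debounce pattern. Here is a minimal standalone sketch of it, assuming a hypothetical Debouncer class and a short 100 ms window for demonstration. It is not the listing's code, which keeps one timer per directory in pending_dir_timers.

```python
import threading


class Debouncer:
    """Run callback once, `delay` seconds after the most recent arm() call."""

    def __init__(self, delay: float, callback):
        self.delay = delay
        self.callback = callback
        self._lock = threading.Lock()
        self._timer = None

    def arm(self):
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()  # drop the previous window
            self._timer = threading.Timer(self.delay, self.callback)
            self._timer.start()

    def wait(self):
        with self._lock:
            timer = self._timer
        if timer is not None:
            timer.join()  # block until the last armed timer has fired


fired = []
debouncer = Debouncer(0.1, lambda: fired.append("summary"))
for _ in range(3):  # three events in quick succession
    debouncer.arm()
debouncer.wait()
print(fired)
```

Each arm() cancels the pending timer and starts a fresh one, so a burst of events produces a single callback after the burst has gone quiet.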

 

How the code executes on directory deletion

 

Below are the steps that happen in code when on_deleted fires for an event with is_directory set to True:

  1. Normalize the path
    p = norm(event.src_path) → the path is absolute and consistent.
  2. Prepare the coalescing window
    now = time.time() and cutoff = now - COALESCE_MS/1000.
  3. Create a tombstone for the directory (under lock)

    pending_dir_deletes[p] = now
    pending_dir_counts[p] = 0

    This is the root of the summary.

  4. Absorb early deletions (under lock)
    Iterate recent_file_deletes and take those not earlier than cutoff that lie under p. They increment absorbed_from_buffer.
  5. Count via the index (under lock)
    Find all descendants of p in live_paths and count them. This covers the case where the OS sent only “directory deleted” without children. Then remove them from the index (and p itself).
  6. Final counter (under lock)
    pending_dir_counts[p] = count_index + absorbed_from_buffer.
  7. Immediate fact log
    Print that directory p was deleted. (The summary is later.)
  8. Start/extend the timer
    _arm_summary_timer(p) — starts/restarts the timer for COALESCE_MS.
  9. What happens “during the window”
    If, within the window, individual DELETE events for files arrive inside p, they find the parent in pending_dir_deletes. The counter pending_dir_counts[p] is incremented only if the path was still in live_paths (descendants already counted in step 5 are not counted twice), and each such event restarts the timer through _arm_summary_timer(parent).
  10. When the timer fires
    _emit_summary(p) prints: “Deleted folder X; N objects removed”, and clears pending_dir_*.
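
Steps 5 and 9 can be sketched as two small functions over plain sets and dicts. The helper names (_under, count_descendants, deepest_tombstone) are hypothetical, and the sketch compares paths lexically, assuming they were normalized beforehand; the listing itself uses is_under with resolve().

```python
from pathlib import PurePosixPath


def _under(child: str, parent: str) -> bool:
    # Lexical containment test; assumes both paths are already normalized
    return PurePosixPath(parent) in PurePosixPath(child).parents


def count_descendants(live_paths: set[str], p: str) -> int:
    """Step 5: count and remove everything below p, then remove p itself."""
    descendants = {lp for lp in live_paths if _under(lp, p)}
    live_paths -= descendants  # in-place update of the caller's set
    live_paths.discard(p)
    return len(descendants)


def deepest_tombstone(path: str, tombstones: dict[str, float]):
    """Step 9: pick the innermost tombstoned directory that contains path."""
    parents = [d for d in tombstones if _under(path, d)]
    return max(parents, key=len) if parents else None


live = {"/A", "/A/f1", "/A/sub", "/A/sub/f2", "/B/f3"}
print(count_descendants(live, "/A"))  # 3
print(sorted(live))                   # ['/B/f3']
print(deepest_tombstone("/A/sub/f2", {"/A": 1.0, "/A/sub": 2.0}))  # /A/sub
```

Choosing the longest matching key works because every containing directory is a prefix of the path, so the longest one is the innermost.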

 

Diagrams for visual understanding of the code

 

1) Architecture (high level)

 

┌─────────────┐   ┌─────────────────┐   ┌───────────────────┐   ┌───────────────────────┐
│  OS (macOS) │──►│  FSEvents (OS)  │──►│ watchdog.Observer │──►│ MyHandler (callbacks) │
└─────────────┘   └─────────────────┘   └───────────────────┘   └───────────┬───────────┘
                                                                            │
                                                                            ▼
                                                                     ┌──────────────┐
                                                                     │ on_deleted() │
                                                                     └──────┬───────┘
                                                                            │
                                                                            ▼
                                                              ┌────────────────────────────┐
                                                              │  coalescing/summary (300ms)│
                                                              └─────────────┬──────────────┘
                                                                            ▼
                                                                     ┌─────────────┐
                                                                     │   output    │
                                                                     └─────────────┘

 

2) Execution sequence for directory deletion

 

Actor: Filesystem(FSEvents)     Observer(watchdog)     MyHandler.on_deleted       State/Stores
      ─────────────────────     ──────────────────     ─────────────────────       ─────────────
1) DELETE dir=/A  ───────────►  event: is_directory ─► p=norm(/A)                 live_paths (before)
                                              │
2)                                   lock ────┼──► pending_dir_deletes[p]=now
                                              │    pending_dir_counts[p]=0
                                              │    absorb recent_file_deletes within cutoff
                                              │    descendants = {x ∈ live_paths | x under p}
                                              │    remove descendants & p from live_paths
                                              │    pending_dir_counts[p]+=len(descendants)+absorbed
                                              ▼
3) print "[Deleted] Dir: /A"
4) _arm_summary_timer(p)  ─────►  start/restart Timer(COALESCE_MS)
5) (during window)        ◄─────  DELETE file=/A/… may arrive
   on_deleted(file)               → +1 to pending_dir_counts[p] if still in live_paths; extend timer
6) timer fires ──────────►  _emit_summary(p) → print "[Summary] … N objects removed"
                             clear pending_dir_* for p

 

3) Finite state machine for on_deleted

 

        ┌───────────────────────────────────────────────────────────┐
        │ [IDLE]                                                    │
        └───────────┬───────────────────────────────────────────────┘
                    │ DELETE(dir)
                    ▼
        ┌───────────────────────────────────────────────────────────┐
        │ [DIR_TOMBSTONE_SET]  (tombstone created, index counted)   │
        └───────────┬───────────────────────────────────────────────┘
                    │ DELETE(file under dir) during window
                    ▼
        ┌───────────────────────────────────────────────────────────┐
        │ [COALESCE_WINDOW] (accumulate counter, timer runs)        │
        └───────────┬───────────────────────────────────────────────┘
                    │ timer expired
                    ▼
        ┌───────────────────────────────────────────────────────────┐
        │ [SUMMARY_EMIT] → print summary, clear pending_*           │
        └───────────┬───────────────────────────────────────────────┘
                    │
                    ▼
        ┌───────────────────────────────────────────────────────────┐
        │ [IDLE]                                                    │
        └───────────────────────────────────────────────────────────┘
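
The diagram can be written down as a small transition table. This is only a model for reasoning about the handler: the State and Event enums and the step function are hypothetical and do not appear in the listing. The direct transition from DIR_TOMBSTONE_SET to SUMMARY_EMIT (no child events arrive before the timer fires) is an addition not drawn in the diagram.

```python
from enum import Enum, auto


class State(Enum):
    IDLE = auto()
    DIR_TOMBSTONE_SET = auto()
    COALESCE_WINDOW = auto()
    SUMMARY_EMIT = auto()


class Event(Enum):
    DELETE_DIR = auto()
    DELETE_FILE_UNDER_DIR = auto()
    TIMER_EXPIRED = auto()
    CLEARED = auto()


TRANSITIONS = {
    (State.IDLE, Event.DELETE_DIR): State.DIR_TOMBSTONE_SET,
    (State.DIR_TOMBSTONE_SET, Event.DELETE_FILE_UNDER_DIR): State.COALESCE_WINDOW,
    (State.DIR_TOMBSTONE_SET, Event.TIMER_EXPIRED): State.SUMMARY_EMIT,
    (State.COALESCE_WINDOW, Event.DELETE_FILE_UNDER_DIR): State.COALESCE_WINDOW,
    (State.COALESCE_WINDOW, Event.TIMER_EXPIRED): State.SUMMARY_EMIT,
    (State.SUMMARY_EMIT, Event.CLEARED): State.IDLE,
}


def step(state: State, event: Event) -> State:
    # Events with no entry in the table leave the state unchanged
    return TRANSITIONS.get((state, event), state)


state = State.IDLE
for ev in (Event.DELETE_DIR, Event.DELETE_FILE_UNDER_DIR,
           Event.TIMER_EXPIRED, Event.CLEARED):
    state = step(state, ev)
    print(ev.name, "->", state.name)
```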

 

4) Coalescing timeline

 

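time →   t0               t0+60ms            t0+140ms                     t0+440ms
         │                │                  │                            │
dir      │ DELETE /A      │                  │                            │
         │ (timer armed)  │                  │                            │
files    │                │ DELETE /A/f1     │                            │
         │                │ (timer restarts) │                            │
         │                │                  │ DELETE /A/f2               │
         │                │                  │ (timer restarts)           │ (timer fires)
summary  │                                                                emit summary: count = index(A)+absorbed

Each late child DELETE restarts the timer, so the summary fires COALESCE_MS (300 ms) after the last event under /A: t0+140ms+300ms = t0+440ms in this example. With no late child events it would fire at t0+300ms. f1 and f2 were already counted through the index, so they do not change the total.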

 

5) Data structure relationships

 

┌───────────────────┐      write/remove     ┌─────────────────────┐
│    live_paths     │◄──────────────────────│  seed_index, events │
│  set[str]         │                       │  on_created/modified│
└─────────┬─────────┘                       └─────────┬───────────┘
          │  read (for dir delete)                    │
          ▼                                           │
┌───────────────────┐   append    ┌─────────────────────────┐     refer/absorb    ┌──────────────────────┐
│ recent_file_del.  │◄────────────│ on_deleted(file)        │────────────────────►│ pending_dir_counts   │
│ deque[(str,float)]│             └─────────────────────────┘                     │ dict[str,int]        │
└───────────────────┘                                                             └───────────┬──────────┘
                                                                                              │
                                                                                              ▼
┌──────────────────────┐                                                           ┌──────────────────────┐
│                      │    set/start timer   ┌─────────────────────────┐          │ pending_dir_deletes  │
│ watchdog timer thread│─────────────────────►│ _arm_summary_timer(dir) │─────────►│ dict[str,float]      │
│                      │                      └───────────┬─────────────┘          └──────────────────────┘
└──────────────────────┘                                  │                                   │
                                                          ▼                                   │
                                              ┌─────────────────────────┐                     │
                                              │   _emit_summary(dir)    │◄────────────────────┘
                                              └─────────────────────────┘  (clear pending_*)

 

6) Threads and locks

 

                 ┌───────────────────────────────────────────────┐
                 │            Observer Thread (watchdog)         │
                 │  calls MyHandler.on_*                         │
                 │  - modifies live_paths                        │
                 │  - updates pending_dir_* / deque              │
                 └───────────────▲───────────────────────────────┘
                                 │
           acquire lock()  ┌─────┴─────┐  acquire lock()
                           │   lock    │
           release lock()  └─────┬─────┘  release lock()
                                 │
                 ┌───────────────▼───────────────────────────────┐
                 │               Timer Thread(s)                 │
                 │  - _arm_summary_timer/_emit_summary           │
                 │  - reads & clears pending_dir_*               │
                 └───────────────────────────────────────────────┘
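
As a minimal illustration of why the shared counters sit behind a lock, the sketch below has several threads perform the same read-modify-write on a dict. The counts dict stands in for pending_dir_counts, and the worker function bump is hypothetical.

```python
import threading

lock = threading.Lock()
counts = {"/A": 0}  # stands in for pending_dir_counts


def bump(times: int):
    for _ in range(times):
        with lock:  # the read-modify-write must not interleave across threads
            counts["/A"] = counts.get("/A", 0) + 1


workers = [threading.Thread(target=bump, args=(10_000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(counts["/A"])  # 40000
```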

 

7) “Early” deletion queue and absorption

 

recent_file_deletes (deque):
   head                                                       tail
   ┌───────────────┬───────────────┬───────────────┬───────────────┐
   │ (fp1, ts=100) │ (fp2, ts=180) │ (fp3, ts=240) │ (fp4, ts=410) │
   └───────────────┴───────────────┴───────────────┴───────────────┘
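DELETE dir=/A at now=300 with COALESCE_MS=300 ⇒ cutoff = 300 − 300 = 0 (example; all timestamps in ms)
Absorb elements satisfying:
   ts ≥ cutoff  AND  is_under(fp, /A)
Suppose fp1, fp2, fp3 are under /A and satisfy ts ≥ cutoff ⇒ absorbed=3
(fp4, with ts=410, is recorded after the directory event, so it takes the late-deletion path instead.)
Non-matching elements are copied to a new deque, which then replaces recent_file_deletes.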
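
The absorption rule can be tried on concrete numbers. The sketch below uses illustrative millisecond timestamps (not the ones in the diagram), a hypothetical absorb function, and a lexical path test that assumes normalized paths.

```python
from collections import deque
from pathlib import PurePosixPath


def absorb(buffer, dir_path: str, cutoff: float):
    """Split buffer into (absorbed_count, kept_records) for a deleted dir."""
    absorbed = 0
    kept = deque()
    parent = PurePosixPath(dir_path)
    for fp, ts in buffer:
        if ts >= cutoff and parent in PurePosixPath(fp).parents:
            absorbed += 1          # recent and inside the deleted directory
        else:
            kept.append((fp, ts))  # unrelated or too old: stays in the buffer
    return absorbed, kept


COALESCE_MS = 300
now_ms = 1000
buffer = deque([
    ("/A/f1", 600),  # older than the cutoff (700), kept
    ("/A/f2", 750),  # absorbed
    ("/A/f3", 900),  # absorbed
    ("/B/f4", 950),  # recent but not under /A, kept
])
absorbed, kept = absorb(buffer, "/A", cutoff=now_ms - COALESCE_MS)
print(absorbed)    # 2
print(list(kept))  # [('/A/f1', 600), ('/B/f4', 950)]
```

As in the listing, records older than the cutoff are kept rather than dropped, so a long-running watcher may want to prune them. That pruning is not part of the original code.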

 

Tables

 

1) Core structures

 

Structure           │ Type                       │ Written by                      │ Read by                         │ Purpose
────────────────────┼────────────────────────────┼─────────────────────────────────┼─────────────────────────────────┼────────────────────────────────────────────────
live_paths          │ set[str]                   │ seed_index, on_created/modified │ on_deleted                      │ Index of “live” paths to know what disappeared when a folder is deleted
recent_file_deletes │ deque[(str, float)]        │ on_deleted(file)                │ on_deleted(dir)                 │ Buffer of early file DELETEs “absorbed” by the parent directory
pending_dir_deletes │ dict[str, float]           │ on_deleted(dir)                 │ _emit_summary, on_deleted(file) │ Tombstone: the fact of directory deletion and start of the window
pending_dir_counts  │ dict[str, int]             │ on_deleted(dir/file)            │ _emit_summary                   │ Counter of “how many disappeared”
pending_dir_timers  │ dict[str, threading.Timer] │ _arm_summary_timer              │ _emit_summary                   │ Deferred summary output

 

2) Choosing the coalescing window COALESCE_MS

 

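Criterion / Window                    │ 100 ms                          │ 300 ms (recommended)                            │ 700 ms
──────────────────────────────────────┼─────────────────────────────────┼─────────────────────────────────────────────────┼─────────────────────────────────
Risk of “missed” files (late DELETEs) │ medium                          │ low                                             │ even lower
Risk of double counting               │ low                             │ low                                             │ low
User-visible summary delay            │ low                             │ moderate                                        │ noticeable
Timer/lock overhead                   │ low                             │ moderate                                        │ slightly higher
UX/correctness balance                │ undercount possible             │ best balance                                    │ delay is large
Why                                   │ fast, but fragile for batch ops │ late FSEvents usually arrive within this window │ makes the response feel sluggish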

 

Why 300 ms: on macOS, FSEvents batches cascade operations, and late deletions of child objects often arrive within roughly 200–250 ms. A 300 ms window usually absorbs them without a delay the user would notice.

 

Limitations and threading notes

 

  • All structures are modified under lock because watchdog callbacks and timers run in different threads.
  • For very large trees, seed_index may take time. This is the cost of a correct “directory deletion summary”.
  • If a tree is deleted in a way that produces no child events, the count is still available because it comes from live_paths; it is only as accurate as the index.
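
One gap worth noting: the listing's on_moved handler logs the move but does not update live_paths, so renamed paths keep their old keys in the index. A possible way to re-key the index is sketched below. rekey_moved is a hypothetical helper, it assumes normalized POSIX-style paths, and in the real handler the caller would hold lock while it runs.

```python
from pathlib import PurePosixPath


def rekey_moved(live_paths: set[str], src: str, dest: str) -> int:
    """Rewrite src and everything below it to live under dest; return count."""
    src_p, dest_p = PurePosixPath(src), PurePosixPath(dest)
    moved = {}
    for lp in live_paths:
        lp_p = PurePosixPath(lp)
        if lp_p == src_p or src_p in lp_p.parents:
            # Keep the part of the path below src and graft it onto dest
            moved[lp] = str(dest_p / lp_p.relative_to(src_p))
    live_paths.difference_update(moved)  # drop the old keys
    live_paths.update(moved.values())    # add the new keys
    return len(moved)


live = {"/A", "/A/f1", "/A/sub/f2", "/B"}
print(rekey_moved(live, "/A", "/C"))  # 3
print(sorted(live))                   # ['/B', '/C', '/C/f1', '/C/sub/f2']
```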

 

Full code with comments

 

Above, we explained only the key functions. Below is the full listing where the comments already contain detailed explanations of all parts.

 

import os
import threading
import time
from collections import deque
from datetime import datetime as dt
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# ============================================================================
#                               VARIABLES
# ============================================================================

COALESCE_MS = 300
'''
    Coalescing window (ms): within this time we "merge" a burst of deletions.
'''

pending_dir_deletes: dict[str, float] = {}           # directory -> deletion time
pending_dir_counts:  dict[str, int]   = {}           # directory -> number of removed objects
pending_dir_timers:  dict[str, threading.Timer] = {} # directory -> summary timer
'''
    "Tombstones" for deleted directories and related artifacts.
    pending_dir_deletes - Dictionary variable name
    : dict[]            - Hint for human/IDE/linter; just good practice for debugging
    str                 - Directory path string
    float/int           - Numeric values depending on function context
    threading.Timer     - Timer class
    = {}                - Dictionary is created empty
'''

recent_file_deletes: deque[tuple[str, float]] = deque()
''' 
    Buffer of "early" file deletions (in case DELETEs for files arrive first,
    and the directory DELETE arrives later).
    recent_file_deletes - Name of the double-ended queue variable
    : deque[]           - Hint for human/IDE/linter; just good practice for debugging
    tuple[]             - Immutable, ordered tuple with flexible objects
    str                 - Path string
    float               - Floating numbers, we use for timestamps
    = deque()           - Queue is created empty
'''

live_paths: set[str] = set()
''' 
    Lightweight index of live paths; contains absolute normalized paths of all files
    and directories we consider "existing".
    Needed on macOS to count how many items inside a folder disappeared, even if the OS
    didn't send separate events for children.
    live_paths - Name of the set variable
    : set[]    - Hint for human/IDE/linter; just good practice for debugging
    str        - Path string
    = set()    - Set is created empty
'''


lock = threading.Lock()
''' 
    Protects critical sections where shared resources are accessed and modified.
    By acquiring the lock before entering a critical section, we ensure only one thread
    executes that code at a time, preventing data races.
    Lock()     - Create an instance. By default the lock is not acquired until a thread gets it
    acquire()  - Acquire the lock
    release()  - Release the lock
    with lock: - Equivalent to acquire() at block start and release() on exit
'''

# ============================================================================
#                         HELPER FUNCTIONS
# ============================================================================

def stamp(): 
    return dt.now().strftime("%Y-%m-%d %H:%M:%S")
'''
    Convenient timestamp formatting
'''

def norm(path: str) -> str:
    return str(Path(path).resolve())
'''
    Create an absolute normalized path for consistent keys.
    norm                      - Function name
    (path: str)               - Parameter path, the function expects a string
    -> str                    - Function returns a string
    str(Path(path).resolve()) - Convert to full path and to string
'''

def is_under(child: str, parent: str) -> bool:
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
        return True
    except Exception:
        return False
'''
    Check: does child lie inside parent?
    is_under                                                  - Function name
    (child: str, parent: str)                                 - Two string parameters
    -> bool                                                   - Function returns a bool
    Path(child).resolve().relative_to(Path(parent).resolve()) - Compare resolved paths to see
                                                                if one is under the other.
                                                                If it runs without errors → True.
                                                                On error → False.
'''

def seed_index(root: str):
    root_abs = norm(root)
    with lock:
        live_paths.add(root_abs)
        for dirpath, dirnames, filenames in os.walk(root_abs):
            dp = norm(dirpath)
            live_paths.add(dp)
            for name in dirnames:
                live_paths.add(norm(os.path.join(dp, name)))
            for name in filenames:
                live_paths.add(norm(os.path.join(dp, name)))
'''
    Initialize live_paths index with the current disk state.
    Important for correct counts if the folder already contains something at start.
    seed_index                                   - Function name
    (root: str)                                  - Parameter root, expected to be str
    root_abs = norm(root)                        - Normalize root path string
    with lock:                                   - Lock the shared live_paths
    live_paths.add(root_abs)                     - Add root path to live_paths
    for dirpath, dirnames, filenames in os.walk(root_abs) - Walk entire tree
    dp = norm(dirpath)                           - Normalize current dir path
    live_paths.add(dp)                           - Add directory path to live_paths
    for name in dirnames                         - Iterate subdirectories
    live_paths.add(norm(os.path.join(dp, name))) - Add each subdirectory
    for name in filenames                        - Iterate files
    live_paths.add(norm(os.path.join(dp, name))) - Add each file
'''

def _emit_summary(dir_key: str):
    with lock:
        count = pending_dir_counts.pop(dir_key, 0)
        print(f"{stamp()} [Summary] Directory {dir_key} deleted; {count} objects removed with it")
        pending_dir_deletes.pop(dir_key, None)
        t = pending_dir_timers.pop(dir_key, None)
        if t:
            t.cancel()
'''
    Fires on timer: prints the summary and clears per-directory state.
    _emit_summary                              - Function name
    (dir_key: str)                             - Parameter dir_key, expected to be str
    with lock:                                 - Lock the shared state
    count = pending_dir_counts.pop(dir_key, 0) - Pop count (0 if missing)
    pending_dir_deletes.pop(dir_key, None)     - Clean tombstone
    t = pending_dir_timers.pop(dir_key, None)  - Clean timer
    if t: t.cancel()                           - Cancel timer if present
'''

def _arm_summary_timer(dir_key: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_dir_timers.get(dir_key)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
        pending_dir_timers[dir_key] = t
        t.start()
'''
    (Re)start the summary timer for the directory.
    _arm_summary_timer                    - Function name
    (dir_key: str)                        - Parameter dir_key, expected to be str
    ttl = COALESCE_MS / 1000.0            - Convert ms to seconds
    with lock:                            - Lock shared state
    old = pending_dir_timers.get(dir_key) - Get existing timer
    if old: old.cancel()                  - Cancel if exists
    t = threading.Timer(...)              - Create a new timer that calls _emit_summary
    pending_dir_timers[dir_key] = t       - Store the timer
    t.start()                             - Start timer
'''

# ============================================================================
#                           EVENT HANDLER
# ============================================================================

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        p = norm(event.src_path)
        kind = 'Directory' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Modified] {kind}: {p}")

    def on_moved(self, event):
        p = norm(event.src_path)
        p_dest = norm(event.dest_path)
        kind = 'Directory' if event.is_directory else "File"
        print(f"{stamp()} [Moved] {kind}: {p} -> {p_dest}")

    def on_created(self, event):
        p = norm(event.src_path)
        kind = 'Directory' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Created] {kind}: {p}")

    def on_deleted(self, event):                                                        # Function called on deletion
        p = norm(event.src_path)                                                        # Normalize event path
        if event.is_directory:                                                          # Is it a directory path?
            kind = 'Directory'                                                          # Used in logs
            now = time.time()                                                           # Current time as float
            cutoff = now - COALESCE_MS / 1000.0                                         # Older than this won't be absorbed
            absorbed_from_buffer = 0                                                    # Counter of deletions before dir delete

            with lock:                                                                  # Single-thread access
                pending_dir_deletes[p] = now                                            # Add tombstone for dir p
                pending_dir_counts[p] = 0                                               # Reset accumulated count

                global recent_file_deletes                                              # We'll reassign the global deque
                newbuf = deque()                                                        # Keep only unrelated/too-old records
                for fp, ts in recent_file_deletes:                                      # Iterate prior file deletions
                    if ts >= cutoff and is_under(fp, p):                                # If recent and under the deleted dir
                        absorbed_from_buffer += 1                                       # Increment absorbed counter
                    else:                                                               # Otherwise
                        newbuf.append((fp, ts))                                         # Keep in the new buffer
                recent_file_deletes = newbuf                                            # Replace buffer

                descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]  # All live descendants of p
                count_index = len(descendants)                                          # Count them
                for lp in descendants:                                                  # Remove descendants
                    live_paths.discard(lp)
                live_paths.discard(p)                                                   # Remove the directory itself

                pending_dir_counts[p] = count_index + absorbed_from_buffer              # Final count

            print(f"{stamp()} [Deleted] {kind}: {p}")                                   # Log directory deletion
            _arm_summary_timer(p)                                                       # Schedule summary for the dir

        else:                                                                           # A file was deleted
            
            kind = 'File'                                                               # Log will say file
            abs_path = p                                                                # Absolute path
            now = time.time()                                                           # Current time as float

            with lock:                                                                  # Single-thread access
                recent_file_deletes.append((abs_path, now))                             # Put (path, time) into early-deletes buffer
                was_present = abs_path in live_paths                                    # Mark if it existed
                live_paths.discard(abs_path)                                            # Remove from live set

                parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]     # Is it under a tombstoned dir?
                parent = max(parents, key=len) if parents else None                     # Choose the deepest parent
                
                if parent and was_present:                                              # Only if the file really existed
                    pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1  # Increment parent's count

            if parent:                                                                  # If tied to a parent dir
                _arm_summary_timer(parent)                                              # Suppress standalone log; extend parent's timer
                return

            print(f"{stamp()} [Deleted] {kind}: {abs_path}")                            # Log standalone file deletion


# ============================================================================
#                               ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    path = "." # Watch path
    seed_index(path)

    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()

    print(f"Watching {Path(path).resolve()} started. Press Ctrl+C to exit.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()