PyWatch — Moving a Folder with Content Count

                
The article demonstrates how to implement coalescing in PyWatch when moving a folder. New structures were added to handle both the old and new paths, along with the summary functions _emit_move_summary and _arm_move_summary_timer, and the on_moved() handler was reworked. Now, when a folder is moved, the log outputs a single event showing both the old and new paths, plus a final summary with the number of moved objects. Essentially, this is an extension of the approach from the deletion article, but adapted to account for two paths.

Introduction

 

In the previous article, I took a detailed look at coalescing when deleting folders and files. There, we wrote a complete mechanism that collapses a storm of events into one understandable message. Now it’s time to handle moves.

 

The logic is similar: when a folder is moved, we don’t want to flood the log with dozens of events, one for each nested file. Instead, we want a single [Moved] line followed by a single summary, like this:

 

[Moved] Folder: old_path -> new_path
[Summary] Folder moved old_path -> new_path; along with it N objects were moved

 

Quick implementation

 

To implement this, I added several functions and structures very similar to what we had in on_deleted(), with one difference: a move has both an old path and a new path, and we need to track both.

 

New variables

 

pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()

 

They work just like the structures for deletion, with one addition: each pending_dir_moves entry stores dst_dir (where exactly the folder was moved) next to the timestamp, keyed by the old path.
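To make the shape of these structures concrete, here is a minimal sketch of what one pending folder move looks like while it is being coalesced. The paths and the count of 3 are made-up values for illustration only.

```python
import time

# Same shapes as in the article: the key is always the OLD folder path.
pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}

# Hypothetical paths, used only for this illustration.
src, dst = "/watched/old_dir", "/watched/new_dir"

# Register the move: remember when it started and where the folder went.
pending_dir_moves[src] = (time.time(), dst)
pending_move_counts[src] = 3  # pretend three nested objects were counted

# Later, the summary code unpacks the tuple to recover the destination.
started_at, dst_dir = pending_dir_moves[src]
summary = f"{src} -> {dst_dir}: {pending_move_counts[src]} objects"
print(summary)
```

Keying everything by the old path is what lets later file events, which also arrive with their old path, find the folder move they belong to.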

 

Helper functions

 

def _emit_move_summary(src_dir: str):
    """Print the final summary for a folder move and clear its pending state."""
    with lock:
        count = pending_move_counts.pop(src_dir, 0)
        # The stored value is (timestamp, dst_dir); only dst_dir is needed here.
        ts_dst = pending_dir_moves.pop(src_dir, None)
        dst_dir = ts_dst[1] if ts_dst else None
        t = pending_move_timers.pop(src_dir, None)
        if t:
            t.cancel()

    if dst_dir:
        print(f"{stamp()} [Summary] Folder moved {src_dir} -> {dst_dir}; along with it {count} objects were moved")
    else:
        # Fallback for the case where no destination was recorded.
        print(f"{stamp()} [Summary] Folder moved {src_dir}; along with it {count} objects were moved")

def _arm_move_summary_timer(src_dir: str):
    """(Re)start the summary timer; each call postpones the summary by COALESCE_MS."""
    ttl = COALESCE_MS / 1000.0
    with lock:
        # Cancel the previous timer so that only the latest one can fire.
        old = pending_move_timers.get(src_dir)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = t
        t.start()

def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    """Update the live_paths index and print a single [Moved] line."""
    kind = 'Folder' if is_dir else "File"
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    print(f"{stamp()} [Moved] {kind}: {old_path} -> {new_path}")

 

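The first two functions are almost a copy of _emit_summary() and _arm_summary_timer(), but they print a move and use the pair src_dir -> dst_dir. The third, emit_move_event(), updates live_paths and prints a [Moved] line; in the final code at the end of the article, on_moved() performs these two steps inline instead of calling it.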
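The re-arming behavior is easier to see in isolation. Below is a minimal sketch of the same cancel-and-restart pattern built on threading.Timer; the names arm, emit, fired and timers, as well as the 0.2 s delay, are hypothetical stand-ins for this illustration rather than part of PyWatch.

```python
import threading

fired: list[str] = []                      # records every summary that actually ran
timers: dict[str, threading.Timer] = {}    # one pending timer per key
lock = threading.Lock()

def emit(key: str) -> None:
    """Stand-in for the summary function: runs once the timer expires."""
    with lock:
        timers.pop(key, None)
    fired.append(key)

def arm(key: str, ttl: float = 0.2) -> threading.Timer:
    """Cancel the pending timer for key (if any) and start a fresh one."""
    with lock:
        old = timers.get(key)
        if old:
            old.cancel()
        t = threading.Timer(ttl, emit, args=(key,))
        timers[key] = t
        t.start()
    return t

# Five events in quick succession re-arm the same key five times...
for _ in range(5):
    last = arm("/watched/old_dir")

# ...but only the last timer survives, so the summary runs exactly once.
last.join()
print(fired)
```

Every incoming event pushes the summary further into the future, so it is printed only after the stream of events for that folder has gone quiet for the whole delay.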

 

Modifying the on_moved() function

 

The key part is the move handler:

 

  • If a folder is moved:
      1. Save src and dst.
      2. Move all descendants in the live_paths index to their new paths under dst.
      3. Count the number of objects (descendants + files from the buffer).
      4. Print the log line and start the summary timer.

 

  • If a file is moved:
      1. Add it to the buffer and update its path in live_paths.
      2. Check whether a parent folder is currently being moved. If so, don’t print a log line; just count the file toward that folder (if it was in the index) and restart the folder’s summary timer. Otherwise, print a regular [Moved] line.

 

Thus, the behavior becomes identical to deletion, except now we don’t throw objects away — we “replant” them to a new place.
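The file branch hinges on one lookup: which pending folder move, if any, does this file belong to? Here is a minimal sketch of that lookup. It uses PurePosixPath so that it runs without touching the file system (the article's code uses is_under() with resolved paths instead), and the helper name deepest_pending_parent and all paths are hypothetical.

```python
from pathlib import PurePosixPath

def deepest_pending_parent(path: str, pending: dict) -> "str | None":
    """Return the deepest key in pending that is an ancestor of path, or None."""
    ancestors = PurePosixPath(path).parents
    candidates = [d for d in pending if PurePosixPath(d) in ancestors]
    # All candidates lie on the same ancestor chain, so the longest string is the deepest.
    return max(candidates, key=len) if candidates else None

# Two folder moves are pending at the same time (made-up paths and timestamps).
pending = {"/w/a": (0.0, "/w/b"), "/w/a/sub": (0.0, "/w/c")}
counts = {"/w/a": 0, "/w/a/sub": 0}

# A file event below both folders is credited to the nearest one only.
parent = deepest_pending_parent("/w/a/sub/g.txt", pending)
if parent:
    counts[parent] += 1

# A path that merely shares a name prefix ("/w/ab") matches nothing.
unrelated = deepest_pending_parent("/w/ab/x.txt", pending)
print(parent, counts, unrelated)
```

Comparing whole path components rather than raw string prefixes is what keeps /w/ab from being mistaken for something inside /w/a.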

 

Differences from on_deleted()

 

In on_deleted() we worked with only one path — the old one. Here we need to keep both the old (src) and the new (dst). This is the main difference: we move the index (live_paths) instead of deleting.

Everything else — the logic of timers, counters, and the buffer — fully matches what was implemented earlier.
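The one real difference, moving the index instead of deleting from it, can be sketched on its own. The function below is an illustrative, file-system-free version of that step: rebase_index is a hypothetical name, the paths are invented, and PurePosixPath replaces the norm()/is_under() helpers used in the article.

```python
from pathlib import PurePosixPath

def rebase_index(index: set[str], src: str, dst: str) -> int:
    """Move src and everything below it to dst; return the number of descendants moved."""
    src_p, dst_p = PurePosixPath(src), PurePosixPath(dst)
    moved = 0
    for path in list(index):            # iterate over a snapshot while mutating the set
        p = PurePosixPath(path)
        if p == src_p:
            continue                    # the folder itself is handled after the loop
        try:
            rel = p.relative_to(src_p)  # raises ValueError if p is not under src
        except ValueError:
            continue
        index.discard(path)
        index.add(str(dst_p / rel))     # same relative position, new root
        moved += 1
    index.discard(src)
    index.add(dst)
    return moved

index = {"/w/a", "/w/a/f.txt", "/w/a/sub", "/w/a/sub/g.txt", "/w/ab/x.txt"}
count = rebase_index(index, "/w/a", "/w/b")
print(count, sorted(index))
```

For deletion the loop body would stop at discard(); the move variant adds one extra line that re-inserts each object under the new root, and the returned count is what feeds the summary.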

 

Conclusion

 

This function was easier than the previous one with deletion. That’s because I had already worked out the coalescing concept, and all that remained was to add handling of the old and new paths.

 

Final code:

 

import time
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ============================================================================
#                               VARIABLES
# ============================================================================

COALESCE_MS = 300

# --- Deletion coalescing ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts:  dict[str, int]   = {}
pending_dir_timers:  dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()

# --- Move coalescing ---
pending_dir_moves: dict[str, tuple[float, str]] = {} 
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()

live_paths: set[str] = set()

lock = threading.Lock()

# ============================================================================
#                         HELPER FUNCTIONS
# ============================================================================

def stamp(): 
    return dt.now().strftime("%Y-%m-%d %H:%M:%S")

def norm(path: str) -> str:
    return str(Path(path).resolve())

def is_under(child: str, parent: str) -> bool:
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
        return True
    except Exception:
        return False

def seed_index(root: str):
    root_abs = norm(root)
    with lock:
        live_paths.add(root_abs)
        for dirpath, dirnames, filenames in os.walk(root_abs):
            dp = norm(dirpath)
            live_paths.add(dp)
            for name in dirnames:
                live_paths.add(norm(os.path.join(dp, name)))
            for name in filenames:
                live_paths.add(norm(os.path.join(dp, name)))

# ------------------------- DELETIONS -------------------------------

def _emit_summary(dir_key: str):
    with lock:
        count = pending_dir_counts.pop(dir_key, 0)
        print(f"{stamp()} [Summary] Folder deleted {dir_key}; along with it {count} objects were deleted")
        pending_dir_deletes.pop(dir_key, None)
        t = pending_dir_timers.pop(dir_key, None)
        if t:
            t.cancel()

def _arm_summary_timer(dir_key: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_dir_timers.get(dir_key)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
        pending_dir_timers[dir_key] = t
        t.start()

# ------------------------- MOVES ----------------------------

def _emit_move_summary(src_dir: str):
    with lock:
        count = pending_move_counts.pop(src_dir, 0)
        ts_dst = pending_dir_moves.pop(src_dir, None)
        dst_dir = ts_dst[1] if ts_dst else None
        t = pending_move_timers.pop(src_dir, None)
        if t:
            t.cancel()

    if dst_dir:
        print(f"{stamp()} [Summary] Folder moved {src_dir} -> {dst_dir}; along with it {count} objects were moved")
    else:
        print(f"{stamp()} [Summary] Folder moved {src_dir}; along with it {count} objects were moved")

def _arm_move_summary_timer(src_dir: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_move_timers.get(src_dir)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = t
        t.start()

def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    kind = 'Folder' if is_dir else "File"
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    print(f"{stamp()} [Moved] {kind}: {old_path} -> {new_path}")

# ============================================================================
#                           EVENT HANDLER
# ============================================================================

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        p = norm(event.src_path)
        kind = 'Folder' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Modified] {kind}: {p}")

    def on_moved(self, event):
        src = norm(event.src_path)
        dst = norm(event.dest_path)
        now = time.time()

        if event.is_directory:
            kind = 'Folder'
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0

            with lock:
                pending_dir_moves[src] = (now, dst)
                pending_move_counts[src] = 0

                global recent_file_moves
                newbuf = deque()
                for fp, ts in recent_file_moves:
                    if ts >= cutoff and is_under(fp, src):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_moves = newbuf

                descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
                count_index = len(descendants)
                for lp in descendants:
                    rel = os.path.relpath(lp, src)
                    new_lp = norm(os.path.join(dst, rel))
                    live_paths.discard(lp)
                    live_paths.add(new_lp)

                live_paths.discard(src)
                live_paths.add(dst)

                pending_move_counts[src] = count_index + absorbed_from_buffer

            print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")
            _arm_move_summary_timer(src)
            return

        kind = 'File'
        with lock:
            recent_file_moves.append((src, now))

            was_present = src in live_paths
            live_paths.discard(src)
            live_paths.add(dst)

            parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
            parent = max(parents, key=len) if parents else None
            if parent and was_present:
                pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1

        if parent:
            _arm_move_summary_timer(parent)
            return

        print(f"{stamp()} [Moved] {kind}: {src} -> {dst}")

    def on_created(self, event):
        p = norm(event.src_path)
        kind = 'Folder' if event.is_directory else "File"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Created] {kind}: {p}")

    def on_deleted(self, event):
        p = norm(event.src_path)
        if event.is_directory:
            kind = 'Folder'
            now = time.time()
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0

            with lock:
                pending_dir_deletes[p] = now
                pending_dir_counts[p] = 0

                global recent_file_deletes
                newbuf = deque()
                for fp, ts in recent_file_deletes:
                    if ts >= cutoff and is_under(fp, p):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_deletes = newbuf

                descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
                count_index = len(descendants)
                for lp in descendants:
                    live_paths.discard(lp)
                live_paths.discard(p)

                pending_dir_counts[p] = count_index + absorbed_from_buffer

            print(f"{stamp()} [Deleted] {kind}: {p}")
            _arm_summary_timer(p)

        else:
            
            kind = 'File'
            abs_path = p
            now = time.time()

            with lock:
                recent_file_deletes.append((abs_path, now))
                was_present = abs_path in live_paths
                live_paths.discard(abs_path)

                parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
                parent = max(parents, key=len) if parents else None
                
                if parent and was_present:
                    pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1

            if parent:
                _arm_summary_timer(parent)
                return

            print(f"{stamp()} [Deleted] {kind}: {abs_path}")


# ============================================================================
#                               ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    path = "."
    seed_index(path)

    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()

    print(f"Watching {Path(path).resolve()} started. Press Ctrl+C to exit.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()