PyWatch — Перемещение папки с подсчетом содержимого

                
                PyWatch — Перемещение папки с подсчетом содержимого
В статье показано, как в PyWatch реализовать коалесинг при перемещении папки. Добавлены новые структуры для учёта старого и нового пути, функции сводки _emit_move_summary и _arm_move_summary_timer, а также переработан on_moved(). Теперь при перемещении папки в лог выводится одно событие с указанием старого и нового пути и итоговая сводка с количеством перемещённых объектов. По сути, это развитие подхода из статьи про удаление, но с учётом двух путей.

Вступление

 

В --> прошлой статье <-- я подробно разбирал коалесинг при удалении папок и файлов. Там мы писали полноценный механизм, который собирает лавину событий в одно понятное сообщение. Теперь настала очередь перемещений.

 

Логика похожа: если мы двигаем папку, нужно не завалить лог десятками событий о каждом вложенном файле, а красиво вывести одно сообщение вида:

 

[Перемещено] Папка: old_path -> new_path
[Сводка] Перемещена папка old_path -> new_path; вместе с ней перемещено N объектов

 

Быстрая реализация

 

Чтобы реализовать это, я добавил несколько функций и структур, очень похожих на то, что было в on_deleted(), но с поправкой: у нас есть старый путь и новый путь.

 

Новые переменные

 

pending_dir_moves: dict[str, tuple[float, str]] = {}
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()

 

Они работают так же, как структуры для удаления, но дополнительно храним dst_dir — куда именно поехала папка.

 

Вспомогательные функции

 

def _emit_move_summary(src_dir: str):
    with lock:
        count = pending_move_counts.pop(src_dir, 0)
        ts_dst = pending_dir_moves.pop(src_dir, None)
        dst_dir = ts_dst[1] if ts_dst else None
        t = pending_move_timers.pop(src_dir, None)
        if t:
            t.cancel()

    if dst_dir:
        print(f"{stamp()} [Сводка] Перемещена папка {src_dir} -> {dst_dir}; вместе с ней перемещено {count} объектов")
    else:
        print(f"{stamp()} [Сводка] Перемещена папка {src_dir}; вместе с ней перемещено {count} объектов")

def _arm_move_summary_timer(src_dir: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_move_timers.get(src_dir)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = t
        t.start()

def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    kind = 'Папка' if is_dir else "Файл"
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    print(f"{stamp()} [Перемещено] {kind}: {old_path} -> {new_path}")

 

Эти функции почти копия _emit_summary() и _arm_summary_timer(), только печатают перемещение и используют пару src_dir -> dst_dir.

 

Изменение функции on_moved()

 

Ключевая часть — обработчик перемещений:

 

  • Если двигается папка:
  1. Сохраняем src и dst.
  2. Переносим всех потомков в индексе live_paths с учётом нового пути.
  3. Считаем количество объектов (потомки + файлы из буфера).
  4. Выводим лог и запускаем таймер сводки.

 

  • Если двигается файл:
  1. Добавляем в буфер.
  2. Проверяем, не двигается ли сейчас родительская папка. Если да — не печатаем лог, а просто увеличиваем её счётчик.

 

Таким образом, поведение стало идентичным удалению, только теперь мы не выкидываем объекты, а «пересаживаем» их на новое место.

 

Отличия от on_deleted()

 

В on_deleted() мы работали только с одним путём — старым. Здесь же нужно помнить и старый (src), и новый (dst). Это основное отличие: мы переносим индекс (live_paths) вместо удаления.

Всё остальное — логика таймеров, счётчиков и буфера — полностью совпадает с тем, что было реализовано ранее.

 

Заключение

 

Эта функция далась проще, чем предыдущая с удалением. Всё потому, что концепцию коалесинга я уже отработал, и оставалось только добавить работу со старым и новым путём.

 

Конечный код:

 

import time
import datetime
from datetime import datetime as dt
import threading
from pathlib import Path
from collections import deque
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# ============================================================================
#                               ПЕРЕМЕННЫЕ
# ============================================================================

COALESCE_MS = 300

# --- Коалесинг удалений ---
pending_dir_deletes: dict[str, float] = {}
pending_dir_counts:  dict[str, int]   = {}
pending_dir_timers:  dict[str, threading.Timer] = {}
recent_file_deletes: deque[tuple[str, float]] = deque()

# --- Коалесинг перемещений ---
pending_dir_moves: dict[str, tuple[float, str]] = {} 
pending_move_counts: dict[str, int] = {}
pending_move_timers: dict[str, threading.Timer] = {}
recent_file_moves: deque[tuple[str, float]] = deque()

live_paths: set[str] = set()

lock = threading.Lock()

# ============================================================================
#                         ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ
# ============================================================================

def stamp(): 
    return dt.now().strftime("%Y-%m-%d %H:%M:%S")

def norm(path: str) -> str:
    return str(Path(path).resolve())

def is_under(child: str, parent: str) -> bool:
    try:
        Path(child).resolve().relative_to(Path(parent).resolve())
        return True
    except Exception:
        return False

def seed_index(root: str):
    root_abs = norm(root)
    with lock:
        live_paths.add(root_abs)
        for dirpath, dirnames, filenames in os.walk(root_abs):
            dp = norm(dirpath)
            live_paths.add(dp)
            for name in dirnames:
                live_paths.add(norm(os.path.join(dp, name)))
            for name in filenames:
                live_paths.add(norm(os.path.join(dp, name)))

# ------------------------- УДАЛЕНИЯ -------------------------------

def _emit_summary(dir_key: str):
    with lock:
        count = pending_dir_counts.pop(dir_key, 0)
        print(f"{stamp()} [Сводка] Удалена папка {dir_key}; вместе с ней удалено {count} объектов")
        pending_dir_deletes.pop(dir_key, None)
        t = pending_dir_timers.pop(dir_key, None)
        if t:
            t.cancel()

def _arm_summary_timer(dir_key: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_dir_timers.get(dir_key)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_summary, args=(dir_key,))
        pending_dir_timers[dir_key] = t
        t.start()

# ------------------------- ПЕРЕМЕЩЕНИЯ ----------------------------

def _emit_move_summary(src_dir: str):
    with lock:
        count = pending_move_counts.pop(src_dir, 0)
        ts_dst = pending_dir_moves.pop(src_dir, None)
        dst_dir = ts_dst[1] if ts_dst else None
        t = pending_move_timers.pop(src_dir, None)
        if t:
            t.cancel()

    if dst_dir:
        print(f"{stamp()} [Сводка] Перемещена папка {src_dir} -> {dst_dir}; вместе с ней перемещено {count} объектов")
    else:
        print(f"{stamp()} [Сводка] Перемещена папка {src_dir}; вместе с ней перемещено {count} объектов")

def _arm_move_summary_timer(src_dir: str):
    ttl = COALESCE_MS / 1000.0
    with lock:
        old = pending_move_timers.get(src_dir)
        if old:
            old.cancel()
        t = threading.Timer(ttl, _emit_move_summary, args=(src_dir,))
        pending_move_timers[src_dir] = t
        t.start()

def emit_move_event(old_path: str, new_path: str, is_dir: bool):
    kind = 'Папка' if is_dir else "Файл"
    with lock:
        live_paths.discard(old_path)
        live_paths.add(new_path)
    print(f"{stamp()} [Перемещено] {kind}: {old_path} -> {new_path}")

# ============================================================================
#                           ОБРАБОТЧИК СОБЫТИЙ
# ============================================================================

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        p = norm(event.src_path)
        kind = 'Папка' if event.is_directory else "Файл"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Изменено] {kind}: {p}")

    def on_moved(self, event):
        src = norm(event.src_path)
        dst = norm(event.dest_path)
        now = time.time()

        if event.is_directory:
            kind = 'Папка'
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0

            with lock:
                pending_dir_moves[src] = (now, dst)
                pending_move_counts[src] = 0

                global recent_file_moves
                newbuf = deque()
                for fp, ts in recent_file_moves:
                    if ts >= cutoff and is_under(fp, src):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_moves = newbuf

                descendants = [lp for lp in live_paths if lp != src and is_under(lp, src)]
                count_index = len(descendants)
                for lp in descendants:
                    rel = os.path.relpath(lp, src)
                    new_lp = norm(os.path.join(dst, rel))
                    live_paths.discard(lp)
                    live_paths.add(new_lp)

                live_paths.discard(src)
                live_paths.add(dst)

                pending_move_counts[src] = count_index + absorbed_from_buffer

            print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")
            _arm_move_summary_timer(src)
            return

        kind = 'Файл'
        with lock:
            recent_file_moves.append((src, now))

            was_present = src in live_paths
            live_paths.discard(src)
            live_paths.add(dst)

            parents = [d for d in pending_dir_moves.keys() if is_under(src, d)]
            parent = max(parents, key=len) if parents else None
            if parent and was_present:
                pending_move_counts[parent] = pending_move_counts.get(parent, 0) + 1

        if parent:
            _arm_move_summary_timer(parent)
            return

        print(f"{stamp()} [Перемещено] {kind}: {src} -> {dst}")

    def on_created(self, event):
        p = norm(event.src_path)
        kind = 'Папка' if event.is_directory else "Файл"
        with lock:
            live_paths.add(p)
        print(f"{stamp()} [Создано] {kind}: {p}")

    def on_deleted(self, event):
        p = norm(event.src_path)
        if event.is_directory:
            kind = 'Папка'
            now = time.time()
            cutoff = now - COALESCE_MS / 1000.0
            absorbed_from_buffer = 0

            with lock:
                pending_dir_deletes[p] = now
                pending_dir_counts[p] = 0

                global recent_file_deletes
                newbuf = deque()
                for fp, ts in recent_file_deletes:
                    if ts >= cutoff and is_under(fp, p):
                        absorbed_from_buffer += 1
                    else:
                        newbuf.append((fp, ts))
                recent_file_deletes = newbuf

                descendants = [lp for lp in live_paths if lp != p and is_under(lp, p)]
                count_index = len(descendants)
                for lp in descendants:
                    live_paths.discard(lp)
                live_paths.discard(p)

                pending_dir_counts[p] = count_index + absorbed_from_buffer

            print(f"{stamp()} [Удалено] {kind}: {p}")
            _arm_summary_timer(p)

        else:
            
            kind = 'Файл'
            abs_path = p
            now = time.time()

            with lock:
                recent_file_deletes.append((abs_path, now))
                was_present = abs_path in live_paths
                live_paths.discard(abs_path)

                parents = [d for d in pending_dir_deletes if is_under(abs_path, d)]
                parent = max(parents, key=len) if parents else None
                
                if parent and was_present:
                    pending_dir_counts[parent] = pending_dir_counts.get(parent, 0) + 1

            if parent:
                _arm_summary_timer(parent)
                return

            print(f"{stamp()} [Удалено] {kind}: {abs_path}")


# ============================================================================
#                               ТОЧКА ВХОДА
# ============================================================================

if __name__ == "__main__":
    path = "."
    seed_index(path)

    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()

    print(f"Слежение за {Path(path).resolve()} запущено. Нажмите Ctrl+C для выхода.")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()