
Musiclib

Creating/maintaining the music collection database

📘 High‑level picture

  • Watchdog watches the music directory for creations, modifications, and deletions.
  • Detected changes are turned into IndexEvent objects and placed on a thread‑safe Queue.
  • A dedicated writer thread (_db_writer_loop) consumes those events and performs the actual SQLite writes.
  • The database consists of a normal tracks table (metadata) and an FTS5 virtual table tracks_fts that mirrors the metadata for fast full‑text search.
  • Helper functions in indexing_status.py keep a tiny JSON status file (indexing_status.json) that the UI can poll to show progress during a rebuild or resync operation.

Loading/resyncing process

flowchart LR
    A["User initiates process (rebuild() / resync())"] --> B["Log start and set indexing status"]
    B --> C{"Operation type?"}
    C -->|"'rebuild()'"| D["Clear database and prepare for file scan"]
    C -->|"'resync()'"| E["Scan filesystem and compare with DB"]

    D --> F["Scan music root for files to index"]
    E --> G["Identify changes: files to add or remove"]
    F --> H["Queue file operations to process"]
    G --> H

    H --> I["Start job, update counters & status"]
    I --> J["Process queued file operations (Delete/Index)"]

    J --> K{"Job complete?"}
    K -->|Yes| L["Finalize job, log completion, clear status"]
    K -->|No| J

    L --> M["Indexing complete, external indexing status cleared"]

Monitoring process

flowchart LR
    A["'start_monitoring()' called"] --> B{"Is Observer running?"}
    B -->|Yes| C["No-op (return immediately)"]
    B -->|No| D["Create and start Observer instance"]

    D --> E["Schedule file watcher for 'music_root'"]
    E --> F["Start Observer thread"]

    F --> G["FS event in 'music_root' (create/modify/delete)"]
    G --> H["Process event"]

    H --> I{"Is directory?"}
    I -->|Yes| J["Ignore event"]
    I -->|No| K["Check if extension is supported"]

    K -->|No| J
    K -->|Yes| L["Enqueue event (Index or Delete file)"]

    L --> M["Writer thread processes events"]

    %% Shutdown flow
    N["'stop()' called"] --> O{"Is Observer running?"}
    O -->|Yes| P["Stop and join Observer thread (flushes pending events)"]
    O -->|No| Q["No monitoring to stop"]
    P --> R["Stop writer thread and join"]
    Q --> R

🧱 Core data structures

| Name | Type | Purpose |
| --- | --- | --- |
| IndexEvent | @dataclass with fields type: EventType and path: Optional[Path] | Represents a single action for the writer thread (index a file, delete a file, clear the DB, signal rebuild/resync completion). |
| EventType | Literal["INDEX_FILE", "DELETE_FILE", "CLEAR_DB", "REBUILD_DONE", "RESYNC_DONE"] | Enumerates the possible actions. |
| _write_queue | queue.Queue[IndexEvent] | Thread-safe hand-off from the watcher / public methods to the writer thread. |
| _writer_stop | threading.Event | Signals the writer thread to shut down cleanly. |
| tracks table | SQLite table with columns path, filename, artist, album, title, albumartist, genre, year, duration, mtime | Stores the canonical metadata for each audio file. |
| tracks_fts | SQLite FTS5 virtual table mirroring most columns of tracks | Enables fast full-text search across artist, album, title, etc. |
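The event-plumbing types are small enough to sketch in full. The IndexEvent and EventType definitions below mirror the ones shown in the API section of this page; the queue wiring at the bottom is illustrative:

```python
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from typing import Literal, Optional

EventType = Literal[
    "INDEX_FILE", "DELETE_FILE", "CLEAR_DB", "REBUILD_DONE", "RESYNC_DONE"
]

@dataclass
class IndexEvent:
    """A single unit of work for the writer thread."""
    type: EventType
    path: Optional[Path] = None  # control events (CLEAR_DB, *_DONE) carry no path

# Thread-safe hand-off: producers enqueue, the writer thread consumes.
write_queue: "Queue[IndexEvent]" = Queue()
write_queue.put(IndexEvent("INDEX_FILE", Path("album/track.mp3")))
write_queue.put(IndexEvent("REBUILD_DONE"))
```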

🗄️ Database initialization

Performed in _init_db:

  1. Opens a connection (sqlite3.connect(self.db_path)).
  2. Sets journal_mode=WAL and synchronous=NORMAL for better concurrency.
  3. Creates the tracks table if it does not exist.
  4. Creates indexes on artist, album, and title, plus a composite index on artist+album and an expression index on release_dir, for faster grouped queries.
  5. Creates the FTS5 virtual table tracks_fts with a Unicode tokenizer that removes diacritics.
  6. Installs three triggers (tracks_ai, tracks_ad, tracks_au) that keep tracks_fts in sync with inserts, deletes, and updates on tracks.

Result: the DB is ready for both ordinary queries and full‑text search without any manual maintenance.
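A minimal, abridged sketch of that schema (column list shortened, only the insert and delete triggers shown, and the exact tokenizer arguments are an assumption):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    PRAGMA journal_mode=WAL;
    PRAGMA synchronous=NORMAL;

    CREATE TABLE IF NOT EXISTS tracks (
        path TEXT PRIMARY KEY, filename TEXT, artist TEXT, album TEXT,
        title TEXT, albumartist TEXT, genre TEXT, year INTEGER,
        duration REAL, mtime REAL
    );
    CREATE INDEX IF NOT EXISTS idx_tracks_artist ON tracks(artist);

    -- FTS5 mirror with a diacritics-stripping Unicode tokenizer.
    CREATE VIRTUAL TABLE IF NOT EXISTS tracks_fts USING fts5(
        path, artist, album, title,
        tokenize = 'unicode61 remove_diacritics 2'
    );

    -- Triggers keep tracks_fts in lockstep with tracks (update trigger omitted).
    CREATE TRIGGER IF NOT EXISTS tracks_ai AFTER INSERT ON tracks BEGIN
        INSERT INTO tracks_fts(path, artist, album, title)
        VALUES (new.path, new.artist, new.album, new.title);
    END;
    CREATE TRIGGER IF NOT EXISTS tracks_ad AFTER DELETE ON tracks BEGIN
        DELETE FROM tracks_fts WHERE path = old.path;
    END;
""")
```

With this in place an insert into tracks is immediately searchable through tracks_fts, and a delete removes the mirrored row, with no application-side bookkeeping.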

🛠️ Full‑text table bootstrap

Performed in _populate_fts_if_needed:

  • Opens a database connection.
  • Checks SELECT count(*) FROM tracks_fts.
  • If the count is zero, executes a single INSERT … SELECT that copies every row from tracks into tracks_fts.
  • Commits the transaction.

This routine is called once after a fresh DB creation or after a manual purge of the FTS table.
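The check-then-copy logic above can be sketched as follows; the column list is hypothetical and shorter than the real table's:

```python
import sqlite3

def populate_fts_if_needed(conn: sqlite3.Connection) -> None:
    """Copy tracks into tracks_fts once, if the FTS table is still empty."""
    (count,) = conn.execute("SELECT count(*) FROM tracks_fts").fetchone()
    if count == 0:
        # One INSERT ... SELECT moves every row in a single statement.
        conn.execute(
            "INSERT INTO tracks_fts(path, artist, album, title) "
            "SELECT path, artist, album, title FROM tracks"
        )
        conn.commit()
```

Because the copy only runs when the FTS table is empty, calling it repeatedly is harmless.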

🔗 Public connection helper

Function get_conn:

  • Read‑only mode (readonly=True) uses the URI file:<path>?mode=ro.
  • Write mode opens a normal connection.
  • Both connections set row_factory = sqlite3.Row so callers can treat rows like dictionaries.

All higher‑level code (search, UI, etc.) obtains connections via this method.

✍️ Writer thread

Function _db_writer_loop

  • Runs forever until _writer_stop is set.
  • Pulls an IndexEvent from _write_queue with a 1.0 s timeout (so it can notice the stop flag).
  • Handles each event type:

    | Event type | Action performed |
    | --- | --- |
    | CLEAR_DB | DELETE FROM tracks (removes all rows). |
    | INDEX_FILE | Calls _index_file(conn, path), which extracts metadata and performs INSERT OR REPLACE into tracks. |
    | DELETE_FILE | DELETE FROM tracks WHERE path = ?. |
    | REBUILD_DONE / RESYNC_DONE | conn.commit() flushes any pending changes. |
  • After every 50 processed events it forces a commit to keep the transaction size reasonable.

  • Errors are caught and logged via the injected Logger.
  • When the loop exits, it commits any remaining work and closes the connection.
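The loop's shape can be sketched with the database work abstracted behind injected callables (handle and commit are stand-ins for the real SQLite operations, not the library's API):

```python
import queue
import threading

def db_writer_loop(write_queue, stop_event, handle, commit, batch_size=50):
    """Drain events until stop_event is set; commit every batch_size events."""
    processed = 0
    while not stop_event.is_set():
        try:
            # The timeout lets the loop notice the stop flag between events.
            event = write_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        try:
            handle(event)
            processed += 1
            if processed % batch_size == 0:
                commit()  # keep transaction size reasonable
        finally:
            write_queue.task_done()
    commit()  # flush any remaining work on shutdown
```

The task_done() calls are what make Queue.join() usable in rebuild/resync to wait for the backlog to drain.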

🔍 Metadata extraction

Function _index_file:

  1. Calls TinyTag.get(path, tags=True, duration=True).
  2. Safely extracts the following fields:

    | Field | Source | Fallback |
    | --- | --- | --- |
    | artist | tag.artist | tag.albumartist, else "Unknown" |
    | album | tag.album | "Unknown" |
    | title | tag.title | path.stem, else "Unknown" |
    | year | int(str(tag.year)[:4]) (if parsable) | None |
    | duration | tag.duration | None |
    | mtime | path.stat().st_mtime | — |
  3. Executes a single INSERT OR REPLACE INTO tracks (…) VALUES (…) with the gathered values.

  4. Because of the triggers defined in _init_db, the same row is automatically mirrored into tracks_fts.
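The fallback rules for the trickier fields can be sketched as small helpers (parse_year and pick_title are hypothetical names for illustration, not part of the library):

```python
from pathlib import Path
from typing import Optional

def parse_year(raw) -> Optional[int]:
    """Best-effort year: int(str(raw)[:4]) if parsable, else None."""
    try:
        return int(str(raw)[:4])
    except (TypeError, ValueError):
        return None

def pick_title(tag_title, path: Path) -> str:
    """Prefer the embedded title tag; fall back to the filename stem."""
    return tag_title or path.stem or "Unknown"
```

Taking only the first four characters lets full date strings like "1994-06-01" still yield a usable year.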

🏗️ Full rebuild

Function: rebuild

  • Purpose – create a fresh DB from the current file system state.
  • Steps:

    1. Write status rebuilding with total = -1 (unknown) and current = 0.
    2. Enqueue a CLEAR_DB event (empties the DB).
    3. Recursively walk music_root (rglob("*")) and collect every file whose suffix is in SUPPORTED_EXTS.
    4. Update the status file with the exact total count.
    5. For each discovered file, enqueue INDEX_FILE events. Every 100 files the status file is refreshed (set_indexing_status).
    6. After the loop, enqueue REBUILD_DONE and call join() on the queue (wait until the writer thread finishes processing).
    7. Remove the status file (clear_indexing_status).
  • The UI can poll indexing_status.json to display a progress bar that reflects the total/current/progress fields.


🔄 Incremental resynchronisation

Function: resync

  • Purpose – bring the DB up‑to‑date after files have been added, removed, or renamed since the last run.
  • Steps:

    1. Set status resyncing with unknown totals (total = -1).
    2. Build a set of absolute paths for all supported files currently on disk (fs_paths).
    3. Query the DB for all stored paths (db_paths).
    4. Compute to_add = fs_paths - db_paths and to_remove = db_paths - fs_paths.
    5. total = len(to_add) + len(to_remove) and update the status file.
    6. Enqueue DELETE_FILE events for each path in to_remove; every 100 deletions the status file is refreshed.
    7. Enqueue INDEX_FILE events for each path in to_add; every 100 additions the status file is refreshed.
    8. Enqueue RESYNC_DONE, clear the status file, and log a summary.
  • As with rebuild, the writer thread processes the queued events sequentially, guaranteeing that the DB ends up exactly matching the file system.
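Steps 2–4 boil down to plain set arithmetic (the paths below are hypothetical):

```python
# What is on disk now vs. what the DB remembers.
fs_paths = {"a.mp3", "b.flac", "new.mp3"}
db_paths = {"a.mp3", "b.flac", "gone.mp3"}

to_add = fs_paths - db_paths       # enqueued as INDEX_FILE events
to_remove = db_paths - fs_paths    # enqueued as DELETE_FILE events
total = len(to_add) + len(to_remove)
```

Note that a renamed file naturally shows up as one removal (old path) plus one addition (new path).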


👀 Real‑time monitoring

  • start_monitoring creates a watchdog.observers.Observer (if none exists), registers an EnhancedWatcher instance for the music_root, and starts the observer thread.

  • EnhancedWatcher inherits from FileSystemEventHandler. Its on_any_event method:

    1. Ignores directory events.
    2. Filters out files whose extensions are not in SUPPORTED_EXTS.
    3. For created or modified events → enqueues INDEX_FILE.
    4. For deleted events → enqueues DELETE_FILE.
  • This mechanism guarantees that any change made while the application is running is eventually reflected in the DB (subject to the writer thread’s batching policy).
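The filtering in steps 1–4 can be sketched as a pure function (classify_event and the extension set are illustrative; the real logic lives in EnhancedWatcher.on_any_event and the library's SUPPORTED_EXTS):

```python
from pathlib import Path
from typing import Optional

# Hypothetical extension set -- the real SUPPORTED_EXTS lives in the library.
SUPPORTED_EXTS = {".mp3", ".flac", ".ogg", ".m4a"}

def classify_event(src_path: str, is_directory: bool, event_type: str) -> Optional[str]:
    """Map a raw watchdog-style event to a queue action, or None to ignore it."""
    if is_directory:
        return None  # directory events are ignored
    if Path(src_path).suffix.lower() not in SUPPORTED_EXTS:
        return None  # unsupported extension
    if event_type in ("created", "modified"):
        return "INDEX_FILE"
    if event_type == "deleted":
        return "DELETE_FILE"
    return None  # moved/other events not handled in this sketch
```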


🛑 Graceful shutdown

Function stop

  • Sets the _writer_stop flag, joins the writer thread (max 5 seconds).
  • Stops and joins the watchdog observer if it was started.
  • After this call the extractor is fully stopped and the SQLite connection is closed.

📝 Indexing‑status helper

File indexing_status.py:

| Function | Role |
| --- | --- |
| set_indexing_status(data_root, status, total, current) | Computes progress (current/total), preserves the original started_at timestamp (or creates a new one), builds a dictionary with status, started_at, updated_at, total, current, progress, and writes it atomically to indexing_status.json. |
| _atomic_write_json(status_file, data) | Writes JSON to a temporary file in the same directory, flushes, fsyncs, then atomically renames the temp file onto the target, guaranteeing that a partially-written file never appears. |
| _calculate_progress(total, current) | Returns a float in [0.0, 1.0]; guards against division by zero or negative totals. |
| _get_started_at(status_file) | Reads the existing JSON (if any) and returns the original started_at value, allowing a rebuild/resync to keep the same start time across restarts. |
| _build_status_data(...) | Packages all fields into a plain dict ready for JSON serialization. |
| clear_indexing_status(data_root) | Deletes the JSON file if it exists. |
| get_indexing_status(data_root, logger=None) | Reads and parses the JSON file, returning the dict or None on missing/corrupt files. Logs JSON decode errors via the supplied logger (defaults to NullLogger). |

These utilities are deliberately lightweight: they operate purely on the filesystem and do not depend on the SQLite connection, making them safe to call from any thread (including the writer thread).
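Under those descriptions, the two trickiest helpers might look roughly like this (a sketch, not the library's exact code):

```python
import json
import os
import tempfile
from pathlib import Path

def calculate_progress(total, current) -> float:
    """Fraction in [0.0, 1.0]; guards against zero or negative totals."""
    if not total or total <= 0:
        return 0.0
    return max(0.0, min(1.0, current / total))

def atomic_write_json(status_file: Path, data: dict) -> None:
    """Write to a temp file in the same directory, then rename into place."""
    fd, tmp = tempfile.mkstemp(dir=status_file.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, status_file)  # atomic on POSIX and Windows
    except BaseException:
        os.unlink(tmp)
        raise
```

Writing the temp file in the same directory matters: os.replace is only atomic within a single filesystem, so a reader polling indexing_status.json sees either the old file or the new one, never a partial write.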


🧑‍💻 End‑to‑end flow for a typical user session

sequenceDiagram
    actor User
    participant CollectionExtractor
    participant DBWriterThread
    participant SQLite as SQLite_DB
    participant FS

    User->>CollectionExtractor: resync()
    CollectionExtractor->>CollectionExtractor: set_indexing_status(resyncing, total=-1, current=0)

    CollectionExtractor->>CollectionExtractor: _scan_filesystem_paths()
    FS-->>CollectionExtractor: fs_paths (relative)
    CollectionExtractor->>CollectionExtractor: _get_database_paths()
    SQLite-->>CollectionExtractor: db_paths (relative)

    CollectionExtractor->>CollectionExtractor: compute to_add_rel, to_remove_rel
    CollectionExtractor->>CollectionExtractor: to_add_abs = _to_abspath(to_add_rel)
    CollectionExtractor->>CollectionExtractor: set_indexing_status(resyncing, total, current=0)

    CollectionExtractor->>CollectionExtractor: _start_job("resyncing", total)
    loop for each path in to_remove_rel
        CollectionExtractor->>DBWriterThread: enqueue IndexEvent(DELETE_FILE, rel_path)
    end
    loop for each path in to_add_abs
        CollectionExtractor->>DBWriterThread: enqueue IndexEvent(INDEX_FILE, abs_path)
    end
    CollectionExtractor->>DBWriterThread: enqueue IndexEvent(RESYNC_DONE)

    loop DB writer loop
        alt DELETE_FILE
            DBWriterThread->>SQLite: DELETE FROM tracks WHERE path = rel_path
        else INDEX_FILE
            DBWriterThread->>SQLite: _index_file(abs_path) INSERT OR REPLACE
        end
        DBWriterThread->>CollectionExtractor: _processed_count++
        alt batch_size reached
            DBWriterThread->>SQLite: COMMIT
            DBWriterThread->>CollectionExtractor: _update_progress_status()
            CollectionExtractor->>CollectionExtractor: set_indexing_status(resyncing, total, current)
        end
    end

    DBWriterThread->>SQLite: COMMIT
    DBWriterThread->>SQLite: wal_checkpoint(PASSIVE)
    DBWriterThread->>CollectionExtractor: _handle_job_completion()
    CollectionExtractor->>CollectionExtractor: set_indexing_status(resyncing, total, current=total)
    CollectionExtractor->>CollectionExtractor: clear_indexing_status()

🔌 API

EventType = Literal['INDEX_FILE', 'DELETE_FILE', 'CLEAR_DB', 'REBUILD_DONE', 'RESYNC_DONE'] module-attribute

IndexEvent(type, path=None) dataclass

Represents an event for indexing or modifying music files in the collection.

CollectionExtractor(music_root, db_path, logger=None)

Manages extraction, indexing, and synchronization of music files in a collection.

Creates a new collection extractor for managing a music library index.

This initializer wires together filesystem paths, logging, database schema, and background worker infrastructure so that music files can be scanned, indexed, and kept in sync with the underlying SQLite database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| music_root | Path | Root directory containing the music files to be indexed. | required |
| db_path | Path | Path to the SQLite database file storing track metadata and search index. | required |
| logger | Logger \| None | Optional logger for recording indexing progress and errors; if omitted, a NullLogger is used. | None |

Methods:

| Name | Description |
| --- | --- |
| get_conn | Opens and returns a configured SQLite connection for this collection. |
| is_initial_indexing_done | Checks whether an initial full indexing pass has been completed. |
| set_initial_indexing_done | Marks that the initial full indexing pass has completed successfully. |
| rebuild | Performs a full rebuild of the music collection index from the current filesystem state. |
| resync | Synchronizes the database with the current set of music files on disk. |
| start_monitoring | Starts monitoring the music directory for file system changes. |
| stop | Stops the database writer thread and file system observer. |
| enable_bulk_edit_mode | Enables bulk edit mode by pausing file system monitoring. |
| disable_bulk_edit_mode | Disables bulk edit mode and resyncs the database. |
| wait_for_indexing_start | Waits until indexing has started or the specified timeout elapses. |

Source code in src/musiclib/_extractor.py
def __init__(
    self, music_root: Path, db_path: Path, logger: Logger | None = None
) -> None:
    """Creates a new collection extractor for managing a music library index.

    This initializer wires together filesystem paths, logging, database schema,
    and background worker infrastructure so that music files can be scanned,
    indexed, and kept in sync with the underlying SQLite database.

    Args:
        music_root (Path): Root directory containing the music files to be indexed.
        db_path (Path): Path to the SQLite database file storing track metadata and search index.
        logger (Logger | None): Optional logger for recording indexing progress and errors;
            if omitted, a NullLogger is used.
    """
    self.music_root = music_root.resolve()
    self.db_path = db_path
    self.data_root = db_path.parent
    self._logger = logger or NullLogger()

    self.data_root.mkdir(parents=True, exist_ok=True)

    # Progress tracking
    self._initial_status_event = ThreadEvent()
    self._processed_count = 0
    self._total_for_current_job = None
    self._current_job_status = None

    # Threading
    self._write_queue: Queue[IndexEvent] = Queue()
    self._writer_stop = ThreadEvent()
    self._observer: Observer | None = None
    self._db_lock = Lock()

    try:
        self._init_db()
    except sqlite3.DatabaseError as e:
        if (
            'malformed' not in str(e).lower()
            and 'corrupt' not in str(e).lower()
        ):
            raise
        self._logger.error(f"Corruption detected: {e}")
        self._delete_database_files()
        self._init_db()  # Retry
    self._start_writer_thread()

get_conn(readonly=False)

Opens and returns a configured SQLite connection for this collection.

This method creates either a read-only or read-write connection depending on the readonly flag, applies a row factory for dict-like row access, and sets a busy timeout to make concurrent access more robust.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| readonly | bool | Whether to open the connection in read-only mode. | False |

Returns:

| Type | Description |
| --- | --- |
| sqlite3.Connection | A SQLite connection configured with an appropriate busy timeout. |

Source code in src/musiclib/_extractor.py
def get_conn(self, readonly: bool = False) -> sqlite3.Connection:
    """Opens and returns a configured SQLite connection for this collection.

    This method creates either a read-only or read-write connection depending
    on the ``readonly`` flag, applies a row factory for dict-like row access,
    and sets a busy timeout to make concurrent access more robust.

    Args:
        readonly (bool): Whether to open the connection in read-only mode. Defaults to False.

    Returns:
        sqlite3.Connection: A SQLite connection configured with an appropriate busy timeout.
    """
    if readonly:
        uri = f"file:{self.db_path}?mode=ro"
        conn = sqlite3.connect(uri, uri=True)
    else:
        conn = sqlite3.connect(self.db_path)

    conn.row_factory = sqlite3.Row
    # PRAGMA doesn't support parameters, but BUSY_TIMEOUT_MS is validated as int at module load
    conn.execute(f"PRAGMA busy_timeout={BUSY_TIMEOUT_MS}")
    return conn

is_initial_indexing_done()

Checks whether an initial full indexing pass has been completed.

This method looks up the initial_indexing_done flag in the meta table and interprets a stored value of "1" as meaning the first full index has successfully run.

Returns:

| Type | Description |
| --- | --- |
| bool | True if initial indexing has been marked as done, otherwise False. |

Source code in src/musiclib/_extractor.py
def is_initial_indexing_done(self) -> bool:
    """Checks whether an initial full indexing pass has been completed.

    This method looks up the ``initial_indexing_done`` flag in the meta table
    and interprets a stored value of "1" as meaning the first full index has
    successfully run.

    Returns:
        bool: True if initial indexing has been marked as done, otherwise False.
    """
    with self.get_conn(readonly=True) as conn:
        cur = conn.execute(
            "SELECT value FROM meta WHERE key = ?", ("initial_indexing_done",)
        )
        row = cur.fetchone()
        return row is not None and row["value"] == "1"

set_initial_indexing_done()

Marks that the initial full indexing pass has completed successfully.

This method updates the meta table flag so future runs and external callers can detect that a complete initial index of the collection is already in place.

Returns: None.

Source code in src/musiclib/_extractor.py
def set_initial_indexing_done(self) -> None:
    """Marks that the initial full indexing pass has completed successfully.

    This method updates the meta table flag so future runs and external
    callers can detect that a complete initial index of the collection is
    already in place.

    Returns:
        None
    """
    with self.get_conn() as conn:
        conn.execute(
            "UPDATE meta SET value = ? WHERE key = ?",
            ("1", "initial_indexing_done"),
        )
        conn.commit()

rebuild()

Performs a full rebuild of the music collection index from the current filesystem state.

This method clears existing track data, scans the music root for all supported files, and queues indexing operations so that the database is fully regenerated to match the contents of the collection.

Returns: None.

Source code in src/musiclib/_extractor.py
def rebuild(self) -> None:
    """Performs a full rebuild of the music collection index from the current filesystem state.

    This method clears existing track data, scans the music root for all supported files,
    and queues indexing operations so that the database is fully regenerated to match
    the contents of the collection.

    Returns:
        None
    """
    self._logger.info("Starting full rebuild")
    self._initial_status_event.set()

    # Pause file monitoring to prevent race conditions with watcher
    with self._pause_observer():
        set_indexing_status(self.data_root, "rebuilding", total=-1, current=0)
        self._write_queue.put(IndexEvent("CLEAR_DB"))
        self._write_queue.join()

        files = self._scan_music_files("rebuilding")
        set_indexing_status(self.data_root, "rebuilding", total=len(files), current=0)

        self._queue_file_operations(
            to_delete=[],
            to_index=files,
            job_type="rebuilding",
        )

    self.set_initial_indexing_done()

resync()

Synchronizes the database with the current set of music files on disk.

This method compares the filesystem under the music root with the paths stored in the database, then enqueues add and delete operations so that the index reflects files that have been created, removed, or renamed since the last run.

Returns: None.

Source code in src/musiclib/_extractor.py
def resync(self) -> None:
    """Synchronizes the database with the current set of music files on disk.

    This method compares the filesystem under the music root with the paths
    stored in the database, then enqueues add and delete operations so that
    the index reflects files that have been created, removed, or renamed
    since the last run.

    Returns:
        None
    """
    self._logger.info("Starting resync")

    # Pause file monitoring to prevent race conditions with watcher
    with self._pause_observer():
        set_indexing_status(self.data_root, "resyncing", total=-1, current=0)

        with self._db_lock:
            fs_paths = self._scan_filesystem_paths()
            db_paths = self._get_database_paths()

        to_add_rel = fs_paths - db_paths
        to_remove_rel = db_paths - fs_paths
        to_add_abs = [self._to_abspath(p) for p in to_add_rel]

        self._logger.info(
            f"Resync: {len(to_add_rel)} to add, {len(to_remove_rel)} to remove"
        )
        set_indexing_status(
            self.data_root,
            "resyncing",
            total=len(to_add_rel) + len(to_remove_rel),
            current=0,
        )

        self._queue_file_operations(
            to_delete=to_remove_rel,
            to_index=to_add_abs,
            job_type="resyncing",
        )

start_monitoring()

Starts monitoring the music directory for file system changes.

Source code in src/musiclib/_extractor.py
def start_monitoring(self) -> None:
    """Starts monitoring the music directory for file system changes."""
    if self._observer:
        return
    self._observer = Observer()
    self._observer.schedule(_Watcher(self), str(self.music_root), recursive=True)
    self._observer.start()

stop(timeout=30.0)

Stops the database writer thread and file system observer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | float | Maximum time to wait for shutdown (seconds). | 30.0 |
Source code in src/musiclib/_extractor.py
def stop(self, timeout: float = 30.0) -> None:
    """Stops the database writer thread and file system observer.

    Args:
        timeout: Maximum time to wait for shutdown (seconds).
    """
    # Shutdown watcher first to flush pending events
    if self._observer:
        for handler_list in self._observer._handlers.values():
            for handler in handler_list:
                if hasattr(handler, 'shutdown'):
                    handler.shutdown()
        self._observer.stop()
        self._observer.join(timeout=5)

    # Stop writer thread
    self._writer_stop.set()
    self._writer_thread.join(timeout=5)

enable_bulk_edit_mode()

Enables bulk edit mode by pausing file system monitoring.

Call this before performing bulk file operations (e.g., tagging 100+ files) to prevent event flooding. File changes will not be indexed in real-time while bulk edit mode is active.

Example:

    extractor.enable_bulk_edit_mode()
    try:
        # Perform bulk file operations here
        for file in files:
            update_tags(file)
    finally:
        extractor.disable_bulk_edit_mode()

Source code in src/musiclib/_extractor.py
def enable_bulk_edit_mode(self) -> None:
    """Enables bulk edit mode by pausing file system monitoring.

    Call this before performing bulk file operations (e.g., tagging 100+ files)
    to prevent event flooding. File changes will not be indexed in real-time
    while bulk edit mode is active.

    Example:
        extractor.enable_bulk_edit_mode()
        try:
            # Perform bulk file operations here
            for file in files:
                update_tags(file)
        finally:
            extractor.disable_bulk_edit_mode()
    """
    if self._observer:
        self._observer.unschedule_all()
        self._logger.debug("File monitoring paused for bulk edit mode")

disable_bulk_edit_mode()

Disables bulk edit mode and resyncs the database.

Call this after completing bulk file operations. This will resume file system monitoring and trigger a resync to catch all changes made during bulk edit mode.

Source code in src/musiclib/_extractor.py
def disable_bulk_edit_mode(self) -> None:
    """Disables bulk edit mode and resyncs the database.

    Call this after completing bulk file operations. This will resume file
    system monitoring and trigger a resync to catch all changes made during
    bulk edit mode.
    """
    if self._observer:
        # Resume monitoring with a new watcher instance
        watcher = _Watcher(self)
        self._observer.schedule(watcher, str(self.music_root), recursive=True)
        self._logger.debug("File monitoring resumed, triggering resync")

    # Trigger resync to catch all changes
    self.resync()

wait_for_indexing_start(timeout=5.0)

Waits until indexing has started or the specified timeout elapses.

This method blocks until the internal event indicating the beginning of a rebuild or resync job is set, allowing callers to synchronize with the start of indexing work.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | float | The maximum number of seconds to wait for indexing to start. | 5.0 |

Returns:

| Type | Description |
| --- | --- |
| bool | True if indexing started before the timeout, or False if the timeout expired first. |

Source code in src/musiclib/_extractor.py
def wait_for_indexing_start(self, timeout: float = 5.0) -> bool:
    """Waits until indexing has started or the specified timeout elapses.

    This method blocks until the internal event indicating the beginning of
    a rebuild or resync job is set, allowing callers to synchronize with the
    start of indexing work.

    Args:
        timeout (float): The maximum number of seconds to wait for indexing to start.

    Returns:
        bool: True if indexing started before the timeout, or False if the timeout expired first.
    """
    return self._initial_status_event.wait(timeout=timeout)

indexing_status

Functions:

| Name | Description |
| --- | --- |
| set_indexing_status | Writes the current indexing status to a JSON file. |
| clear_indexing_status | Removes the indexing status file for the given data root. |
| get_indexing_status | Retrieves the current indexing status from the status file for the given data root. |

set_indexing_status(data_root, status, total, current)

Writes the current indexing status to a JSON file.

Calculates progress, determines the start time, builds the status data, and writes it atomically to the status file for the given data root.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_root | Path \| str | The root directory containing the indexing status file. | required |
| status | str | The current status string (e.g., 'rebuilding', 'resyncing'). | required |
| total | int \| None | The total number of items to process. | required |
| current | int | The number of items processed so far. | required |

Returns: None.

Source code in src/musiclib/indexing_status.py
def set_indexing_status(
    data_root: Path | str, status: str, total: int | None, current: int
) -> None:
    """Writes the current indexing status to a JSON file.

    Calculates progress, determines the start time, builds the status data, and writes it atomically to the status file for the given data root.

    Args:
        data_root (Path | str): The root directory containing the indexing status file.
        status (str): The current status string (e.g., 'rebuilding', 'resyncing').
        total (int): The total number of items to process.
        current (int): The number of items processed so far.

    Returns:
        None
    """
    data_root = Path(data_root)
    status_file = data_root / "indexing_status.json"
    status_file.parent.mkdir(parents=True, exist_ok=True)
    if total is None or total < 0:
        progress = 0.0
    else:
        progress = _calculate_progress(total, current)
    started_at = _get_started_at(status_file) or datetime.now(timezone.utc).isoformat()
    data = _build_status_data(status, started_at, total, current, progress)
    _atomic_write_json(status_file, data)

clear_indexing_status(data_root)

Removes the indexing status file for the given data root.

Deletes the indexing status JSON file if it exists, effectively clearing any current indexing progress or state.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_root | Path \| str | The root directory containing the indexing status file. | required |

Returns: None.

Source code in src/musiclib/indexing_status.py
def clear_indexing_status(data_root: Path | str) -> None:
    """Removes the indexing status file for the given data root.

    Deletes the indexing status JSON file if it exists, effectively clearing any current indexing progress or state.

    Args:
        data_root (Path | str): The root directory containing the indexing status file.

    Returns:
        None
    """
    data_root = Path(data_root)
    status_file = data_root / "indexing_status.json"
    status_file.unlink(missing_ok=True)

get_indexing_status(data_root, logger=None)

Retrieves the current indexing status from the status file for the given data root.

Attempts to read and parse the indexing status JSON file, returning its contents as a dictionary. Handles missing files and JSON decode errors gracefully, logging errors if a logger is provided.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data_root | Path \| str | The root directory containing the indexing status file. | required |
| logger | Logger | Logger for error reporting. Uses NullLogger if not provided. | None |

Returns:

| Type | Description |
| --- | --- |
| dict \| None | The indexing status data as a dictionary, or None if the file does not exist or cannot be read. |

Source code in src/musiclib/indexing_status.py
```python
def get_indexing_status(
    data_root: Path | str, logger: Logger | None = None
) -> dict | None:
    """
    Retrieves the current indexing status from the status file for the given data root.

    Attempts to read and parse the indexing status JSON file, returning its contents as a dictionary.
    Handles missing files and JSON decode errors gracefully, logging errors if a logger is provided.

    Args:
        data_root (Path | str): The root directory containing the indexing status file.
        logger (Logger, optional): Logger for error reporting. Uses NullLogger if not provided.

    Returns:
        dict | None: The indexing status data as a dictionary, or None if the file does not exist or cannot be read.
    """
    logger = logger or NullLogger()
    data_root = Path(data_root)
    status_file = data_root / "indexing_status.json"

    try:
        with status_file.open("r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except json.JSONDecodeError as e:
        logger.error(f"JSON decode error in {status_file}: {e}")
        return None
```
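The three possible outcomes (no file, valid JSON, corrupt JSON) can be exercised with a standalone copy of the same try/except logic; this sketch re-creates the helper locally rather than importing `musiclib`:

```python
import json
import tempfile
from pathlib import Path

def read_status(data_root):
    """Standalone re-creation of get_indexing_status, for illustration only."""
    status_file = Path(data_root) / "indexing_status.json"
    try:
        with status_file.open("r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return None

root = Path(tempfile.mkdtemp())
print(read_status(root))                       # missing file -> None
(root / "indexing_status.json").write_text('{"state": "rebuilding", "done": 10}')
print(read_status(root))                       # -> {'state': 'rebuilding', 'done': 10}
(root / "indexing_status.json").write_text("{not json")
print(read_status(root))                       # corrupt file -> None
```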

EnhancedWatcher(extractor)

Bases: FileSystemEventHandler

Enhanced file system watcher with debouncing to prevent corruption.

This watcher prevents database corruption during bulk file editing by:

1. Waiting `DEBOUNCE_DELAY` seconds after the last file change before indexing
2. Coalescing multiple modifications to the same file into a single operation
3. Properly flushing all pending events on shutdown

Example

Without debouncing: Edit file.mp3 5 times rapidly → 5 index operations → corruption risk

With debouncing: Edit file.mp3 5 times rapidly → wait 2 seconds → 1 index operation → safe
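That coalescing behaviour is easy to reproduce with `threading.Timer` alone; the sketch below is not musiclib code (the delay is shortened for the demo), but it shows why five rapid edits collapse into one operation:

```python
import time
from threading import Timer, Lock

DELAY = 0.2        # shortened stand-in for DEBOUNCE_DELAY
processed = []     # operations that actually ran
timers = {}
lock = Lock()

def on_change(path):
    with lock:
        if path in timers:
            timers[path].cancel()          # a newer change resets the clock
        timers[path] = Timer(DELAY, lambda: processed.append(path))
        timers[path].start()

for _ in range(5):                         # five rapid edits to the same file
    on_change("file.mp3")
    time.sleep(0.01)

time.sleep(0.5)                            # let the last surviving timer fire
print(processed)                           # -> ['file.mp3']  (one operation, not five)
```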

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `extractor` | | The CollectionExtractor instance that processes index/delete events. |
| `debounce_delay` | | Number of seconds to wait after the last file change (default: 2.0). |
| `pending_events` | `Dict[str, Tuple[str, float]]` | Dict mapping file paths to their pending event type and timestamp. |
| `pending_lock` | | Thread lock for synchronizing access to pending events. |
| `timers` | `Dict[str, Timer]` | Dict mapping file paths to their active debounce timers. |

Initializes the enhanced watcher with debouncing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `extractor` | | The CollectionExtractor instance that will handle the events. | *required* |

Methods:

| Name | Description |
| --- | --- |
| `on_any_event` | Handles file system events with debouncing. |
| `shutdown` | Cancels all pending timers and processes remaining events immediately. |

Source code in src/musiclib/_watcher.py
```python
def __init__(self, extractor) -> None:
    """Initializes the enhanced watcher with debouncing.

    Args:
        extractor: The CollectionExtractor instance that will handle the events.
    """
    self.extractor = extractor
    self.debounce_delay = DEBOUNCE_DELAY

    # Track pending events: path -> (event_type, timestamp)
    self.pending_events: Dict[str, Tuple[str, float]] = {}
    self.pending_lock = Lock()

    # Track active timers: path -> Timer
    self.timers: Dict[str, Timer] = {}
```
on_any_event(event)

Handles file system events with debouncing.

This method ignores directory changes and unsupported file types, then applies debouncing to file modification and deletion events. Multiple rapid changes to the same file are coalesced into a single operation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `event` | `object` | A watchdog file system event with `is_directory`, `src_path`, and `event_type` attributes. | *required* |
Source code in src/musiclib/_watcher.py
```python
def on_any_event(self, event: object) -> None:
    """Handles file system events with debouncing.

    This method ignores directory changes and unsupported file types, then
    applies debouncing to file modification and deletion events. Multiple
    rapid changes to the same file are coalesced into a single operation.

    Args:
        event: A watchdog file system event with is_directory, src_path,
               and event_type attributes.
    """
    # Ignore directory changes
    if event.is_directory:
        return

    path = Path(event.src_path)

    # Only process supported audio file extensions
    if path.suffix.lower() not in self.extractor.SUPPORTED_EXTS:
        return

    path_str = str(path)
    event_type = event.event_type

    with self.pending_lock:
        # Cancel any existing timer for this file
        if path_str in self.timers:
            self.timers[path_str].cancel()

        # Update pending event (this coalesces multiple events)
        if event_type in ("created", "modified"):
            # Both created and modified should result in reindexing
            self.pending_events[path_str] = ("modified", time.time())
        elif event_type == "deleted":
            # Delete takes precedence over everything
            self.pending_events[path_str] = ("deleted", time.time())

        # Set new debounce timer
        timer = Timer(
            self.debounce_delay,
            self._process_debounced_event,
            args=(path_str,)
        )
        self.timers[path_str] = timer
        timer.start()
```

shutdown()

Cancels all pending timers and processes remaining events immediately.

This ensures no events are lost when stopping the watcher. All pending events are flushed to the processing queue before shutdown completes.

This method should be called before stopping the file system observer.

Source code in src/musiclib/_watcher.py
```python
def shutdown(self) -> None:
    """Cancels all pending timers and processes remaining events immediately.

    This ensures no events are lost when stopping the watcher. All pending
    events are flushed to the processing queue before shutdown completes.

    This method should be called before stopping the file system observer.
    """
    # Import IndexEvent here to avoid circular imports
    from ._extractor import IndexEvent

    with self.pending_lock:
        # Cancel all pending timers
        for timer in self.timers.values():
            timer.cancel()

        # Process all remaining pending events immediately
        for path_str, (event_type, _) in self.pending_events.items():
            path = Path(path_str)
            if event_type == "modified":
                self.extractor._write_queue.put(IndexEvent("INDEX_FILE", path))
            elif event_type == "deleted":
                self.extractor._write_queue.put(IndexEvent("DELETE_FILE", path))

        # Clear all state
        self.pending_events.clear()
        self.timers.clear()
```
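The flush-on-shutdown behaviour can be demonstrated without watchdog or the real extractor; in this sketch a plain `queue.Queue` stands in for the extractor's `_write_queue`, and tuples stand in for `IndexEvent` objects:

```python
import queue
from threading import Timer, Lock

write_queue = queue.Queue()                       # stand-in for extractor._write_queue
pending = {"a.mp3": ("modified", 0.0), "b.flac": ("deleted", 0.0)}
timers = {path: Timer(60, lambda: None) for path in pending}
for t in timers.values():
    t.start()                                     # long timers that have not fired yet
lock = Lock()

def shutdown():
    with lock:
        for t in timers.values():
            t.cancel()                            # nothing fires after this point
        for path, (event_type, _) in pending.items():
            kind = "INDEX_FILE" if event_type == "modified" else "DELETE_FILE"
            write_queue.put((kind, path))         # flush straight to the queue
        pending.clear()
        timers.clear()

shutdown()
print(list(write_queue.queue))  # -> [('INDEX_FILE', 'a.mp3'), ('DELETE_FILE', 'b.flac')]
```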