"""
This module provides functionality to backup raw electrophysiological data to a specified folder.
The backup is a "Frozen copy" that should be identical to the data acquired on the day of recording.
Much of the functionality of the `Archiver` class is deprecated but retained for legacy support.
Key features:
- Handles electrophysiological (ephys), video, audio, and metadata files.
- Compresses electrophysiological files using SpikeGLX, optionally removing the raw data after compression.
- Supports GUI-based file selection using PyQt5 for the backup process.
- Supports running the backup without a GUI through the command line.
- Copies the session data to a new target location based on the recording date.
- Marks the backup with a timestamp indicating when it was archived.
"""
from pathlib import Path
from PyQt5.QtWidgets import (
QApplication,
QWidget,
QLabel,
QLineEdit,
QPushButton,
QVBoxLayout,
QFileDialog,
QGridLayout,
)
from PyQt5.QtCore import Qt
import spikeglx
import sys
import shutil
import click
import subprocess
import datetime
import logging
# Install a default root handler so the messages logged by this module are shown.
logging.basicConfig()

# Module-level logger; INFO and above are emitted.
_log = logging.getLogger(name=__name__)
_log.setLevel(logging.INFO)

# Default archive destination; used as the initial `Archiver.subjects_path_target`.
DEFAULT_SUBJECTS_PATH = Path(r"D:\remote_test\Subjects")

# Default local run folder; used as the initial `Archiver.run_path_source` and as
# the starting directory of the "Select Run Path" dialog.
DEFAULT_SESSION_PATH = Path(r"D:\test\Subjects")

# Starting directory of the "Select Video Files" dialog.
DEFAULT_VIDEO_DIRECTORY = Path(r"D:\sglx_data")
class Archiver:
    """
    Handles the backup of electrophysiological and related session data for a given subject.

    Attributes:
        keep_raw (bool): Whether to keep raw data after compression.
        subject_ID (str): ID of the subject being backed up (`None` until set or guessed).
        ephys_files_local (list): Electrophysiological files found under `run_path_source`.
        ephys_files_target (list): Electrophysiological files found under `today_path_target`.
        has_insertions (bool): Initialised to False; not used by this class.
        has_video (bool): Set to True once `.avi` files are found in the run folder.
        num_sessions (int): Number of local sessions found for the subject.
        subjects_path_target (Path): Root folder the subjects' data is archived to.
        subject_path_target (Path): `subjects_path_target / subject_ID`.
        run_path_source (Path): Path to the local run folder holding the sessions.
        record_date (str): First 10 characters of the `fileCreateTime` metadata field.
        today_path_target (Path): `subject_path_target / record_date`, the backup directory.
        video_files (list): List of `.avi` files found in the run folder.
        video_path_target (Path): Initialised to None; not used by this class.
        session_list_source (list): Session directories found in `run_path_source`.
    """

    def __init__(self, keep_raw):
        """
        Create an archiver that uses the module default source and target paths.

        Args:
            keep_raw (bool): Whether to keep raw data after compression.
        """
        self.ephys_files_local = None
        self.ephys_files_target = None
        self.has_insertions = False
        # Initialised here so the attributes exist before the methods that set them run.
        self.has_video = False
        self.subject_ID = None
        self.num_sessions = 0
        self.subjects_path_target = DEFAULT_SUBJECTS_PATH
        self.subject_path_target = None
        self.run_path_source = DEFAULT_SESSION_PATH
        self.record_date = None
        self.today_path_target = None
        self.keep_raw = keep_raw
        self.video_files = None
        self.video_path_target = None
        self.session_list_source = None

    def set_subjects_path(self, path):
        """Set the path where the subjects' data will be archived."""
        self.subjects_path_target = Path(path)
        print(f"Subjects path set to: {self.subjects_path_target}")

    def get_sessions_local(self):
        """Find local sessions (directories matching ``*_g[0-9]*``) to back up."""
        contents = self.run_path_source.glob("*_g[0-9]*")
        # Sorted so that the numbering used by `copy_sessions_alf` is reproducible.
        self.session_list_source = sorted(x for x in contents if x.is_dir())
        self.num_sessions = len(self.session_list_source)

    def guess_subject_ID(self):
        """Infer the subject ID from the name of the local run folder."""
        self.subject_ID = self.run_path_source.name
        print(f"Subject ID set to: {self.subject_ID}")

    def get_ephys_files_remote(self):
        """Retrieve the electrophysiological files present in the backup directory."""
        self.ephys_files_target = spikeglx.glob_ephys_files(self.today_path_target)

    def get_ephys_files_local(self):
        """Retrieve local electrophysiological files."""
        self.ephys_files_local = spikeglx.glob_ephys_files(self.run_path_source)

    def _compress_bin_file(self, bin_file):
        """
        Compress one binary file through `spikeglx.Reader.compress_file`.

        Nothing is done when `bin_file` is None or the reader reports the file as
        already compressed.

        Args:
            bin_file (Path | None): Binary file to compress.
        """
        if bin_file is None:
            return
        reader = spikeglx.Reader(bin_file)
        if reader.is_mtscomp:
            print(f"{bin_file.name} is already compressed")
            return
        print(f"Compressing {bin_file.name}")
        try:
            reader.compress_file(keep_original=self.keep_raw)
        except PermissionError:
            # `not`, not `~`: bitwise inversion of a bool is always truthy, which
            # would delete the raw file even when the caller asked to keep it.
            if not self.keep_raw:
                import time

                # Release the reader's handle and give the OS a moment before
                # removing the raw file ourselves.
                reader.close()
                time.sleep(1)
                bin_file.unlink()
                _log.warning(
                    "Compression likely did not succeed in deleting. Deleting manually."
                )
            else:
                _log.warning(
                    "PermissionError while compressing %s; raw file left untouched.",
                    bin_file,
                )

    def _compress_ephys_files(self, ephys_files):
        """Compress the ap, lf and nidq binary of every entry in `ephys_files`."""
        for efi in ephys_files:
            for key in ("ap", "lf", "nidq"):
                self._compress_bin_file(efi.get(key))

    def compress_ephys_files_local(self):
        """Compress local electrophysiological files before copying to archive."""
        if self.ephys_files_local is None:
            self.get_ephys_files_local()
        self._compress_ephys_files(self.ephys_files_local)

    def compress_ephys_files_remote(self):
        """Compress the electrophysiological files in the backup directory."""
        if self.ephys_files_target is None:
            self.get_ephys_files_remote()
        self._compress_ephys_files(self.ephys_files_target)

    def get_record_date(self):
        """Get the recording date from the metadata of local electrophysiological files."""
        if self.ephys_files_local is None:
            self.get_ephys_files_local()
        efi = self.ephys_files_local[0]
        # Prefer the ap file; fall back to the nidq file when there is no ap entry.
        bin_file = efi.get("ap", efi.get("nidq"))
        md = spikeglx.read_meta_data(bin_file.with_suffix(".meta"))
        self.record_date = md.fileCreateTime[:10]  # As a string

    def make_rec_date_target(self):
        """Create the target directory for the backup, named by the record date."""
        if self.record_date is None:
            self.get_record_date()
        if self.subject_ID is None:
            self.guess_subject_ID()
        self.subject_path_target = self.subjects_path_target.joinpath(self.subject_ID)
        self.subject_path_target.mkdir(exist_ok=True)
        self.today_path_target = self.subject_path_target.joinpath(self.record_date)
        self.today_path_target.mkdir(exist_ok=True)

    def _copy_session(self, session, dst):
        """Copy one session directory to `dst`, skipping it when `dst` already exists."""
        print(f"Copying {session} to {dst}")
        try:
            shutil.copytree(session, dst)
        except FileExistsError:
            print(f"WARNING:Destination {dst} exists. Skipping copy!")

    def _ensure_sessions_and_target(self):
        """Discover the sessions and create the backup directory if not done yet."""
        if self.session_list_source is None:
            self.get_sessions_local()
        if self.today_path_target is None:
            self.make_rec_date_target()

    def copy_sessions(self):
        """Copy session data from the local to the target directory, keeping the session names."""
        self._ensure_sessions_and_target()
        for session in self.session_list_source:
            self._copy_session(session, self.today_path_target.joinpath(session.name))

    def copy_sessions_alf(self):
        """Copy session data to the target directory under numbered names (001, 002, ...)."""
        self._ensure_sessions_and_target()
        for ii, session in enumerate(self.session_list_source, start=1):
            self._copy_session(session, self.today_path_target.joinpath(f"{ii:03d}"))

    def compress_video_in_place(self):
        """
        Compress any `.avi` files in the run folder to `.mp4` with ffmpeg.

        The original file is removed only after ffmpeg exited successfully and
        produced a non-empty output file.
        """
        self.get_videos_in_sessions()
        print(f"{self.video_files=}")
        if not self.video_files:
            print("No uncompressed video files found. Continuing")
            return
        for fn in self.video_files:
            fn_comp = fn.with_suffix(".mp4")
            result = subprocess.run(
                [
                    "ffmpeg",
                    "-i",
                    str(fn),
                    "-y",
                    "-c:v",
                    "hevc_nvenc",
                    "-preset",
                    "slow",
                    "-cq",
                    "22",
                    str(fn_comp),
                ]
            )
            compressed_ok = (
                result.returncode == 0
                and fn_comp.exists()
                and fn_comp.stat().st_size > 0
            )
            if not compressed_ok:
                # Never delete the only copy of a video that failed to encode.
                _log.error(
                    "ffmpeg failed for %s (exit code %s); keeping the original file.",
                    fn,
                    result.returncode,
                )
                continue
            fn.unlink()

    def get_videos_in_sessions(self):
        """Find `.avi` video files anywhere below the local run folder."""
        print(f"{self.run_path_source=}")
        self.video_files = list(self.run_path_source.rglob("*.avi"))
        if self.video_files:
            self.has_video = True

    def mark_backup(self):
        """Create a flag file in each local session directory to indicate the backup date."""
        if self.session_list_source is None:
            self.get_sessions_local()
        for session in self.session_list_source:
            backup_flag = session.joinpath("is_backed_up.txt")
            with open(backup_flag, "w") as fid:
                fid.write(f"Archived on {datetime.datetime.today().isoformat()}")

    def copy_run_level_files(self):
        """Copy the files (not directories) in the run folder to the backup location."""
        if self.today_path_target is None:
            self.make_rec_date_target()
        run_files = [x for x in self.run_path_source.glob("*") if x.is_file()]
        print(f"{run_files=}")
        for fn in run_files:
            shutil.copy(fn, self.today_path_target)

    def full_archive(self):
        """Chain all the methods to perform a full archive, compressing after the copy."""
        self.get_sessions_local()
        self.make_rec_date_target()
        self.copy_run_level_files()
        self.copy_sessions()
        self.compress_ephys_files_remote()
        self.compress_video_in_place()
        self.mark_backup()

    def full_archive_with_local_compression(self):
        """Chain all the methods to perform a full archive with local compression first."""
        self.get_sessions_local()
        self.compress_ephys_files_local()  # Compress locally first
        self.make_rec_date_target()
        self.copy_run_level_files()
        self.copy_sessions()
        # No need to compress remotely since files are already compressed
        self.compress_video_in_place()
        self.mark_backup()
class RecordingInfoUI(QWidget):
    """
    GUI to ask the user for the destination (Subjects path), source (run_path), and subject_ID (spikeglx run name).

    Attributes:
        archiver (Archiver): Instance of the `Archiver` class to manage the backup.
        subjects_path_line_edit (QLineEdit): Text field holding the archive destination.
        run_path_line_edit (QLineEdit): Text field holding the local run folder.
        subject_id_line_edit (QLineEdit): Text field holding the subject ID.
    """

    def __init__(self, archiver, title):
        """
        Build the dialog.

        Args:
            archiver (Archiver): Archiver whose paths and subject ID are edited.
            title (str): Window title.
        """
        super().__init__()
        self.archiver = archiver
        self.init_ui(title)
        self.resize(1200, 400)

    def init_ui(self, title):
        """Set up the UI layout and widgets."""
        main_layout = QGridLayout()
        layout = QVBoxLayout()

        label_subjects_path = QLabel("Select Subjects Path:")
        self.subjects_path_line_edit = QLineEdit()
        # The archiver exposes `subjects_path_target` / `run_path_source`; it has
        # no `subjects_path` / `run_path` attributes.
        self.subjects_path_line_edit.setText(str(self.archiver.subjects_path_target))
        browse_subjects_button = QPushButton("Browse")
        browse_subjects_button.clicked.connect(self.browse_subjects_clicked)

        label_run_path = QLabel("Select Run Path:")
        self.run_path_line_edit = QLineEdit()
        self.run_path_line_edit.setText(str(self.archiver.run_path_source))
        browse_run_button = QPushButton("Browse")
        browse_run_button.clicked.connect(self.browse_run_clicked)

        label_subject_id = QLabel("Enter Subject ID:")
        self.subject_id_line_edit = QLineEdit()

        set_button = QPushButton("Set Paths")
        set_button.clicked.connect(self.set_clicked)

        for widget in (
            label_subjects_path,
            self.subjects_path_line_edit,
            browse_subjects_button,
            label_run_path,
            self.run_path_line_edit,
            browse_run_button,
            label_subject_id,
            self.subject_id_line_edit,
            set_button,
        ):
            layout.addWidget(widget)

        main_layout.addLayout(layout, 0, 0)
        self.setLayout(main_layout)
        self.setWindowTitle(title)

    def browse_subjects_clicked(self):
        """Open a dialog for the user to select the subjects path."""
        path = QFileDialog.getExistingDirectory(
            self,
            "Select Subjects Path (Archive destination)",
            str(self.archiver.subjects_path_target),
        )
        if path:
            self.subjects_path_line_edit.setText(path)

    def browse_run_clicked(self):
        """Open a dialog for the user to select the local run path."""
        default_session_path = str(DEFAULT_SESSION_PATH)
        path = QFileDialog.getExistingDirectory(
            self, "Select Run Path (Local recording)", default_session_path
        )
        if path:
            print(f"run_path set to {path}")
            self.run_path_line_edit.setText(path)
            # Must be `run_path_source`: that is the attribute `guess_subject_ID`
            # reads, so the guess reflects the folder that was just picked.
            self.archiver.run_path_source = Path(path)
            self.archiver.guess_subject_ID()
            self.subject_id_line_edit.setText(self.archiver.subject_ID)

    def set_clicked(self):
        """Read the user input, store it on the widget and the archiver, then close."""
        subjects_text = self.subjects_path_line_edit.text()
        run_text = self.run_path_line_edit.text()
        subject_text = self.subject_id_line_edit.text()

        # Widget-level copies are kept for callers that read them from the dialog.
        self.subjects_path = Path(subjects_text)
        self.run_path = Path(run_text)
        self.subject_id = Path(subject_text)
        print(f"{self.subjects_path=}\n{self.run_path=}\n{self.subject_id=}")

        # Hand the values to the archiver; empty fields leave its current value alone.
        if subjects_text.strip():
            self.archiver.set_subjects_path(subjects_text)
        if run_text.strip():
            self.archiver.run_path_source = Path(run_text)
        if subject_text.strip():
            self.archiver.subject_ID = subject_text.strip()
        else:
            self.archiver.guess_subject_ID()
        self.close()

    def open_file_dialog(self):
        """Let the user pick video files and store them, sorted, on the archiver."""
        options = QFileDialog.Options()
        files, _ = QFileDialog.getOpenFileNames(
            self,
            "Select Video Files",
            str(DEFAULT_VIDEO_DIRECTORY),
            "Video Files (*.mp4 *.avi)",
            options=options,
        )
        if not files:
            return
        # `init_ui` does not create a video line edit, so only update it if present.
        video_line_edit = getattr(self, "video_path_line_edit", None)
        if video_line_edit is not None:
            video_line_edit.setText(", ".join(files))
        files.sort()
        self.archiver.video_files = [Path(x) for x in files]

    def update_has_video_state(self, state):
        """Update `archiver.has_video` when the checkbox state changes."""
        self.archiver.has_video = state == Qt.Checked
@click.command()
@click.argument("local_run_path", required=False)
@click.argument("remote_subjects_path", required=False)
@click.option(
    "--keep_raw",
    is_flag=True,
    help="If passed does not remove the raw bin file after compression.",
)
@click.option(
    "--no_local_compression",
    is_flag=True,
    help="If passed, compresses files on remote server instead of locally (legacy behavior).",
)
def main(local_run_path, remote_subjects_path, keep_raw, no_local_compression):
    """
    Entry point for the backup process.
    If no arguments are provided, the GUI will open.
    If both `local_run_path` and `remote_subjects_path` are provided, the process will run without the GUI.

    Args:
        local_run_path (str): Path to the local recording session.
        remote_subjects_path (str): Path to the remote storage location.
        keep_raw (bool): Flag to indicate whether raw data should be kept after compression.
        no_local_compression (bool): If True, use legacy remote compression behavior.
    """
    if local_run_path is None and remote_subjects_path is None:
        archive(keep_raw)
    elif local_run_path is not None and remote_subjects_path is not None:
        # Forward --keep_raw so the flag is honoured in command-line mode too.
        no_gui(
            local_run_path,
            remote_subjects_path,
            compress_locally=not no_local_compression,
            keep_raw=keep_raw,
        )
    else:
        click.echo("Invalid number of arguments")


def archive(keep_raw):
    """
    Run the backup process with a GUI.

    The dialog is shown first; once its event loop returns, `Archiver.full_archive`
    is run.

    Args:
        keep_raw (bool): Whether to keep raw data after compression.
    """
    app = QApplication(sys.argv)
    archiver = Archiver(keep_raw)
    set_path_dialog = RecordingInfoUI(
        archiver, "Select archival storage location for freeze (on Archive)"
    )
    set_path_dialog.show()
    app.exec()
    archiver.full_archive()


def no_gui(local_run_path, remote_subjects_path, compress_locally=True, keep_raw=False):
    """
    Run the backup process without a GUI, using command line arguments.

    Args:
        local_run_path (str): Path to the local recording session.
        remote_subjects_path (str): Path to the remote storage location.
        compress_locally (bool): If True, compress files locally before copying. Defaults to True.
        keep_raw (bool): Whether to keep raw data after compression. Defaults to False.
    """
    archiver = Archiver(keep_raw=keep_raw)
    archiver.run_path_source = Path(local_run_path)
    archiver.subjects_path_target = Path(remote_subjects_path)
    archiver.guess_subject_ID()
    if compress_locally:
        archiver.full_archive_with_local_compression()
    else:
        archiver.full_archive()
# Invoke the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()