"""
Command line script to extract and downsample physiological data from NIDAQ recording.
The recorded auxiliary data (e.g. diaphragm and pleth) is sampled at 10K.
While the high sampling rate is critical to acquire good EMG, it is excessive for both
the integrated and the pleth.
This script does the following:
1) Downsamples the Flow/Pdiff traces by 10x to be input into breathmetrics (BM does filtering and processing)
2) Filters the EMG (300-5K)
3) Integrates the EMG with a triangular window
4) Downsamples the integrated EMG by 10x to match the pleth
5) Extracts features from the integrated EMG.
6) Extracts heart rate from EKG channel if recorded
6) Saves to alf standardized files in the session path
"""
import subprocess
import numpy as np
from pathlib import Path
import click
import pandas as pd
import spikeglx
import sys
from one.alf import spec
try:
from cibrrig.preprocess import physiology
from cibrrig.preprocess import nidq_utils
except ImportError:
sys.path.append("../")
import physiology
import nidq_utils
import logging
logging.basicConfig()
[docs]
_log = logging.getLogger("extract_physiology")
_log.setLevel(logging.INFO)
# This is fragile
if sys.platform == "linux":
BM_PATH = "/active/ramirez_j/ramirezlab/nbush/projects/cibrrig/cibrrig/preprocess"
else:
BM_PATH = r"Y:/projects/cibrrig/cibrrig/preprocess"
if not Path(BM_PATH).exists():
[docs]
BM_PATH = Path(__file__).parent
_log.warning("Using local path for breathmetrics")
[docs]
def _crop_traces(t, x):
"""
Utility to crop both the time trace and an arbitrary vector to the same length (shortest length).
Useful to prevent off-by-one errors.
Args:
t (np.ndarray): Time stamps for each element in x.
x (np.ndarray): Signal vector.
Returns:
tuple: A tuple containing:
- np.ndarray: Cropped timestamp array.
- np.ndarray: Cropped Signal vector.
"""
assert len(t) == len(x), "Time and signal vectors must be the same length"
tlen = np.min([len(x), len(t)])
return (t[:tlen], x[:tlen])
[docs]
def _get_trigger_from_SR(SR):
"""Get the trigger number from the filename
Args:
SR (spikeglx.Reader):Spikeglx reader object
Returns:
str: Trigger label extracted from the filename (e.g., 't0', 't1', etc.)
"""
assert SR.type == "nidq", "Not a nidaq bin file"
label = SR.file_bin.with_suffix("").with_suffix("").stem
trigger_label = label[-2:]
_log.debug(f"Extracted trigger label is '{trigger_label}'")
return trigger_label
[docs]
def label_sighs(dia_df):
"""
Categorize breaths as sighs or eupnea based on the diaphragm data.
Args:
dia_df (pd.DataFrame): DataFrame containing diaphragm data with columns 'on_sec' and 'auc'.
Returns:
pd.DataFrame: DataFrame with additional columns 'breath_type', 'is_sigh', and 'is_eupnea'.
"""
is_sigh = physiology.compute_sighs(dia_df["on_sec"].values, dia_df["auc"].values)
dia_df["breath_type"] = "eupnea"
dia_df.loc[is_sigh, "breath_type"] = "sigh"
dia_df["is_sigh"] = is_sigh
dia_df["is_eupnea"] = dia_df.eval("~is_sigh")
return dia_df
[docs]
def run_one(SR, wiring, v_in, inhale_pos, save_path):
"""
Process physiological data from a single recording.
Passes data to functions in the physiology module to process and save the data.
Args:
SR (spikeglx.Reader): SpikeGLX reader object for the recording.
wiring (dict): Wiring configuration.
v_in (float): Voltage input.
inhale_pos (int): Inhale position.
save_path (Path): Path to save the processed data.
Returns:
None
"""
DS_FACTOR = 10
# INIT output variables
pdiff = np.array([])
flow = np.array([])
dia_filt = np.array([])
dia_sub = np.array([])
temperature = np.array([])
heartbeats = np.array([])
hr_bpm = np.array([])
has_dia = False
has_ekg = False
has_pdiff = False
has_temperature = False
has_flowmeter = False
analog_offset = 16 # Hardcoded offset for the sync map.
dia_chan = wiring.get("diaphragm", False)
ekg_chan = wiring.get("ekg", False)
pdiff_chan = wiring.get("pdiff", False)
flowmeter_chan = wiring.get("flowmeter", False)
temp_chan = wiring.get("temperature", False)
# Need explicit booleans because channel can be 0
if dia_chan:
dia_chan -= analog_offset
has_dia = True
if ekg_chan:
ekg_chan -= analog_offset
has_ekg = True
if pdiff_chan:
pdiff_chan -= analog_offset
has_pdiff = True
if flowmeter_chan:
flowmeter_chan -= analog_offset
has_flowmeter = True
if temp_chan:
temp_chan -= analog_offset
has_temperature = True
# Make inhales positive deflections in the pdiff signal
if inhale_pos:
inhale_dir = 1
else:
inhale_dir = -1
save_path = save_path or SR.file_bin.parent
# LOAD Memory map from SGLX
sr = SR.fs
# Get tvec
t = nidq_utils.get_tvec_from_SR(SR)
t_full = t.copy()
t = t[::DS_FACTOR]
sr_sub = sr / DS_FACTOR
_log.info(f"Sampling rate is {sr}")
_log.info(f"Downsampling to {sr_sub}")
# Process diaphragm
# Must do before explicit ekg processing because it attempts
# to find the heartbeats, but it is not as good as the
# explicit EKG channel and so we want to overwrite heartbeats with those data if they exist
# Process EKG
heartbeats = (
None # Initialize heartbeats to be none in the case that EKG is not recorded
)
if has_ekg:
_log.info("Processing EKG")
heartbeats = nidq_utils.extract_hr_channel(SR, ekg_chan)
_, hr_bpm = physiology.compute_avg_hr(heartbeats, t_target=t)
# Process dia
if has_dia:
_log.info("Processing diaphragm")
raw_dia, sr_dia = nidq_utils.load_dia_emg(SR, dia_chan)
dia_df, dia_sub, sr_dia_sub, HR, dia_filt, heartbeats = (
nidq_utils.filt_int_ds_dia(
raw_dia, sr_dia, ds_factor=DS_FACTOR, heartbeats=heartbeats
)
)
t, dia_sub = _crop_traces(t, dia_sub)
# Process PDIFF
if has_pdiff:
_log.info("Processing pressure differential sensor")
pdiff, sr_pdiff = nidq_utils.load_ds_pdiff(
SR, pdiff_chan, ds_factor=DS_FACTOR, inhale_dir=inhale_dir
)
t, pdiff = _crop_traces(t, pdiff)
# Process Flowmeter
if has_flowmeter:
_log.info("Processing flowmeter")
flow, sr_flow = nidq_utils.load_ds_process_flowmeter(
SR, flowmeter_chan, v_in, ds_factor=DS_FACTOR, inhale_dir=inhale_dir
)
t, flow = _crop_traces(t, flow)
# Process Temperature
if has_temperature:
_log.info("Processing temperature")
temperature = nidq_utils.extract_temp(SR, temp_chan, ds_factor=DS_FACTOR)
t, temperature = _crop_traces(t, temperature)
# Save the downsampled data to a mat file
trigger_label = _get_trigger_from_SR(SR)
_log.info("Saving outputs")
_log.debug(
f"{t.shape[0]=}\n{pdiff.shape[0]=}\n{dia_sub.shape[0]=}\n{hr_bpm.shape[0]=}\n{temperature.shape[0]=}\n{flow.shape[0]=}"
)
# Save the downsampled data to a parquet file in alf format
physiol_df = pd.DataFrame()
if has_pdiff:
physiol_df["pdiff"] = pdiff
if has_dia:
physiol_df["dia"] = dia_sub
if has_ekg:
physiol_df["hr_bpm"] = hr_bpm
if has_temperature:
physiol_df["temperature"] = temperature
if has_flowmeter:
physiol_df["flowmeter"] = flow
fn_physiol = spec.to_alf(
"physiology", "table", "pqt", "cibrrig", extra=trigger_label
)
fn_physiol_timestamps = spec.to_alf(
"physiology", "times", "npy", "cibrrig", extra=trigger_label
)
fn_heartbeat = spec.to_alf(
"heartbeat", "times", "npy", "cibrrig", extra=trigger_label
)
physiol_df.to_parquet(save_path.joinpath(fn_physiol))
np.save(save_path.joinpath(fn_physiol_timestamps), t)
if has_ekg:
np.save(save_path.joinpath(fn_heartbeat), heartbeats)
fn_breath_onsets = spec.to_alf(
"breaths", "times", "npy", "cibrrig", extra=trigger_label
)
fn_breaths = spec.to_alf("breaths", "table", "pqt", "cibrrig", extra=trigger_label)
if has_dia:
# But strip the data referenced to the 10K sampling
dia_df.drop(
["on_samp", "off_samp", "duration_samp", "pk_samp"], axis=1, inplace=True
)
# Compute sighs from diaphragm
dia_df = label_sighs(dia_df)
# Save breaths features
dia_df.to_parquet(save_path.joinpath(fn_breaths), index=False)
# Save breath onsets
breath_onsets = dia_df["on_sec"].values
np.save(save_path.joinpath(fn_breath_onsets), breath_onsets)
# Save the filtered diaphragm signal
fn_dia_filt = spec.to_alf(
"diaphragm", "filtered", "npy", "cibrrig", extra=trigger_label
)
np.save(save_path.joinpath(fn_dia_filt), dia_filt)
# Save the timestamps for the filtered diaphragm signal
fn_dia_filt_times = spec.to_alf(
"diaphragm", "times", "npy", "cibrrig", extra=trigger_label
)
np.save(save_path.joinpath(fn_dia_filt_times), t_full)
else:
dia_df = pd.DataFrame() # Write an empty table so breathmetrics has a filename
dia_df.to_parquet(save_path.joinpath(fn_breaths))
np.save(save_path.joinpath(fn_breath_onsets), np.array([]))
# TODO:
# Running this not from the cibrrig preprocess folder is problematic because it needs to run the breathmetrics_proc.m file
# At this point I don't know how best to incorporate this matlab script into a python workflow without hacking paths
[docs]
def run(session_path, v_in=9, inhale_pos=False, save_path=None, debug=False):
"""
Run the physiology extraction process on a session.
Args:
session_path (Path): Path to the session data.
v_in (float, optional): Voltage supply for the flowmeter sensor. Defaults to 9.
inhale_pos (bool, optional): Set true if inhale was acquired as a positive signal. Defaults to False.
save_path (Path, optional): Path to save data to. If None, save to parent directory of binary file. Defaults to None.
debug (bool, optional): If True, enable debug loggin. Defaults to False.
"""
_log.setLevel(logging.DEBUG) if debug else None
# Set paths, create save directory, and find Nidaq files
session_path = Path(session_path)
save_path = save_path or session_path.joinpath("alf")
save_path.mkdir(exist_ok=True)
ephys_path = session_path.joinpath("raw_ephys_data")
_log.debug(f"Looking for ephys in : {ephys_path}")
ni_fn_list = list(ephys_path.glob("*nidq.*bin"))
ni_fn_list.sort()
_log.debug(f"Found Nidaq data: {ni_fn_list}")
# Process each Nidaq file
for ni_fn in ni_fn_list:
_log.info(f"Processing {ni_fn}")
SR = spikeglx.Reader(ni_fn)
ni_wiring = spikeglx.get_sync_map(ni_fn.parent)
run_one(SR, ni_wiring, v_in, inhale_pos, save_path)
trigger_label = _get_trigger_from_SR(SR)
has_pdiff = "pdiff" in ni_wiring.keys()
has_flow = "flowmeter" in ni_wiring.keys()
# If we have a pdiff or flowmeter signal, run breathmetrics (requires matlab)
if has_pdiff or has_flow:
_log.info("=" * 50)
_log.info("Sending to Breathmetrics")
command = [
"matlab",
"-batch",
f"breathmetrics_proc('{save_path}','{trigger_label}')",
]
try:
subprocess.run(command, check=True, cwd=BM_PATH)
# Rewrite using pandas for a strange compatability issue
fn = list(save_path.glob(f"*breaths*table*{trigger_label}*.pqt"))[0]
aa = pd.read_parquet(fn)
aa.to_parquet(fn)
except Exception as e:
_log.error(e)
_log.error("=" * 50)
_log.error(
"=" * 15
+ "BREATHMETRICS FAILED Is it installed on the Matlab path or are you not running from the preprocess folder?!!"
+ "=" * 15
)
_log.error("=" * 50)
import time
time.sleep(5)
continue
else:
_log.info("No airflow signal so not performing BM")
@click.command()
@click.argument("session_path")
@click.option("-v", "--v_in", "v_in", default=9, type=float, show_default=True)
@click.option("-i", "--inhale_pos", is_flag=True, default=False, show_default=True)
@click.option("-s", "--save_path", "save_path", default=None, show_default=True)
@click.option("--debug", is_flag=True)
[docs]
def main(session_path, v_in, inhale_pos, save_path, debug):
"""
Main entry point for the physiology extraction script.
Args:
session_path (str): Path to the session data.
v_in (float, optional): Voltage supply for the flowmeter sensor. Defaults to 9.
inhale_pos (bool, optional): Set true if inhale was acquired as a positive signal. Defaults to False.
save_path (str, optional): Path to save data to. If None, save to parent directory of binary file. Defaults to None.
debug (bool): If True, enable debug mode.
"""
run(session_path, v_in, inhale_pos, save_path, debug)
if __name__ == "__main__":
main()