utils

fileio

from_neuropype_h5(filename, chunk_names=[])

Import a Neuropype-exported HDF5 file.

Parameters:

Name         Type       Description                                                                           Default
filename     str        Name of file on disk. Opened with h5py.File.                                          required
chunk_names  List[str]  Limit return to a subset of the chunks in the data file; if empty, return all chunks. []

Returns:

Type                    Description
List[Tuple[str, dict]]  A list of (name, chunk_dict) tuples.

Source code in indl/utils/fileio.py
def from_neuropype_h5(filename: str, chunk_names: List[str] = []) -> List[Tuple[str, dict]]:
    """
    Import a Neuropype-exported HDF5 file.

    Args:
        filename: Name of file on disk. Opened with h5py.File.
        chunk_names: Limit return to a subset of the chunks in the data file. If empty, all chunks are returned.

    Returns:
        A list of (name, chunk_dict) tuples.
    """
    import numpy as np
    import h5py
    from pandas import DataFrame
    f = h5py.File(filename, 'r')

    chunks = []
    if 'chunks' in f.keys():
        chunks_group = f['chunks']
        ch_keys = [k for k in chunks_group.keys() if not chunk_names or k in chunk_names]
        for ch_key in ch_keys:
            chunk_group = chunks_group.get(ch_key)

            # Process data
            block_group = chunk_group.get('block')
            data_ = block_group.get('data')
            if isinstance(data_, h5py.Dataset):
                data = data_[()]
            else:
                # Data is a group. This only happens with sparse matrices.
                import scipy.sparse
                data = scipy.sparse.csr_matrix((data_['data'][:], data_['indices'][:], data_['indptr'][:]),
                                               data_.attrs['shape'])

            axes_group = block_group.get('axes')
            axes = []
            for ax_ix, axis_key in enumerate(axes_group.keys()):
                axis_group = axes_group.get(axis_key)
                ax_type = axis_group.attrs.get('type')
                new_ax = {'name': axis_key, 'type': ax_type}
                if ax_type == 'axis':
                    new_ax.update(dict(x=np.arange(data.shape[ax_ix])))
                elif ax_type == 'time':
                    nom_rate = axis_group.attrs.get('nominal_rate')
                    if np.isnan(nom_rate):
                        nom_rate = None
                    new_ax.update(dict(nominal_rate=nom_rate,
                                       times=axis_group.get('times')[()]))
                elif ax_type == 'frequency':
                    new_ax.update(dict(frequencies=axis_group.get('frequencies')[()]))
                elif ax_type == 'space':
                    new_ax.update(dict(names=axis_group.get('names')[()],
                                       naming_system=axis_group.attrs['naming_system'],
                                       positions=axis_group.get('positions')[()],
                                       coordinate_system=axis_group.attrs['coordinate_system'],
                                       units=axis_group.get('units')[()]))
                elif ax_type == 'feature':
                    new_ax.update(dict(names=axis_group.get('names')[()],
                                       units=axis_group.get('units')[()],
                                       properties=axis_group.get('properties')[()],
                                       error_distrib=axis_group.get('error_distrib')[()],
                                       sampling_distrib=axis_group.get('sampling_distrib')[()]))
                elif ax_type == 'instance':
                    new_ax.update({'times': axis_group.get('times')[()]})
                    if 'instance_type' in axis_group.attrs:
                        new_ax.update({'instance_type': axis_group.attrs['instance_type']})
                    _dat = axis_group.get('data')[()]
                    if not _dat.dtype.names:
                        new_ax.update({'data': _dat})
                    else:
                        _df = DataFrame(_dat)
                        # Convert binary objects to string objects
                        str_df = _df.select_dtypes([object])  # np.object was removed in NumPy >= 1.24
                        str_df = str_df.stack().str.decode('utf-8').unstack()
                        for col in str_df:
                            _df[col] = str_df[col]
                        new_ax.update({'data': _df})

                elif ax_type == 'statistic':
                    new_ax.update(dict(param_types=axis_group.get('param_types')[()]))
                elif ax_type == 'lag':
                    new_ax.update(dict(xlags=axis_group.get('lags')[()]))
                if new_ax is not None:
                    axes.append(new_ax)

            chunks.append((ch_key, dict(data=data, axes=axes,
                                        props=_recurse_get_dict_from_group(chunk_group.get('props')))))

    f.close()
    return chunks
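
A minimal usage sketch (the file name is hypothetical): load every chunk and inspect its data and axes.

chunks = from_neuropype_h5('exported_recording.h5')
for name, chunk in chunks:
    ax_names = [ax['name'] for ax in chunk['axes']]
    print(name, chunk['data'].shape, ax_names)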

metrics

dprime(y_true, y_pred, pmarg=0.01, outputs=['dprime', 'bias', 'accuracy'])

Calculate d-prime for binary classification results. Scoring 70% in both classes gives d' = 1.0488. The theoretical maximum is 6.93, but with rates clipped at 99% (pmarg=0.01) the effective maximum is 4.65.

http://www.birmingham.ac.uk/Documents/college-les/psych/vision-laboratory/sdtintro.pdf

This function is not designed to behave as a valid 'Tensorflow metric'.

Parameters:

Name     Type        Description                                                             Default
y_true   array-like  True labels.                                                            required
y_pred   array-like  Predicted labels.                                                       required
pmarg    float       Margin used to clip hit and false-alarm rates into [pmarg, 1 - pmarg].  0.01
outputs  List[str]   List of outputs among 'dprime', 'bias', 'accuracy'.                     ['dprime', 'bias', 'accuracy']

Returns:

Type   Description
tuple  The requested outputs, in the order given in outputs.

Source code in indl/utils/metrics.py
def dprime(y_true, y_pred, pmarg: float = 0.01, outputs: List[str] = ['dprime', 'bias', 'accuracy']) -> tuple:
    """
    Calculate d-prime for binary classification results.
    Scoring 70% in both classes gives d' = 1.0488.
    The theoretical maximum is 6.93, but with rates clipped at 99% (pmarg=0.01) the effective maximum is 4.65.

    http://www.birmingham.ac.uk/Documents/college-les/psych/vision-laboratory/sdtintro.pdf

    This function is not designed to behave as a valid 'Tensorflow metric'.

    Args:
        y_true (array-like): True labels.
        y_pred (array-like): Predicted labels.
        pmarg: Margin used to clip hit and false-alarm rates into [pmarg, 1 - pmarg].
        outputs: list of outputs among 'dprime', 'bias', 'accuracy'

    Returns:
        Tuple of the requested outputs, in the order given in outputs.
    """

    import numpy as np
    from scipy.stats import norm

    # TODO: Adapt this function for tensorflow
    # y_pred = ops.convert_to_tensor(y_pred)
    # y_true = math_ops.cast(y_true, y_pred.dtype)
    # return K.mean(math_ops.squared_difference(y_pred, y_true), axis=-1)

    # TODO: Check that true_y only has 2 classes, and test_y is entirely within true_y classes.
    b_true = y_pred == y_true
    b_pos = np.unique(y_true, return_inverse=True)[1].astype(bool)

    true_pos = np.sum(np.logical_and(b_true, b_pos))     # correct predictions on actual positives
    true_neg = np.sum(np.logical_and(b_true, ~b_pos))    # correct predictions on actual negatives
    false_neg = np.sum(np.logical_and(~b_true, b_pos))   # actual positives predicted as negative
    false_pos = np.sum(np.logical_and(~b_true, ~b_pos))  # actual negatives predicted as positive

    tpr = true_pos / (true_pos + false_neg)
    tpr = max(pmarg, min(tpr, 1-pmarg))
    fpr = false_pos / (false_pos + true_neg)
    fpr = max(pmarg, min(fpr, 1 - pmarg))
    ztpr = norm.ppf(tpr, loc=0, scale=1)
    zfpr = norm.ppf(fpr, loc=0, scale=1)

    # Other measures of performance:
    # sens = tp ./ (tp+fp)
    # spec = tn ./ (tn+fn)
    # balAcc = (sens+spec)/2
    # informedness = sens+spec-1

    output = tuple()
    for out in outputs:
        if out == 'dprime':
            dprime = ztpr - zfpr
            output += (dprime,)
        elif out == 'bias':
            bias = -(ztpr + zfpr) / 2
            output += (bias,)
        elif out == 'accuracy':
            accuracy = 100 * (true_pos + true_neg) / (true_pos + false_pos + false_neg + true_neg)
            output += (accuracy,)

    return output
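
A quick numeric check of the figures quoted in the docstring, using synthetic labels with 70% correct in each class:

import numpy as np
from scipy.stats import norm

print(norm.ppf(0.7) - norm.ppf(0.3))  # ~1.0488, the quoted d' for 70%/70%

y_true = np.repeat([0, 1], 10)
y_pred = y_true.copy()
y_pred[:3] = 1 - y_pred[:3]        # 3 errors on class 0
y_pred[10:13] = 1 - y_pred[10:13]  # 3 errors on class 1
d, bias, acc = dprime(y_true, y_pred)
print(d, bias, acc)  # ~1.0488, ~0.0, 70.0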

quickplot_history(history)

A little helper function to do a quick plot of model fit results.

Parameters:

Name     Type                        Description                                          Default
history  tf.keras.callbacks.History  History returned by Model.fit, or its history dict.  required
Source code in indl/utils/metrics.py
def quickplot_history(history) -> None:
    """
    A little helper function to do a quick plot of model fit results.

    Args:
        history (tf.keras.callbacks.History): History returned by Model.fit, or its history dict.
    """
    import matplotlib.pyplot as plt
    if hasattr(history, 'history'):
        history = history.history
    hist_metrics = [_ for _ in history.keys() if not _.startswith('val_')]

    for m_ix, m in enumerate(hist_metrics):
        plt.subplot(len(hist_metrics), 1, m_ix + 1)
        plt.plot(history[m], label='Train')
        plt.plot(history['val_' + m], label='Valid.')
        plt.xlabel('Epoch')
        plt.ylabel(m)
    plt.legend()
    plt.tight_layout()
    plt.show()
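
A minimal usage sketch (model, x, and y are assumed to be an already-compiled Keras model and its training data):

history = model.fit(x, y, validation_split=0.2, epochs=10)
quickplot_history(history)  # one subplot per metric, train vs. validation curves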

regularizers

KernelLengthRegularizer (Regularizer)

Regularize a kernel by its length. The added loss is a soft mask, near 1 where abs(weight) is above threshold and near 0 otherwise, multiplied by a window. The window is typically shaped to penalize the presence of non-zero weights further away from the middle of the kernel. Use this regularizer if you want to try to find a minimal-length kernel (after training, the kernel can be shortened for faster inference).

Source code in indl/utils/regularizers.py
class KernelLengthRegularizer(tf.keras.regularizers.Regularizer):
    """
    Regularize a kernel by its length. The added loss is a soft mask, near 1 where abs(weight) is above
    threshold and near 0 otherwise, multiplied by a window. The window is typically shaped to penalize the
    presence of non-zero weights further away from the middle of the kernel. Use this regularizer if you want
    to try to find a minimal-length kernel (after training, the kernel can be shortened for faster inference).
    """
    def __init__(self, kernel_size: Iterable[int], window_scale: float = 1e-2, window_func: str = 'poly',
                 poly_exp: int = 2, threshold: float = tf.keras.backend.epsilon()):
        """

        Args:
            kernel_size: length(s) of kernel(s)
            window_scale: scale factor to apply to window
            window_func: 'hann', 'hamming', 'poly' (default)
            poly_exp: exponent used when window_func=='poly'
            threshold: weight threshold, below which weights will not be penalized.
        """
        self.kernel_size = kernel_size
        self.window_scale = window_scale
        self.window_func = window_func
        self.poly_exp = poly_exp
        self.threshold = threshold
        self.rebuild_window()

    def rebuild_window(self):
        windows = []
        for win_dim, win_len in enumerate(self.kernel_size):
            if win_len == 1:
                window = np.ones((1,), dtype=np.float32)
            else:
                if self.window_func == 'hann':
                    window = 1 - tf.signal.hann_window(win_len, periodic=False)
                elif self.window_func == 'hamming':
                    window = 1 - tf.signal.hamming_window(win_len, periodic=False)
                else:  # window_func == 'poly' (default)
                    hl = win_len // 2
                    window = np.zeros((win_len,), dtype=np.float32)
                    window[:hl] = np.arange(1, hl + 1)[::-1]  # Negative slope line to 0 for first half.
                    window[-hl:] = np.arange(1, hl + 1)  # Positive slope line from 0 for second half.
                    window = window / hl  # Scale so the window goes from 1 at the edges to ~0 in the middle.
                    window = window ** self.poly_exp  # Exponent

            win_shape = [1] * (len(self.kernel_size) + 2)
            win_shape[win_dim] = win_len
            window = tf.reshape(window, win_shape)
            windows.append(window)

        # Combine the per-dimension windows via outer product; a single window is used as-is.
        self.window = windows[0] if len(windows) == 1 else tf.linalg.matmul(*windows)
        self.window = self.window / tf.reduce_max(self.window)

    def get_config(self) -> dict:
        return {'kernel_size': self.kernel_size,
                'window_scale': self.window_scale,
                'window_func': self.window_func,
                'poly_exp': self.poly_exp,
                'threshold': self.threshold}

    def __call__(self, weights):
        weights = tf.sqrt(tf.square(weights))  # Differentiable abs
        # non_zero = tf.cast(weights > self.threshold, tf.float32)
        non_zero = tf.nn.sigmoid(weights - self.threshold)

        regularization = self.window_scale * self.window * non_zero
        # regularization = tf.reduce_max(regularization, axis=[0, 1], keepdims=True)
        regularization = tf.reduce_sum(regularization)
        return regularization
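
A usage sketch (layer sizes are arbitrary): attach the regularizer to a Conv2D kernel so that weights far from the kernel's center contribute extra loss.

import tensorflow as tf

# The window matches the spatial kernel dims and broadcasts over the in/out channel dims.
reg = KernelLengthRegularizer(kernel_size=(7, 1), window_scale=1e-2, poly_exp=2)
conv = tf.keras.layers.Conv2D(filters=4, kernel_size=(7, 1), kernel_regularizer=reg)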

__init__(self, kernel_size, window_scale=0.01, window_func='poly', poly_exp=2, threshold=1e-07) special

Parameters:

Name          Type           Description                                                    Default
kernel_size   Iterable[int]  Length(s) of kernel(s).                                        required
window_scale  float          Scale factor to apply to window.                               0.01
window_func   str            'hann', 'hamming', 'poly' (default).                           'poly'
poly_exp      int            Exponent used when window_func == 'poly'.                      2
threshold     float          Weight threshold, below which weights will not be penalized.   1e-07
Source code in indl/utils/regularizers.py
def __init__(self, kernel_size: Iterable[int], window_scale: float = 1e-2, window_func: str = 'poly',
             poly_exp: int = 2, threshold: float = tf.keras.backend.epsilon()):
    """

    Args:
        kernel_size: length(s) of kernel(s)
        window_scale: scale factor to apply to window
        window_func: 'hann', 'hamming', 'poly' (default)
        poly_exp: exponent used when window_func=='poly'
        threshold: weight threshold, below which weights will not be penalized.
    """
    self.kernel_size = kernel_size
    self.window_scale = window_scale
    self.window_func = window_func
    self.poly_exp = poly_exp
    self.threshold = threshold
    self.rebuild_window()

get_config(self)

Returns the config of the regularizer.

A regularizer config is a Python dictionary (serializable) containing all configuration parameters of the regularizer. The same regularizer can be reinstantiated later (without any saved state) from this configuration.

This method is optional if you are just training and executing models, exporting to and from SavedModels, or using weight checkpoints.

This method is required for Keras model_to_estimator, saving and loading models to HDF5 formats, Keras model cloning, some visualization utilities, and exporting models to and from JSON.

Returns:

Type  Description
dict  Python dictionary.

Source code in indl/utils/regularizers.py
def get_config(self) -> dict:
    return {'kernel_size': self.kernel_size,
            'window_scale': self.window_scale,
            'window_func': self.window_func,
            'poly_exp': self.poly_exp,
            'threshold': self.threshold}
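
A round-trip sketch: the config dict can recreate an equivalent regularizer, e.g. when rebuilding a model from its saved configuration.

reg = KernelLengthRegularizer(kernel_size=(7, 1))
cfg = reg.get_config()
reg2 = KernelLengthRegularizer(**cfg)  # same settings, freshly built window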