utils
fileio
from_neuropype_h5(filename, chunk_names=[])
Import a Neuropype-exported HDF5 file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| filename | str | Name of file on disk. Opened with h5py.File. | required |
| chunk_names | List[str] | Limit return to a subset of the chunks in the data file. | [] |

Returns:

| Type | Description |
|---|---|
| List[Tuple[str, dict]] | A list of (name, chunk_dict) tuples. |
Source code in indl/utils/fileio.py
```python
def from_neuropype_h5(filename: str, chunk_names: List[str] = []) -> List[Tuple[str, dict]]:
    """
    Import a Neuropype-exported HDF5 file.

    Args:
        filename: Name of file on disk. Opened with h5py.File.
        chunk_names: Limit return to a subset of the chunks in the data file.

    Returns:
        A list of (name, chunk_dict) tuples.
    """
    import numpy as np
    import h5py
    from pandas import DataFrame

    f = h5py.File(filename, 'r')
    chunks = []
    if 'chunks' in f.keys():
        chunks_group = f['chunks']
        ch_keys = [_ for _ in chunks_group.keys() if _ in chunk_names]
        for ch_key in ch_keys:
            chunk_group = chunks_group.get(ch_key)

            # Process data
            block_group = chunk_group.get('block')
            data_ = block_group.get('data')
            if isinstance(data_, h5py.Dataset):
                data = data_[()]
            else:
                # Data is a group. This only happens with sparse matrices.
                import scipy.sparse
                data = scipy.sparse.csr_matrix((data_['data'][:], data_['indices'][:], data_['indptr'][:]),
                                               data_.attrs['shape'])

            axes_group = block_group.get('axes')
            axes = []
            for ax_ix, axis_key in enumerate(axes_group.keys()):
                axis_group = axes_group.get(axis_key)
                ax_type = axis_group.attrs.get('type')
                new_ax = {'name': axis_key, 'type': ax_type}
                if ax_type == 'axis':
                    new_ax.update(dict(x=np.arange(data.shape[ax_ix])))
                elif ax_type == 'time':
                    nom_rate = axis_group.attrs.get('nominal_rate')
                    if np.isnan(nom_rate):
                        nom_rate = None
                    new_ax.update(dict(nominal_rate=nom_rate,
                                       times=axis_group.get('times')[()]))
                elif ax_type == 'frequency':
                    new_ax.update(dict(frequencies=axis_group.get('frequencies')[()]))
                elif ax_type == 'space':
                    new_ax.update(dict(names=axis_group.get('names')[()],
                                       naming_system=axis_group.attrs['naming_system'],
                                       positions=axis_group.get('positions')[()],
                                       coordinate_system=axis_group.attrs['coordinate_system'],
                                       units=axis_group.get('units')[()]))
                elif ax_type == 'feature':
                    new_ax.update(dict(names=axis_group.get('names')[()],
                                       units=axis_group.get('units')[()],
                                       properties=axis_group.get('properties')[()],
                                       error_distrib=axis_group.get('error_distrib')[()],
                                       sampling_distrib=axis_group.get('sampling_distrib')[()]))
                elif ax_type == 'instance':
                    new_ax.update({'times': axis_group.get('times')[()]})
                    if 'instance_type' in axis_group.attrs:
                        new_ax.update({'instance_type': axis_group.attrs['instance_type']})
                    _dat = axis_group.get('data')[()]
                    if not _dat.dtype.names:
                        new_ax.update({'data': axis_group.get('data')[()]})
                    else:
                        _df = DataFrame(_dat)
                        # Convert binary objects to string objects
                        str_df = _df.select_dtypes([np.object])
                        str_df = str_df.stack().str.decode('utf-8').unstack()
                        for col in str_df:
                            _df[col] = str_df[col]
                        new_ax.update({'data': _df})
                elif ax_type == 'statistic':
                    new_ax.update(dict(param_types=axis_group.get('param_types')[()]))
                elif ax_type == 'lag':
                    new_ax.update(dict(xlags=axis_group.get('lags')[()]))
                if new_ax is not None:
                    axes.append(new_ax)

            chunks.append((ch_key, dict(data=data, axes=axes,
                                        props=_recurse_get_dict_from_group(chunk_group.get('props')))))
    return chunks
```
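A minimal usage sketch follows. The file path and the chunk name `'signals'` are placeholders for illustration; the actual chunk names depend on what the Neuropype pipeline exported.

```python
from indl.utils.fileio import from_neuropype_h5

# 'exported.h5' and the chunk name 'signals' are hypothetical.
chunks = from_neuropype_h5('exported.h5', chunk_names=['signals'])
for name, chunk in chunks:
    print(name, chunk['data'].shape)
    for ax in chunk['axes']:
        print('  axis:', ax['name'], ax['type'])
```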
metrics
dprime(y_true, y_pred, pmarg=0.01, outputs=['dprime', 'bias', 'accuracy'])
Calculate d-prime for binary data. A 70% hit rate together with a 70% correct-rejection rate corresponds to d' = 1.0488. The highest possible value is 6.93, but it is effectively capped at 4.65 when rates are clipped at 99%.
http://www.birmingham.ac.uk/Documents/college-les/psych/vision-laboratory/sdtintro.pdf
This function is not designed to behave as a valid 'Tensorflow metric'.
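As a quick check of the quoted figures (a sketch, not part of the library): d' is the difference between the z-transformed hit rate and false-alarm rate.

```python
from scipy.stats import norm

# 70% hits and 70% correct rejections => TPR = 0.7, FPR = 0.3
dprime_70 = norm.ppf(0.7) - norm.ppf(0.3)     # ~= 1.0488
# Rates clipped at 99% (pmarg = 0.01) cap d' at:
dprime_cap = norm.ppf(0.99) - norm.ppf(0.01)  # ~= 4.65
```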
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| y_true | array-like | True labels. | required |
| y_pred | array-like | Predicted labels. | required |
| pmarg | float | Probability margin used to clip the hit and false-alarm rates away from 0 and 1 before the z-transform. | 0.01 |
| outputs | List[str] | List of outputs among 'dprime', 'bias', 'accuracy'. | ['dprime', 'bias', 'accuracy'] |

Returns:

| Type | Description |
|---|---|
| tuple | The requested outputs, in the order given by `outputs`. |
Source code in indl/utils/metrics.py
```python
def dprime(y_true, y_pred, pmarg: float = 0.01, outputs: List[str] = ['dprime', 'bias', 'accuracy']) -> tuple:
    """
    Calculate D-Prime for binary data.
    70% for both classes is d=1.0488.
    Highest possible is 6.93, but effectively 4.65 for 99%
    http://www.birmingham.ac.uk/Documents/college-les/psych/vision-laboratory/sdtintro.pdf
    This function is not designed to behave as a valid 'Tensorflow metric'.

    Args:
        y_true (array-like): True labels.
        y_pred (array-like): Predicted labels.
        pmarg:
        outputs: list of outputs among 'dprime', 'bias', 'accuracy'

    Returns:
        Calculated d-prime value.
    """
    import numpy as np
    from scipy.stats import norm

    # TODO: Adapt this function for tensorflow
    # y_pred = ops.convert_to_tensor(y_pred)
    # y_true = math_ops.cast(y_true, y_pred.dtype)
    # return K.mean(math_ops.squared_difference(y_pred, y_true), axis=-1)

    # TODO: Check that true_y only has 2 classes, and test_y is entirely within true_y classes.
    b_true = y_pred == y_true
    b_pos = np.unique(y_true, return_inverse=True)[1].astype(bool)
    true_pos = np.sum(np.logical_and(b_true, b_pos))
    true_neg = np.sum(np.logical_and(b_true, ~b_pos))
    false_pos = np.sum(np.logical_and(~b_true, b_pos))
    false_neg = np.sum(np.logical_and(~b_true, ~b_pos))
    tpr = true_pos / (true_pos + false_neg)
    tpr = max(pmarg, min(tpr, 1 - pmarg))
    fpr = false_pos / (false_pos + true_neg)
    fpr = max(pmarg, min(fpr, 1 - pmarg))
    ztpr = norm.ppf(tpr, loc=0, scale=1)
    zfpr = norm.ppf(fpr, loc=0, scale=1)

    # Other measures of performance:
    # sens = tp ./ (tp+fp)
    # spec = tn ./ (tn+fn)
    # balAcc = (sens+spec)/2
    # informedness = sens+spec-1

    output = tuple()
    for out in outputs:
        if out == 'dprime':
            dprime = ztpr - zfpr
            output += (dprime,)
        elif out == 'bias':
            bias = -(ztpr + zfpr) / 2
            output += (bias,)
        elif out == 'accuracy':
            accuracy = 100 * (true_pos + true_neg) / (true_pos + false_pos + false_neg + true_neg)
            output += (accuracy,)
    return output
```
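A minimal usage sketch with synthetic labels (the data and flip rate are illustrative):

```python
import numpy as np
from indl.utils.metrics import dprime

rng = np.random.default_rng(42)
y_true = rng.integers(0, 2, size=200)
# Flip ~20% of the labels to simulate an imperfect classifier.
flip = rng.random(200) < 0.2
y_pred = np.where(flip, 1 - y_true, y_true)

dp, bias, acc = dprime(y_true, y_pred)  # default outputs: dprime, bias, accuracy
print(f"d'={dp:.2f}, bias={bias:.2f}, accuracy={acc:.1f}%")
```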
quickplot_history(history)
A little helper function to do a quick plot of model fit results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| history | tf.keras History | The History object returned by Model.fit, or its .history dict. | required |
Source code in indl/utils/metrics.py
```python
def quickplot_history(history) -> None:
    """
    A little helper function to do a quick plot of model fit results.

    Args:
        history (tf.keras History):
    """
    import matplotlib.pyplot as plt
    if hasattr(history, 'history'):
        history = history.history
    hist_metrics = [_ for _ in history.keys() if not _.startswith('val_')]
    for m_ix, m in enumerate(hist_metrics):
        plt.subplot(len(hist_metrics), 1, m_ix + 1)
        plt.plot(history[m], label='Train')
        plt.plot(history['val_' + m], label='Valid.')
        plt.xlabel('Epoch')
        plt.ylabel(m)
        plt.legend()
    plt.tight_layout()
    plt.show()
```
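A minimal usage sketch, assuming a compiled Keras model and held-out validation data (the `model`, `x_*`, and `y_*` names are placeholders, not part of the library):

```python
from indl.utils.metrics import quickplot_history

# `model`, `x_train`, `y_train`, `x_val`, `y_val` are assumed to already exist.
history = model.fit(x_train, y_train, validation_data=(x_val, y_val), epochs=20, verbose=0)
quickplot_history(history)  # also accepts history.history directly
```

The validation split matters: the function plots `val_<metric>` alongside each training metric, so it will raise a KeyError if the fit was run without validation data.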
regularizers
KernelLengthRegularizer (Regularizer)
Regularize a kernel by its length. The added loss is an integer mask of 1s where abs(weight) is above threshold, and 0s otherwise, multiplied by a window. The window is typically shaped to penalize the presence of non-zero weights further away from the middle of the kernel. Use this regularizer if you want to try to find a minimal-length kernel (after training, the kernel can be shortened for faster inference).
Source code in indl/utils/regularizers.py
```python
class KernelLengthRegularizer(tf.keras.regularizers.Regularizer):
    """
    Regularize a kernel by its length. Added loss is a int mask of 1s where abs(weight) is above threshold,
    and 0s otherwise, multiplied by a window. The window is typically shaped to penalize the presence of
    non-zero weights further away from the middle of the kernel. Use this regularizer if you want to
    try to find a minimal-length kernel. (after training, kernel can be shortened for faster inference).
    """
    def __init__(self, kernel_size: Iterable[int], window_scale: float = 1e-2, window_func: str = 'poly',
                 poly_exp: int = 2, threshold: float = tf.keras.backend.epsilon()):
        """
        Args:
            kernel_size: length(s) of kernel(s)
            window_scale: scale factor to apply to window
            window_func: 'hann', 'hamming', 'poly' (default)
            poly_exp: exponent used when window_func=='poly'
            threshold: weight threshold, below which weights will not be penalized.
        """
        self.kernel_size = kernel_size
        self.window_scale = window_scale
        self.window_func = window_func
        self.poly_exp = poly_exp
        self.threshold = threshold
        self.rebuild_window()

    def rebuild_window(self):
        windows = []
        for win_dim, win_len in enumerate(self.kernel_size):
            if win_len == 1:
                window = np.ones((1,), dtype=np.float32)
            else:
                if self.window_func == 'hann':
                    window = 1 - tf.signal.hann_window(win_len, periodic=False)
                elif self.window_func == 'hamming':
                    window = 1 - tf.signal.hamming_window(win_len, periodic=False)
                else:  # if window_func == 'linear':
                    hl = win_len // 2
                    window = np.zeros((win_len,), dtype=np.float32)
                    window[:hl] = np.arange(1, hl + 1)[::-1]  # Negative slope line to 0 for first half.
                    window[-hl:] = np.arange(1, hl + 1)       # Positive slope line from 0 for second half.
                    window = window / hl                      # Scale so it's -1 -- 0 -- 1
                    window = window ** self.poly_exp          # Exponent
            win_shape = [1] * (len(self.kernel_size) + 2)
            win_shape[win_dim] = win_len
            window = tf.reshape(window, win_shape)
            windows.append(window)

        self.window = tf.linalg.matmul(*windows)
        self.window = self.window / tf.reduce_max(self.window)

    def get_config(self) -> dict:
        return {'kernel_size': self.kernel_size,
                'window_scale': self.window_scale,
                'window_func': self.window_func,
                'poly_exp': self.poly_exp,
                'threshold': self.threshold}

    def __call__(self, weights):
        weights = tf.sqrt(tf.square(weights))  # Differentiable abs
        # non_zero = tf.cast(weights > self.threshold, tf.float32)
        non_zero = tf.nn.sigmoid(weights - self.threshold)
        regularization = self.window_scale * self.window * non_zero
        # regularization = tf.reduce_max(regularization, axis=[0, 1], keepdims=True)
        regularization = tf.reduce_sum(regularization)
        return regularization
```
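A minimal usage sketch, assuming a Conv2D layer whose kernel spans 31 samples along its second (time) dimension; the layer shapes, scale, and input size are illustrative rather than taken from the source:

```python
import tensorflow as tf
from indl.utils.regularizers import KernelLengthRegularizer

# Penalize above-threshold kernel weights far from the kernel centre.
reg = KernelLengthRegularizer(kernel_size=(1, 31), window_scale=1e-2,
                              window_func='poly', poly_exp=2)
layer = tf.keras.layers.Conv2D(filters=8, kernel_size=(1, 31), padding='same',
                               kernel_regularizer=reg)
model = tf.keras.Sequential([tf.keras.layers.Input(shape=(64, 1000, 1)), layer])
# The added loss grows with the distance of large weights from the kernel centre,
# nudging the effective kernel length to shrink during training.
```

Note that `kernel_size` passed to the regularizer should match the spatial dimensions of the layer's kernel so that the window broadcasts against the weight tensor.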
__init__(self, kernel_size, window_scale=0.01, window_func='poly', poly_exp=2, threshold=1e-07)
special
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| kernel_size | Iterable[int] | Length(s) of kernel(s). | required |
| window_scale | float | Scale factor to apply to window. | 0.01 |
| window_func | str | 'hann', 'hamming', 'poly' (default). | 'poly' |
| poly_exp | int | Exponent used when window_func=='poly'. | 2 |
| threshold | float | Weight threshold, below which weights will not be penalized. | 1e-07 |
Source code in indl/utils/regularizers.py
```python
def __init__(self, kernel_size: Iterable[int], window_scale: float = 1e-2, window_func: str = 'poly',
             poly_exp: int = 2, threshold: float = tf.keras.backend.epsilon()):
    """
    Args:
        kernel_size: length(s) of kernel(s)
        window_scale: scale factor to apply to window
        window_func: 'hann', 'hamming', 'poly' (default)
        poly_exp: exponent used when window_func=='poly'
        threshold: weight threshold, below which weights will not be penalized.
    """
    self.kernel_size = kernel_size
    self.window_scale = window_scale
    self.window_func = window_func
    self.poly_exp = poly_exp
    self.threshold = threshold
    self.rebuild_window()
```
get_config(self)
Returns the config of the regularizer.

A regularizer config is a Python dictionary (serializable) containing all configuration parameters of the regularizer. The same regularizer can be reinstantiated later (without any saved state) from this configuration.

This method is optional if you are just training and executing models, exporting to and from SavedModels, or using weight checkpoints.

This method is required for Keras model_to_estimator, saving and loading models to HDF5 formats, Keras model cloning, some visualization utilities, and exporting models to and from JSON.
Returns:

| Type | Description |
|---|---|
| dict | Python dictionary. |
Source code in indl/utils/regularizers.py
```python
def get_config(self) -> dict:
    return {'kernel_size': self.kernel_size,
            'window_scale': self.window_scale,
            'window_func': self.window_func,
            'poly_exp': self.poly_exp,
            'threshold': self.threshold}
```
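A minimal sketch of a config round trip, using the standard `Regularizer.from_config` inherited from Keras (which forwards the dict to `__init__`); the kernel size shown is illustrative:

```python
reg = KernelLengthRegularizer(kernel_size=(1, 31), window_func='hann')
cfg = reg.get_config()
# e.g. {'kernel_size': (1, 31), 'window_scale': 0.01, 'window_func': 'hann',
#       'poly_exp': 2, 'threshold': 1e-07}
reg2 = KernelLengthRegularizer.from_config(cfg)  # equivalent regularizer, no saved state
```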