lfads
create_generator_lfads(params)
Build the LFADS generator RNN (a ComplexCell wrapped in an RNN layer) and return the factors. Relevant hyperparameters: `units_gen`, `units_con`, `factors_dim`, `co_dim`, `ext_input_dim`, `inject_ext_input_to_gen`.
Source code in indl/model/lfads/__init__.py
def create_generator_lfads(params):
    """
    units_gen,
    units_con,
    factors_dim,
    co_dim,
    ext_input_dim,
    inject_ext_input_to_gen,
    """
    from indl.model.lfads.complex import ComplexCell
    # TODO: Sample/Mean from $q(f)$. This will replace the first element in generator init_states
    # TODO: need a custom function for sample-during-train-mean-during-test. See nn.dropout for inspiration.
    # TODO: Sample from $q(z_t)$, and optionally concat with ext_input, to build generator inputs.
    # TODO: continue generator from lfads-cd/lfadslite.py start at 495
    custom_cell = ComplexCell(
        params['gen_dim'],      # Units in generator GRU
        con_hidden_state_dim,   # Units in controller GRU (undefined here; presumably params['con_dim'])
        params['factors_dim'],
        params['co_dim'],
        params['ext_input_dim'],
        True,
    )
    generator = tfkl.RNN(custom_cell, return_sequences=True,
                         # recurrent_regularizer=tf.keras.regularizers.l2(l=gen_l2_reg),
                         name='gen_rnn')
    # `gen_input` (the per-timestep generator/controller input produced by the TODO
    # sampling steps above) is not yet defined in this work-in-progress function.
    init_states = generator.get_initial_state(gen_input)
    gen_output = generator(gen_input, initial_state=init_states)
    factors = gen_output[-1]
    return factors
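The function above is still a work in progress (note the TODOs and the undefined `con_hidden_state_dim` and `gen_input`), but the intended wiring (a ComplexCell wrapped in `tfkl.RNN`, with the factors as the last output) can be sketched directly. A minimal, hedged sketch assuming the `indl` package is importable; all dimensions and the random `enc_z1` stand-in for the encoded controller inputs are illustrative, not part of the original source:

import tensorflow as tf
from indl.model.lfads.complex import ComplexCell
tfkl = tf.keras.layers

batch_size, n_times = 8, 50
gen_dim, con_dim, factors_dim, co_dim, ext_input_dim = 64, 32, 10, 4, 0

cell = ComplexCell(gen_dim, con_dim, factors_dim, co_dim, ext_input_dim, True)
generator = tfkl.RNN(cell, return_sequences=True, name='gen_rnn')

# Stand-in for the encoded controller inputs (the z1 / "ci_enc" sequence); in the
# real model this comes from the controller-input encoder RNN.
enc_z1 = tf.random.normal((batch_size, n_times, 16))

init_states = generator.get_initial_state(enc_z1)
outputs = generator(enc_z1, initial_state=init_states)
# outputs mirrors the cell's state list, with each entry stacked over time.
factors = outputs[-1]   # (batch_size, n_times, factors_dim)

In the full model, the first entry of `init_states` would be replaced by a sample from q(f) (the encoded initial condition), per the first TODO above.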
complex
ComplexCell (AbstractRNNCell)
Source code in indl/model/lfads/complex.py
class ComplexCell(tfkl.AbstractRNNCell):
_BIAS_VARIABLE_NAME = "bias"
_WEIGHTS_VARIABLE_NAME = "kernel"
"""Cell class for the LFADS Generative GRU + Controller Input
This cell uses two GRUClipCells: One for the Generator and one for the Controller.
The Controller - This is equivalent to the "z2" RNN layer in the other disentangling AE formulations.
- Optional -- only used if z2_units (LFADS: con_dim) > 0
- inputs: the concatenation of (a) the encoded controller inputs and (b) the generator cell's state
from the previous iteration transformed through the factor dense layer.
(on the zeroth step, b starts with f-encoded latents)
The encoded controller inputs are themselves the output of an RNN with dim size z1_units,
or 'ci_enc_dim' in LFADS
- initial state: in LFADS -- a **learnable Variable** of zeros.
The Generator - inputs: the output of the Controller cell and optionally 'external' inputs.
- initial state: in LFADS -- a sample of a posterior distribution that is parameterized
by an encoder.
The two cells share the same initialization parameters
(activations, initializers, bias, dropout, regularizer, etc.) except for the number of units.
Arguments:
units_gen: Positive integer, number of units in generator RNN cell.
z2_units: Positive integer, number of units in controller RNN cell. (units_con in LFADS)
factors_dim: Number of units in Dense layer for factors output.
This layer would normally be external to the RNN. However, in LFADS, the factors dense layer
is also used to transform the t-1 generator cell state which becomes part of the _inputs_
to the controller cell.
z_latent_size: Dimensionality of variational posterior from controller output --> inputs to controller RNN (LFADS: co_dim)
ext_input_dim: Size of external input. The cell input will be split into encoded_z and ext_input depending
on this value. Can be 0.
inject_ext_input_to_gen: Only makes sense if ext_input_dim is > 0, and `False` is not implemented.
activation: Activation function to use.
Default: hyperbolic tangent (`tanh`).
If you pass None, no activation is applied (ie. "linear" activation: `a(x) = x`).
recurrent_activation: Activation function to use for the recurrent step.
Default: hard sigmoid (`hard_sigmoid`).
If you pass `None`, no activation is applied (ie. "linear" activation: `a(x) = x`).
Note: LFADS uses normal sigmoid.
use_bias: Boolean, whether the layer uses a bias vector.
kernel_initializer: Initializer for the `kernel` weights matrix,
used for the linear transformation of the inputs.
Default: lecun_normal
Vanilla tensorflow default is glorot_uniform.
recurrent_initializer: Initializer for the `recurrent_kernel` weights matrix,
used for the linear transformation of the recurrent state.
Default: orthogonal
LFADS uses lecun_normal
bias_initializer: Initializer for the bias vector.
Default: zeros
Note: LFADS uses ones for gate bias and zeros for candidate bias
kernel_regularizer: Regularizer function applied to the `kernel` weights matrix.
Default: None
recurrent_regularizer: Regularizer function applied to the `recurrent_kernel` weights matrix.
Default: 'l2' at 0.01
Note: LFADS uses L2 regularization with per-cell scaling.
Default for generator is 2000., and for controller is 0. (sum(v*v)*scale*0.5) / numel
bias_regularizer: Regularizer function applied to the bias vector.
Default: None
kernel_constraint: Constraint function applied to the `kernel` weights matrix.
Default: None
recurrent_constraint: Constraint function applied to the `recurrent_kernel` weights matrix.
Default: None
bias_constraint: Constraint function applied to the bias vector.
Default: None
dropout: Float between 0 and 1.
Fraction of the units to drop for the linear transformation of the inputs.
Default: 0.05
recurrent_dropout: Float between 0 and 1.
Fraction of the units to drop for the linear transformation of the recurrent state.
Default: 0.0
implementation: Implementation mode, either 1 or 2.
Mode 1 will structure its operations as a larger number of
smaller dot products and additions, whereas mode 2 will
batch them into fewer, larger operations. These modes will
have different performance profiles on different hardware and
for different applications.
Note: This applies to the sub-cells.
reset_after: GRU convention (whether to apply reset gate after or
before matrix multiplication). False = "before" (default),
True = "after" (CuDNN compatible).
clip_value: Value at which to clip the GRU cell output.
Default: np.inf (no clipping)
Call arguments:
inputs: A 2D tensor, composed of the following (concatenated together).
- Encoded Z1 (LFADS: "controller inputs", other frameworks: half way through dynamic or z-encoding).
- (Optional) External Input. Set size with `ext_input_dim`, can be 0.
states: List of state tensors corresponding to the previous timestep.
- gen_cell: Generator cell state, of size `units_gen`. Typically initialized from a sample of the f-latent
distribution q(f) (LFADS: "encoded initial conditions"; others: "static").
- z2_cell: Z2 cell state of size `z2_units`. Initialized with Variable inited to zeros. (LFADS: controller input)
- z_latent x 3: Output only for tracking purposes and external KL loss. Not fed back to next iteration.
Controller output means, variances, and sampled output (same as means during *testing*)
- factors: The main output. Not fed back to next iteration.
training: Python boolean indicating whether the layer should behave in
training mode or in inference mode. Only relevant when `dropout` or
`recurrent_dropout` is used.
"""
def __init__(self,
units_gen,
z2_units,
factors_dim,
z_latent_size,
ext_input_dim,
inject_ext_input_to_gen=True,
kernel_initializer='lecun_normal',
bias_initializer='zeros',
recurrent_regularizer='l2',
dropout=0.05,
clip_value=np.inf,
**kwargs):
self.units_gen = units_gen
self.z2_units = z2_units
self.factors_dim = factors_dim
self.z_latent_size = z_latent_size
self.ext_input_dim = ext_input_dim
self.inject_ext_input_to_gen = inject_ext_input_to_gen
self.units = z2_units + units_gen + 3*z_latent_size + factors_dim
super().__init__(**kwargs)
self.dropout = tfkl.Dropout(dropout)
self.fac_lin = tfkl.Dense(self.factors_dim, use_bias=False,
kernel_initializer='lecun_normal', # stdev = 1 / np.sqrt(in_size)
kernel_constraint='unit_norm') # w / sqrt(sum(w**2))
# Note, we use norm constraint whereas LFADS uses norm on init only.
from indl.rnn.gru_clip import GRUClipCell
if self.z2_units > 0:
self.z2_cell = GRUClipCell(self.z2_units,
kernel_initializer=kernel_initializer,
bias_initializer=bias_initializer,
recurrent_regularizer=recurrent_regularizer,
dropout=dropout,
clip_value=clip_value,
**kwargs)
else:
self.z2_cell = None
self.mean_lin = tfkl.Dense(self.z_latent_size, kernel_initializer='lecun_normal', bias_initializer='zeros')
self.logvar_lin = tfkl.Dense(self.z_latent_size, kernel_initializer='lecun_normal', bias_initializer='zeros')
self.gen_cell = GRUClipCell(self.units_gen,
kernel_initializer=kernel_initializer,
bias_initializer=bias_initializer,
recurrent_regularizer=recurrent_regularizer,
dropout=dropout,
clip_value=clip_value,
**kwargs)
@property
def state_size(self):
# [gen_s_new, z2_state, z_latent_mean, z_latent_logvar, q_z_sample, factors_new]
state_sizes = [self.gen_cell.state_size]
if self.z2_units > 0:
state_sizes.append(self.z2_cell.state_size)
return tuple(state_sizes) + (self.z_latent_size,)*3 + (self.factors_dim,)
@property
def output_size(self):
return self.z2_units + self.units_gen + 3 * self.z_latent_size + self.factors_dim
@tf_utils.shape_type_conversion
def build(self, input_shape):
input_dim = input_shape[-1]
if self.z2_units > 0:
self.z2_cell.build(input_dim + self.factors_dim + self.ext_input_dim)
self.gen_cell.build(self.z_latent_size + self.ext_input_dim)
self.built = (self.z2_units == 0 or self.z2_cell.built) and self.gen_cell.built
def get_config(self):
config = {
'units_gen': self.units_gen,
'z2_units': self.z2_units,
'factors_dim': self.factors_dim,
'z_latent_size': self.z_latent_size,
'ext_input_dim': self.ext_input_dim,
'inject_ext_input_to_gen': self.inject_ext_input_to_gen
}
base_config = super().get_config()
gru_config = self.gen_cell.get_config()
return dict(list(base_config.items()) + list(gru_config.items()) + list(config.items()))
def get_initial_state(self, inputs=None, batch_size=None, dtype=None, make_K_tensors=True):
init_state = [self.gen_cell.get_initial_state(inputs=inputs, batch_size=batch_size, dtype=dtype)]
if self.z2_units > 0:
init_state += [self.z2_cell.get_initial_state(inputs=inputs, batch_size=batch_size, dtype=dtype)]
from tensorflow.python.keras.layers.recurrent import _generate_zero_filled_state
if inputs is not None:
batch_size = tf.shape(inputs)[0]
init_state += [_generate_zero_filled_state(batch_size, self.z_latent_size, dtype) for _ in range(3)]
init_state += [_generate_zero_filled_state(batch_size, self.factors_dim, dtype)]
if make_K_tensors:
# import tensorflow.keras.backend as K
# K.is_tensor(init_state[0])
init_state = [tfkl.Lambda(lambda x: x)(_) for _ in init_state]
return tuple(init_state)
def call(self, inputs, states, training=None):
if training is None:
training = K.learning_phase()
# if external inputs are used split the inputs
if self.ext_input_dim > 0:
z1 = inputs[:, :-self.ext_input_dim]
ext_inputs = inputs[:, -self.ext_input_dim:]
else:
z1 = inputs
ext_inputs = None
gen_state, z2_state = states[:2]
if self.z_latent_size > 0:
# if controller is used
# input to the controller is (con_i and previous step's factors)
prev_gen_dropped = self.dropout(gen_state, training=training)
prev_fac = self.fac_lin(prev_gen_dropped)
z2_inputs = tf.concat([z1, prev_fac], axis=1)
z2_inputs = self.dropout(z2_inputs, training=training)
# controller GRU recursion, get new state
z2_outputs, z2_state = self.z2_cell(z2_inputs, z2_state, training=training)
# calculate the inputs to the generator
# transformation to mean and logvar of the posterior
# TODO: use make_variational(params, z2_state)
z_latent_mean = self.mean_lin(z2_state)
z_latent_logvar = self.logvar_lin(z2_state)
z_latent_dist = DiagonalGaussianFromExisting(z_latent_mean, z_latent_logvar)
if training: # TODO: (training or "posterior_sample_and_average"), whatever the latter is.
q_z_sample = z_latent_dist.sample
else:
q_z_sample = z_latent_dist.mean
else:
# pass zeros (0-dim) as inputs to generator
q_z_sample = tf.zeros([tf.shape(input=gen_state)[0], 0])
z2_state = z_latent_mean = z_latent_logvar = tf.zeros([tf.shape(input=gen_state)[0], 0])
# generator's inputs
if self.ext_input_dim > 0 and self.inject_ext_input_to_gen:
# passing external inputs along with controller output as generator's input
gen_inputs = tf.concat([q_z_sample, ext_inputs], axis=1)
elif self.ext_input_dim > 0 and not self.inject_ext_input_to_gen:
assert 0, "Not Implemented!"
else:
# using only controller output as generator's input
gen_inputs = q_z_sample
# generator GRU recursion, get the new state
gen_outputs, gen_s_new = self.gen_cell(gen_inputs, gen_state, training=training)
# calculate the factors
gen_s_new_dropped = self.dropout(gen_s_new, training=training)
factors_new = self.fac_lin(gen_s_new_dropped)
# Output the states and other values to make them available after RNN
new_state = [gen_s_new, z2_state, z_latent_mean, z_latent_logvar, q_z_sample, factors_new]
return new_state, new_state
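As a quick orientation to the state layout described in the docstring, here is a hedged sketch that only inspects sizes and the zero-filled initial state. The dimensions are illustrative, and it assumes the underlying GRUClipCell reports an integer state_size like a standard GRU cell:

import tensorflow as tf
from indl.model.lfads.complex import ComplexCell

cell = ComplexCell(units_gen=64, z2_units=32, factors_dim=10,
                   z_latent_size=4, ext_input_dim=0)
# State order: (gen_state, z2_state, z_latent_mean, z_latent_logvar, q_z_sample, factors)
print(cell.state_size)    # expected: (64, 32, 4, 4, 4, 10)
print(cell.output_size)   # expected: 64 + 32 + 3*4 + 10 = 118
states = cell.get_initial_state(batch_size=8, dtype=tf.float32)
# In the full model, states[0] (the generator state) is replaced by a sample from
# q(f), the encoded initial conditions, before the RNN is run.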
output_size
property
readonly
Integer or TensorShape: size of outputs produced by this cell.
state_size
property
readonly
size(s) of state(s) used by this cell.
It can be represented by an Integer, a TensorShape or a tuple of Integers or TensorShapes.
call(self, inputs, states, training=None)
The function that contains the logic for one RNN step calculation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
inputs | | The input tensor: a slice of the overall RNN input along the time dimension (usually the second dimension). | required |
states | | The state tensor(s) from the previous step, matching this cell's `state_size`. | required |
Returns:
Type | Description |
---|---|
 | A tuple of two tensors: the output for the current timestep and the state to pass to the next timestep (here both are the same list; see the source below). |
Source code in indl/model/lfads/complex.py
def call(self, inputs, states, training=None):
if training is None:
training = K.learning_phase()
# if external inputs are used split the inputs
if self.ext_input_dim > 0:
z1 = inputs[:, :-self.ext_input_dim]
ext_inputs = inputs[:, -self.ext_input_dim:]
else:
z1 = inputs
ext_inputs = None
gen_state, z2_state = states[:2]
if self.z_latent_size > 0:
# if controller is used
# input to the controller is (con_i and previous step's factors)
prev_gen_dropped = self.dropout(gen_state, training=training)
prev_fac = self.fac_lin(prev_gen_dropped)
z2_inputs = tf.concat([z1, prev_fac], axis=1)
z2_inputs = self.dropout(z2_inputs, training=training)
# controller GRU recursion, get new state
z2_outputs, z2_state = self.z2_cell(z2_inputs, z2_state, training=training)
# calculate the inputs to the generator
# transformation to mean and logvar of the posterior
# TODO: use make_variational(params, z2_state)
z_latent_mean = self.mean_lin(z2_state)
z_latent_logvar = self.logvar_lin(z2_state)
z_latent_dist = DiagonalGaussianFromExisting(z_latent_mean, z_latent_logvar)
if training: # TODO: (training or "posterior_sample_and_average"), whatever the latter is.
q_z_sample = z_latent_dist.sample
else:
q_z_sample = z_latent_dist.mean
else:
# pass zeros (0-dim) as inputs to generator
q_z_sample = tf.zeros([tf.shape(input=gen_state)[0], 0])
z2_state = z_latent_mean = z_latent_logvar = tf.zeros([tf.shape(input=gen_state)[0], 0])
# generator's inputs
if self.ext_input_dim > 0 and self.inject_ext_input_to_gen:
# passing external inputs along with controller output as generator's input
gen_inputs = tf.concat([q_z_sample, ext_inputs], axis=1)
elif self.ext_input_dim > 0 and not self.inject_ext_input_to_gen:
assert 0, "Not Implemented!"
else:
# using only controller output as generator's input
gen_inputs = q_z_sample
# generator GRU recursion, get the new state
gen_outputs, gen_s_new = self.gen_cell(gen_inputs, gen_state, training=training)
# calculate the factors
gen_s_new_dropped = self.dropout(gen_s_new, training=training)
factors_new = self.fac_lin(gen_s_new_dropped)
# Output the states and other values to make them available after RNN
new_state = [gen_s_new, z2_state, z_latent_mean, z_latent_logvar, q_z_sample, factors_new]
return new_state, new_state
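When this cell is wrapped in `tfkl.RNN(..., return_sequences=True)` (as in `create_generator_lfads` above), each element of the returned state list becomes a `(batch, time, dim)` tensor, so the per-step posterior parameters can be pulled out for a KL penalty. A short illustrative sketch, reusing the `generator` and `enc_z1` names from the earlier wiring example (assumptions, not original source):

# outputs = generator(enc_z1, initial_state=init_states)  # from the sketch above
gen_states, z2_states, q_z_mean, q_z_logvar, q_z_sample, factors = outputs
# q_z_mean / q_z_logvar parameterize q(z_t) at every step and can feed a KL term
# such as KLCost_GaussianGaussianProcessSampled (see dists below); factors feed
# the readout that reconstructs the observations.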
get_config(self)
Returns the config of the layer.
A layer config is a Python dictionary (serializable) containing the configuration of a layer. The same layer can be reinstantiated later (without its trained weights) from this configuration.
The config of a layer does not include connectivity
information, nor the layer class name. These are handled
by Network
(one layer of abstraction above).
Returns:
Type | Description |
---|---|
 | Python dictionary. |
Source code in indl/model/lfads/complex.py
def get_config(self):
config = {
'units_gen': self.units_gen,
'z2_units': self.z2_units,
'factors_dim': self.factors_dim,
'z_latent_size': self.z_latent_size,
'ext_input_dim': self.ext_input_dim,
'inject_ext_input_to_gen': self.inject_ext_input_to_gen
}
base_config = super().get_config()
gru_config = self.gen_cell.get_config()
return dict(list(base_config.items()) + list(gru_config.items()) + list(config.items()))
dists
DiagonalGaussianFromExisting (Gaussian)
Diagonal Gaussian with different constant mean and variances in each dimension.
Source code in indl/model/lfads/dists.py
class DiagonalGaussianFromExisting(Gaussian):
"""
Diagonal Gaussian with different constant mean and variances in each
dimension.
"""
def __init__(self, mean_bxn, logvar_bxn, var_min=0.0):
self.mean_bxn = mean_bxn
if var_min > 0.0:
logvar_bxn = tf.math.log(tf.exp(logvar_bxn) + var_min)
# logvar_bxn = tf.nn.relu(logvar_bxn) + tf.math.log(var_min)
self.logvar_bxn = logvar_bxn
self.noise_bxn = noise_bxn = tf.random.normal(tf.shape(input=logvar_bxn))
#self.noise_bxn.set_shape([None, z_size])
self.sample_bxn = mean_bxn + tf.exp(0.5 * logvar_bxn) * noise_bxn
def logp(self, z=None):
"""Compute the log-likelihood under the distribution.
Args:
z (optional): value to compute likelihood for, if None, use sample.
Returns:
The likelihood of z under the model.
"""
if z is None:
z = self.sample
# This is needed to make sure that the gradients are simple.
# The value of the function shouldn't change.
if z is self.sample_bxn:  # identity check; tensor == is elementwise in TF2
return gaussian_pos_log_likelihood(self.mean_bxn, self.logvar_bxn, self.noise_bxn)
return diag_gaussian_log_likelihood(z, self.mean_bxn, self.logvar_bxn)
logp(self, z=None)
Compute the log-likelihood under the distribution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
z | optional | Value to compute likelihood for; if None, use sample. | None |
Returns:
Type | Description |
---|---|
 | The likelihood of z under the model. |
Source code in indl/model/lfads/dists.py
def logp(self, z=None):
"""Compute the log-likelihood under the distribution.
Args:
z (optional): value to compute likelihood for, if None, use sample.
Returns:
The likelihood of z under the model.
"""
if z is None:
z = self.sample
# This is needed to make sure that the gradients are simple.
# The value of the function shouldn't change.
if z is self.sample_bxn:  # identity check; tensor == is elementwise in TF2
return gaussian_pos_log_likelihood(self.mean_bxn, self.logvar_bxn, self.noise_bxn)
return diag_gaussian_log_likelihood(z, self.mean_bxn, self.logvar_bxn)
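A small usage sketch of the distribution above: it wraps existing mean/log-variance tensors and exposes one reparameterized draw. Shapes and values are illustrative; `diag_gaussian_log_likelihood` is the helper defined later in this module:

import tensorflow as tf
from indl.model.lfads.dists import DiagonalGaussianFromExisting, diag_gaussian_log_likelihood

mean = tf.zeros((8, 4))             # (batch, z_dim)
logvar = tf.fill((8, 4), -2.0)      # variance = exp(-2) ~ 0.135
dist = DiagonalGaussianFromExisting(mean, logvar)

z = dist.sample                     # one fixed draw: mean + exp(0.5*logvar) * noise
# Per-dimension log-likelihood of the draw; sum over the last axis for the joint
# diagonal-Gaussian log-likelihood.
ll = tf.reduce_sum(diag_gaussian_log_likelihood(z, dist.mean, dist.logvar), axis=-1)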
Gaussian
Base class for Gaussian distribution classes.
Source code in indl/model/lfads/dists.py
class Gaussian(object):
"""Base class for Gaussian distribution classes."""
@property
def mean(self):
return self.mean_bxn
@property
def logvar(self):
return self.logvar_bxn
@property
def noise(self):
return tf.random.normal(tf.shape(input=self.logvar))
@property
def sample(self):
# return self.mean + tf.exp(0.5 * self.logvar) * self.noise
return self.sample_bxn
KLCost_GaussianGaussianProcessSampled
log p(x|z) + KL(q||p) terms for Gaussian posterior and Gaussian process prior via sampling.
The log p(x|z) term is the reconstruction error under the model. The KL term represents the penalty for passing information from the encoder to the decoder. To sample KL(q||p), we simply sample ln q - ln p by drawing samples from q and averaging.
Source code in indl/model/lfads/dists.py
class KLCost_GaussianGaussianProcessSampled(object):
""" log p(x|z) + KL(q||p) terms for Gaussian posterior and Gaussian process
prior via sampling.
The log p(x|z) term is the reconstruction error under the model.
The KL term represents the penalty for passing information from the encoder
to the decoder.
To sample KL(q||p), we simply sample
ln q - ln p
by drawing samples from q and averaging.
"""
def __init__(self, post_zs, prior_z_process):
"""Create a lower bound in three parts, normalized reconstruction
cost, normalized KL divergence cost, and their sum.
Args:
post_zs: posterior z ~ q(z|x)
prior_z_process: prior AR(1) process
"""
# assert len(post_zs) > 1, "GP is for time, need more than 1 time step."
# assert isinstance(prior_z_process, GaussianProcess), "Must use GP."
# L = -KL + log p(x|z), to maximize bound on likelihood
# -L = KL - log p(x|z), to minimize bound on NLL
# so 'KL cost' is positive KL divergence
# sample from the posterior for all time points and dimensions
post_zs_sampled = post_zs.sample
# sum KL over time and dimension axis
logq_bxu = tf.reduce_sum(input_tensor=post_zs.logp(post_zs_sampled), axis=[1, 2])
logp_bxu = 0
num_steps = post_zs.mean.get_shape()[1]
for i in range(num_steps):
# posterior is independent in time, prior is not
if i == 0:
z_tm1_bxu = None
else:
z_tm1_bxu = post_zs_sampled[:, i-1, :]
logp_bxu += tf.reduce_sum(input_tensor=prior_z_process.logp_t(
post_zs_sampled[:, i, :], z_tm1_bxu), axis=[1])
kl_b = logq_bxu - logp_bxu
self.kl_cost_b = kl_b
__init__(self, post_zs, prior_z_process)
special
Create a lower bound in three parts, normalized reconstruction cost, normalized KL divergence cost, and their sum.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
post_zs | | Posterior z ~ q(z|x). | required |
prior_z_process | | Prior AR(1) process. | required |
Source code in indl/model/lfads/dists.py
def __init__(self, post_zs, prior_z_process):
"""Create a lower bound in three parts, normalized reconstruction
cost, normalized KL divergence cost, and their sum.
Args:
post_zs: posterior z ~ q(z|x)
prior_z_process: prior AR(1) process
"""
# assert len(post_zs) > 1, "GP is for time, need more than 1 time step."
# assert isinstance(prior_z_process, GaussianProcess), "Must use GP."
# L = -KL + log p(x|z), to maximize bound on likelihood
# -L = KL - log p(x|z), to minimize bound on NLL
# so 'KL cost' is positive KL divergence
# sample from the posterior for all time points and dimensions
post_zs_sampled = post_zs.sample
# sum KL over time and dimension axis
logq_bxu = tf.reduce_sum(input_tensor=post_zs.logp(post_zs_sampled), axis=[1, 2])
logp_bxu = 0
num_steps = post_zs.mean.get_shape()[1]
for i in range(num_steps):
# posterior is independent in time, prior is not
if i == 0:
z_tm1_bxu = None
else:
z_tm1_bxu = post_zs_sampled[:, i-1, :]
logp_bxu += tf.reduce_sum(input_tensor=prior_z_process.logp_t(
post_zs_sampled[:, i, :], z_tm1_bxu), axis=[1])
kl_b = logq_bxu - logp_bxu
self.kl_cost_b = kl_b
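The class above estimates KL(q||p) by Monte Carlo: draw z from the posterior q, evaluate log q(z) - log p(z), and sum over time and latent dimensions. A self-contained NumPy sketch of that idea for a single pair of 1-D Gaussians, checked against the closed-form KL (all numbers illustrative):

import numpy as np

rng = np.random.default_rng(0)
mq, sq = 0.5, 0.8        # q = N(mq, sq^2), the "posterior"
mp, sp = 0.0, 1.0        # p = N(mp, sp^2), the "prior"

def log_normal(z, m, s):
    return -0.5 * (np.log(2 * np.pi) + 2 * np.log(s) + ((z - m) / s) ** 2)

z = rng.normal(mq, sq, size=100_000)                     # samples from q
kl_mc = np.mean(log_normal(z, mq, sq) - log_normal(z, mp, sp))
kl_closed = np.log(sp / sq) + (sq**2 + (mq - mp)**2) / (2 * sp**2) - 0.5
print(kl_mc, kl_closed)   # the two agree to ~2-3 decimal places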
LearnableAutoRegressive1Prior
AR(1) model where autocorrelation and process variance are learned parameters. Assumed zero mean.
Source code in indl/model/lfads/dists.py
class LearnableAutoRegressive1Prior(object):
"""
AR(1) model where autocorrelation and process variance are learned
parameters. Assumed zero mean.
"""
def __init__(self, batch_size, z_size,
autocorrelation_taus, noise_variances,
do_train_prior_ar_atau, do_train_prior_ar_nvar,
name):
"""Create a learnable autoregressive (1) process.
Args:
batch_size: The size of the batch, i.e. 0th dim in 2D tensor of samples.
z_size: The dimension of the distribution, i.e. 1st dim in 2D tensor.
autocorrelation_taus: The auto correlation time constant of the AR(1)
process.
A value of 0 is uncorrelated gaussian noise.
noise_variances: The variance of the additive noise, *not* the process
variance.
do_train_prior_ar_atau: Train or leave as constant, the autocorrelation?
do_train_prior_ar_nvar: Train or leave as constant, the noise variance?
num_steps: Number of steps to run the process.
name: The name to prefix to learned TF variables.
"""
# Note the use of the plural in all of these quantities. This is intended
# to mark that even though a sample z_t from the posterior is thought of a
# single sample of a multidimensional gaussian, the prior is actually
# thought of as U AR(1) processes, where U is the dimension of the inferred
# input.
size_bx1 = tf.stack([batch_size, 1])
size__xu = [None, z_size]
# process variance, the variance at time t over all instantiations of AR(1)
# with these parameters.
log_evar_inits_1xu = tf.expand_dims(tf.math.log(noise_variances), 0)
self.logevars_1xu = logevars_1xu = \
tf.Variable(log_evar_inits_1xu, name=name+"/logevars", dtype=tf.float32,
trainable=do_train_prior_ar_nvar)
self.logevars_bxu = logevars_bxu = tf.tile(logevars_1xu, size_bx1)
logevars_bxu.set_shape(size__xu) # tile loses shape
# \tau, which is the autocorrelation time constant of the AR(1) process
log_atau_inits_1xu = tf.expand_dims(tf.math.log(autocorrelation_taus), 0)
self.logataus_1xu = logataus_1xu = \
tf.Variable(log_atau_inits_1xu, name=name+"/logatau", dtype=tf.float32,
trainable=do_train_prior_ar_atau)
# phi in x_t = \mu + phi x_tm1 + \eps
# phi = exp(-1/tau)
# phi = exp(-1/exp(logtau))
# phi = exp(-exp(-logtau))
phis_1xu = tf.exp(-tf.exp(-logataus_1xu))
self.phis_bxu = phis_bxu = tf.tile(phis_1xu, size_bx1)
phis_bxu.set_shape(size__xu)
# process noise
# pvar = evar / (1- phi^2)
# logpvar = log ( exp(logevar) / (1 - phi^2) )
# logpvar = logevar - log(1-phi^2)
# logpvar = logevar - (log(1-phi) + log(1+phi))
self.logpvars_1xu = \
logevars_1xu - tf.math.log(1.0-phis_1xu) - tf.math.log(1.0+phis_1xu)
self.logpvars_bxu = logpvars_bxu = tf.tile(self.logpvars_1xu, size_bx1)
logpvars_bxu.set_shape(size__xu)
# process mean (zero, but included for completeness)
self.pmeans_bxu = pmeans_bxu = tf.zeros_like(phis_bxu)
def logp_t(self, z_t_bxu, z_tm1_bxu=None):
"""Compute the log-likelihood under the distribution for a given time t,
not the whole sequence.
Args:
z_t_bxu: sample to compute likelihood for at time t.
z_tm1_bxu (optional): sample condition probability of z_t upon.
Returns:
The likelihood of p_t under the model at time t. i.e.
p(z_t|z_tm1_bxu) = N(z_tm1_bxu * phis, eps^2)
"""
if z_tm1_bxu is None:
logp_tgtm1_bxu = diag_gaussian_log_likelihood(z_t_bxu, self.pmeans_bxu, self.logpvars_bxu)
else:
means_t_bxu = self.pmeans_bxu + self.phis_bxu * z_tm1_bxu
logp_tgtm1_bxu = diag_gaussian_log_likelihood(z_t_bxu, means_t_bxu, self.logevars_bxu)
return logp_tgtm1_bxu
__init__(self, batch_size, z_size, autocorrelation_taus, noise_variances, do_train_prior_ar_atau, do_train_prior_ar_nvar, name)
special
Create a learnable autoregressive (1) process.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch_size | | The size of the batch, i.e. 0th dim in 2D tensor of samples. | required |
z_size | | The dimension of the distribution, i.e. 1st dim in 2D tensor. | required |
autocorrelation_taus | | The autocorrelation time constant of the AR(1) process. A value of 0 is uncorrelated gaussian noise. | required |
noise_variances | | The variance of the additive noise, *not* the process variance. | required |
do_train_prior_ar_atau | | Train or leave as constant, the autocorrelation? | required |
do_train_prior_ar_nvar | | Train or leave as constant, the noise variance? | required |
num_steps | | Number of steps to run the process. | required |
name | | The name to prefix to learned TF variables. | required |
Source code in indl/model/lfads/dists.py
def __init__(self, batch_size, z_size,
autocorrelation_taus, noise_variances,
do_train_prior_ar_atau, do_train_prior_ar_nvar,
name):
"""Create a learnable autoregressive (1) process.
Args:
batch_size: The size of the batch, i.e. 0th dim in 2D tensor of samples.
z_size: The dimension of the distribution, i.e. 1st dim in 2D tensor.
autocorrelation_taus: The auto correlation time constant of the AR(1)
process.
A value of 0 is uncorrelated gaussian noise.
noise_variances: The variance of the additive noise, *not* the process
variance.
do_train_prior_ar_atau: Train or leave as constant, the autocorrelation?
do_train_prior_ar_nvar: Train or leave as constant, the noise variance?
num_steps: Number of steps to run the process.
name: The name to prefix to learned TF variables.
"""
# Note the use of the plural in all of these quantities. This is intended
# to mark that even though a sample z_t from the posterior is thought of a
# single sample of a multidimensional gaussian, the prior is actually
# thought of as U AR(1) processes, where U is the dimension of the inferred
# input.
size_bx1 = tf.stack([batch_size, 1])
size__xu = [None, z_size]
# process variance, the variance at time t over all instantiations of AR(1)
# with these parameters.
log_evar_inits_1xu = tf.expand_dims(tf.math.log(noise_variances), 0)
self.logevars_1xu = logevars_1xu = \
tf.Variable(log_evar_inits_1xu, name=name+"/logevars", dtype=tf.float32,
trainable=do_train_prior_ar_nvar)
self.logevars_bxu = logevars_bxu = tf.tile(logevars_1xu, size_bx1)
logevars_bxu.set_shape(size__xu) # tile loses shape
# \tau, which is the autocorrelation time constant of the AR(1) process
log_atau_inits_1xu = tf.expand_dims(tf.math.log(autocorrelation_taus), 0)
self.logataus_1xu = logataus_1xu = \
tf.Variable(log_atau_inits_1xu, name=name+"/logatau", dtype=tf.float32,
trainable=do_train_prior_ar_atau)
# phi in x_t = \mu + phi x_tm1 + \eps
# phi = exp(-1/tau)
# phi = exp(-1/exp(logtau))
# phi = exp(-exp(-logtau))
phis_1xu = tf.exp(-tf.exp(-logataus_1xu))
self.phis_bxu = phis_bxu = tf.tile(phis_1xu, size_bx1)
phis_bxu.set_shape(size__xu)
# process noise
# pvar = evar / (1- phi^2)
# logpvar = log ( exp(logevar) / (1 - phi^2) )
# logpvar = logevar - log(1-phi^2)
# logpvar = logevar - (log(1-phi) + log(1+phi))
self.logpvars_1xu = \
logevars_1xu - tf.math.log(1.0-phis_1xu) - tf.math.log(1.0+phis_1xu)
self.logpvars_bxu = logpvars_bxu = tf.tile(self.logpvars_1xu, size_bx1)
logpvars_bxu.set_shape(size__xu)
# process mean (zero, but included for completeness)
self.pmeans_bxu = pmeans_bxu = tf.zeros_like(phis_bxu)
logp_t(self, z_t_bxu, z_tm1_bxu=None)
Compute the log-likelihood under the distribution for a given time t, not the whole sequence.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
z_t_bxu | | Sample to compute likelihood for at time t. | required |
z_tm1_bxu | optional | Sample to condition the probability of z_t upon. | None |
Returns:
Type | Description |
---|---|
 | The likelihood of p_t under the model at time t, i.e. p(z_t|z_tm1_bxu) = N(z_tm1_bxu * phis, eps^2). |
Source code in indl/model/lfads/dists.py
def logp_t(self, z_t_bxu, z_tm1_bxu=None):
"""Compute the log-likelihood under the distribution for a given time t,
not the whole sequence.
Args:
z_t_bxu: sample to compute likelihood for at time t.
z_tm1_bxu (optional): sample condition probability of z_t upon.
Returns:
The likelihood of p_t under the model at time t. i.e.
p(z_t|z_tm1_bxu) = N(z_tm1_bxu * phis, eps^2)
"""
if z_tm1_bxu is None:
logp_tgtm1_bxu = diag_gaussian_log_likelihood(z_t_bxu, self.pmeans_bxu, self.logpvars_bxu)
else:
means_t_bxu = self.pmeans_bxu + self.phis_bxu * z_tm1_bxu
logp_tgtm1_bxu = diag_gaussian_log_likelihood(z_t_bxu, means_t_bxu, self.logevars_bxu)
return logp_tgtm1_bxu
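In the prior above, each latent dimension is an independent AR(1) process: the AR coefficient is phi = exp(-1/tau), the conditional density used by logp_t is N(phi * z_{t-1}, evar), and the marginal (process) variance used at t=0 is pvar = evar / (1 - phi^2). A small NumPy simulation checking the stationary-variance relation (values are illustrative):

import numpy as np

tau, evar = 10.0, 0.1                 # autocorrelation time constant, noise variance
phi = np.exp(-1.0 / tau)              # AR coefficient
pvar = evar / (1.0 - phi ** 2)        # stationary (process) variance

rng = np.random.default_rng(0)
z = np.zeros(200_000)
for t in range(1, z.size):            # x_t = phi * x_{t-1} + eps,  eps ~ N(0, evar)
    z[t] = phi * z[t - 1] + rng.normal(0.0, np.sqrt(evar))
print(z.var(), pvar)                  # empirical variance ~= evar / (1 - phi^2)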
LearnableDiagonalGaussian (Gaussian)
Diagonal Gaussian with different means and variances in each dimension. Means and variances are optionally trainable. For LFADS ics prior, trainable_mean=True, trainable_var=False (both default). For LFADS cos prior (if not using AR1), trainable_mean=False, trainable_var=True
Source code in indl/model/lfads/dists.py
class LearnableDiagonalGaussian(Gaussian):
"""
Diagonal Gaussian with different means and variances in each
dimension. Means and variances are optionally trainable.
For LFADS ics prior, trainable_mean=True, trainable_var=False (both default).
For LFADS cos prior (if not using AR1), trainable_mean=False, trainable_var=True
"""
def __init__(self, batch_size, z_size, name, var, trainable_mean=True, trainable_var=False):
# MRK's fix, letting the mean of the prior to be trainable
mean_init = 0.0
num_steps = z_size[0]
num_dim = z_size[1]
z_mean_1xn = tf.compat.v1.get_variable(name=name+"/mean", shape=[1, 1, num_dim],
initializer=tf.compat.v1.constant_initializer(mean_init),
trainable=trainable_mean)
self.mean_bxn = tf.tile(z_mean_1xn, tf.stack([batch_size, num_steps, 1]))
self.mean_bxn.set_shape([None] + z_size)
# MRK, make Var trainable (for Controller prior)
var_init = np.log(var)
z_logvar_1xn = tf.compat.v1.get_variable(name=name+"/logvar", shape=[1, 1, num_dim],
initializer=tf.compat.v1.constant_initializer(var_init),
trainable=trainable_var)
self.logvar_bxn = tf.tile(z_logvar_1xn, tf.stack([batch_size, num_steps, 1]))
self.logvar_bxn.set_shape([None] + z_size)
# remove time axis if 1 (used for ICs)
if num_steps == 1:
self.mean_bxn = tf.squeeze(self.mean_bxn, axis=1)
self.logvar_bxn = tf.squeeze(self.logvar_bxn, axis=1)
self.noise_bxn = tf.random.normal(tf.shape(input=self.logvar_bxn))
diag_gaussian_log_likelihood(z, mu=0.0, logvar=0.0)
Log-likelihood under a Gaussian distribution with diagonal covariance. Returns the log-likelihood for each dimension. One should sum the results for the log-likelihood under the full multidimensional model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
z | | The value to compute the log-likelihood for. | required |
mu | | The mean of the Gaussian. | 0.0 |
logvar | | The log variance of the Gaussian. | 0.0 |
Returns:
Type | Description |
---|---|
 | The log-likelihood under the Gaussian model. |
Source code in indl/model/lfads/dists.py
def diag_gaussian_log_likelihood(z, mu=0.0, logvar=0.0):
"""Log-likelihood under a Gaussian distribution with diagonal covariance.
Returns the log-likelihood for each dimension. One should sum the
results for the log-likelihood under the full multidimensional model.
Args:
z: The value to compute the log-likelihood.
mu: The mean of the Gaussian
logvar: The log variance of the Gaussian.
Returns:
The log-likelihood under the Gaussian model.
"""
return -0.5 * (logvar + np.log(2 * np.pi) + \
tf.square((z - mu) / tf.exp(0.5 * logvar)))
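A quick numeric cross-check of the formula above against scipy's normal log-density (scipy is used only for the comparison; values are illustrative):

import numpy as np
from scipy.stats import norm

z, mu, logvar = 1.3, 0.5, np.log(0.25)     # variance 0.25, i.e. sigma = 0.5
ll = -0.5 * (logvar + np.log(2 * np.pi) + (z - mu) ** 2 / np.exp(logvar))
print(ll, norm.logpdf(z, loc=mu, scale=np.sqrt(np.exp(logvar))))   # identical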
gaussian_pos_log_likelihood(unused_mean, logvar, noise)
Gaussian log-likelihood function for a posterior in VAE
Note: This function is specialized for a posterior distribution that has the form z = mean + sigma * noise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
unused_mean | | Ignored. | required |
logvar | | The log variance of the distribution. | required |
noise | | The noise used in the sampling of the posterior. | required |
Returns:
Type | Description |
---|---|
 | The log-likelihood under the Gaussian model. |
Source code in indl/model/lfads/dists.py
def gaussian_pos_log_likelihood(unused_mean, logvar, noise):
"""Gaussian log-likelihood function for a posterior in VAE
Note: This function is specialized for a posterior distribution, that has the
form of z = mean + sigma * noise.
Args:
unused_mean: ignore
logvar: The log variance of the distribution
noise: The noise used in the sampling of the posterior.
Returns:
The log-likelihood under the Gaussian model.
"""
# ln N(z; mean, sigma) = - ln(sigma) - 0.5 ln 2pi - noise^2 / 2
return - 0.5 * (logvar + np.log(2 * np.pi) + tf.square(noise))
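This helper computes the same quantity as diag_gaussian_log_likelihood when the noise argument is the standardized residual noise = (z - mean) / exp(0.5 * logvar), which is exactly how DiagonalGaussianFromExisting draws its sample. A small NumPy check (values illustrative):

import numpy as np

def diag_ll(z, mu, logvar):
    return -0.5 * (logvar + np.log(2 * np.pi) + ((z - mu) / np.exp(0.5 * logvar)) ** 2)

def pos_ll(logvar, noise):
    return -0.5 * (logvar + np.log(2 * np.pi) + noise ** 2)

mu, logvar, noise = 0.3, -1.0, 0.7
z = mu + np.exp(0.5 * logvar) * noise      # reparameterized sample
print(diag_ll(z, mu, logvar), pos_ll(logvar, noise))   # identical values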