# Copyright 2018 Max Shinn <maxwell.shinn@yale.edu>
# 2018 Norman Lam <norman.lam@yale.edu>
#
# This file is part of PyDDM, and is available under the MIT license.
# Please see LICENSE.txt in the root directory for more information.
__all__ = ['LossFunction', 'LossSquaredError', 'LossLikelihood', 'LossBIC', 'LossRobustLikelihood', 'LossRobustBIC']
import logging
import numpy as np
from paranoid.decorators import accepts, returns, requires, ensures, paranoidclass
from paranoid.types import Self, Number, Positive0, Natural1
from ..sample import Sample
from ..model import Model
from ..logger import logger as _logger
[docs]class LossFunction(object):
"""An abstract class for a function to assess goodness of fit.
This is an abstract class for describing how well data fits a model.
When subclasses are initialized, they will be initialized with the
Sample object to which the model should be fit. Because the data
will not change but the model will change, this is specified with
initialization.
The optional `required_conditions` argument limits the
stratification of `sample` by conditions to only the conditions
mentioned in `required_conditions`. This decreases computation
time by only solving the model for the condition names listed in
`required_conditions`. For example, a simple DDM with no drift
and constant variaince would mean `required_conditions` is an
empty list.
The optional `method` argument can be "analytical", "numerical",
"cn", "implicit", or "explicit".
This will automatically parallelize if set_N_cpus() has been
called.
"""
@classmethod
def _generate(cls):
# Return an instance of each subclass which doesn't have a
# "setup" method, i.e. it takes no arguments.
subs = cls.__subclasses__()
for s in subs:
# Check if setup is the same as its parent.
if s.setup is LossFunction.setup:
samp = Sample.from_numpy_array(np.asarray([[.3, 1], [.4, 0], [.1, 0], [.2, 1]]), [])
yield s(sample=samp, dt=.01, T_dur=2)
def __init__(self, sample, required_conditions=None, method=None, **kwargs):
assert hasattr(self, "name"), "Solver needs a name"
self.sample = sample
self.required_conditions = required_conditions
self.method = method
self.setup(**kwargs)
[docs] def setup(self, **kwargs):
"""Initialize the loss function.
The optional `setup` function is executed at the end of the
initializaiton. It is executed only once at the beginning of
the fitting procedure.
This function may optionally be redefined in subclasses.
"""
pass
[docs] def loss(self, model):
"""Compute the value of the loss function for the given model.
This function must be redefined in subclasses.
`model` should be a Model object. This should return a
floating point value, where smaller values mean a better fit
of the model to the data.
"""
raise NotImplementedError("Loss function %s invalid: must define the loss(self, model) function" % self.__class__.__name__)
[docs] def cache_by_conditions(self, model):
"""Solve the model for all relevant conditions.
Solve `model` for each combination of conditions found within the
dataset. For example, if `required_conditions` is ["hand", "color"],
and hand can be left or right and color can be blue or green, solves
the model for: hand=left and color=blue; hand=right and color=blue;
hand=left and color=green, hand=right and color=green.
This is a convenience function for defining new loss
functions. There is generally no need to redefine this
function in subclasses.
"""
from ..functions import solve_all_conditions
return solve_all_conditions(model, sample=self.sample, method=self.method)
[docs]@paranoidclass
class LossSquaredError(LossFunction):
"""Squared-error loss function"""
name = "Squared Error"
@staticmethod
def _test(v):
assert v.dt in Positive0()
assert v.T_dur in Positive0()
assert v.hists_choice_upper != {}
assert v.hists_choice_lower != {}
assert v.target.size == 2*len(v.hists_choice_upper.keys())*(v.T_dur/v.dt+1)
@staticmethod
def _generate():
yield LossSquaredError(sample=next(Sample._generate()), dt=.01, T_dur=3)
def setup(self, dt, T_dur, **kwargs):
self.dt = dt
self.T_dur = T_dur
self.hists_choice_upper = {}
self.hists_choice_lower = {}
for comb in self.sample.condition_combinations(required_conditions=self.required_conditions):
self.hists_choice_upper[frozenset(comb.items())] = np.histogram(self.sample.subset(**comb).choice_upper, bins=int(T_dur/dt)+1, range=(0-dt/2, T_dur+dt/2))[0]/len(self.sample.subset(**comb))/dt # dt/2 (and +1) is continuity correction
self.hists_choice_lower[frozenset(comb.items())] = np.histogram(self.sample.subset(**comb).choice_lower, bins=int(T_dur/dt)+1, range=(0-dt/2, T_dur+dt/2))[0]/len(self.sample.subset(**comb))/dt
self.target = np.concatenate([s for i in sorted(self.hists_choice_upper.keys()) for s in [self.hists_choice_upper[i], self.hists_choice_lower[i]]])
@accepts(Self, Model)
@returns(Number)
@requires("model.dt == self.dt and model.T_dur == self.T_dur")
def loss(self, model):
assert model.dt == self.dt and model.T_dur == self.T_dur
sols = self.cache_by_conditions(model)
this = np.concatenate([s for i in sorted(self.hists_choice_upper.keys()) for s in [sols[i].pdf("_top"), sols[i].pdf("_bottom")]])
return np.sum((this-self.target)**2)*self.dt**2
[docs]@paranoidclass
class LossLikelihood(LossFunction):
"""Likelihood loss function"""
name = "Negative log likelihood"
_robustness_param = 0
@staticmethod
def _test(v):
assert v.dt in Positive0()
assert v.T_dur in Positive0()
@staticmethod
def _generate():
yield LossLikelihood(sample=next(Sample._generate()), dt=.01, T_dur=3)
def setup(self, dt, T_dur, **kwargs):
self.dt = dt
self.T_dur = T_dur
# Each element in the dict is indexed by the conditions of the
# model (e.g. coherence, trial conditions) as a frozenset.
# Each contains a tuple of lists, which are to contain the
# position for each within a histogram. For instance, if a
# reaction time corresponds to position i, then we can index a
# list representing a normalized histogram/"pdf" (given by dt
# and T_dur) for immediate access to the probability of
# obtaining that value.
self.hist_indexes = {}
for comb in self.sample.condition_combinations(required_conditions=self.required_conditions):
s = self.sample.subset(**comb)
maxt = max(max(s.choice_upper) if s.choice_upper.size != 0 else -1, max(s.choice_lower) if s.choice_lower.size != 0 else -1)
assert maxt <= self.T_dur, "Simulation time T_dur=%f not long enough for these data. (max sample RT=%f)" % (self.T_dur, maxt)
# Find the integers which correspond to the timepoints in
# the pdfs. Also don't group them into the first bin
# because this creates bias.
choice_upper = [int(round(e/dt)) for e in s.choice_upper]
choice_lower = [int(round(e/dt)) for e in s.choice_lower]
undec = self.sample.undecided
self.hist_indexes[frozenset(comb.items())] = (choice_upper, choice_lower, undec)
@accepts(Self, Model)
@returns(Number)
@requires("model.dt == self.dt and model.T_dur == self.T_dur")
def loss(self, model):
assert model.dt == self.dt and model.T_dur == self.T_dur
sols = self.cache_by_conditions(model)
loglikelihood = 0
for k in sols.keys():
# nans come from negative values in the pdfs, which in
# turn come from the dx parameter being set too low. This
# comes up when fitting, because sometimes the algorithm
# will "explore" and look at extreme parameter values.
# For example, this arises when standard deviation is very
# close to 0. We will issue a warning now, but throwing
# an exception may be the better way to handle this to
# make sure it doesn't go unnoticed.
with np.errstate(all='raise', under='ignore'):
try:
loglikelihood += np.sum(np.log(sols[k].pdf("_top")[self.hist_indexes[k][0]] + self._robustness_param))
loglikelihood += np.sum(np.log(sols[k].pdf("_bottom")[self.hist_indexes[k][1]] + self._robustness_param))
except FloatingPointError:
minlike = min(np.min(sols[k].pdf("_top")), np.min(sols[k].pdf("_bottom")))
if minlike == 0:
_logger.warning("Infinite likelihood encountered. Please either use a Robust likelihood method (e.g. LossRobustLikelihood or LossRobustBIC) or even better use a mixture model (via an Overlay) which covers the full range of simulated times to avoid infinite negative log likelihood. See the FAQs in the documentation for more information.")
elif minlike < 0:
_logger.warning("Infinite likelihood encountered. Simulated histogram is less than zero in likelihood calculation. Try decreasing dt.")
_logger.debug(model.parameters())
return np.inf
# This is not a valid way to incorporate undecided trials into a likelihood
#if sols[k].prob_undecided() > 0:
# loglikelihood += np.log(sols[k].prob_undecided())*self.hist_indexes[k][2]
return -loglikelihood
[docs]@paranoidclass
class LossBIC(LossLikelihood):
"""BIC loss function, functionally equivalent to LossLikelihood"""
name = "BIC"
@staticmethod
def _test(v):
assert v.nparams in Natural1()
assert v.samplesize in Natural1()
@staticmethod
def _generate():
samp = Sample.from_numpy_array(np.asarray([[.3, 1], [.4, 0], [.1, 0], [.2, 1]]), [])
yield LossBIC(sample=samp, nparams=4, samplesize=100, dt=.01, T_dur=3)
def setup(self, nparams, samplesize, **kwargs):
self.nparams = nparams
self.samplesize = samplesize
LossLikelihood.setup(self, **kwargs)
@accepts(Self, Model)
@returns(Number)
@requires("model.dt == self.dt and model.T_dur == self.T_dur")
def loss(self, model):
loglikelihood = -LossLikelihood.loss(self, model)
return np.log(self.samplesize)*self.nparams - 2*loglikelihood
[docs]class LossRobustLikelihood(LossLikelihood):
"""Likelihood loss function which will not fail for infinite likelihoods.
Usually you will want to use LossLikelihood instead. See the FAQs
in the documentation for more information on how this differs from
LossLikelihood.
"""
_robustness_param = 1e-20
[docs]class LossRobustBIC(LossBIC):
"""BIC loss function which will not fail for infinite likelihoods.
Usually you will want to use LossBIC instead. See the FAQs in the
documentation for more information on how this differs from
LossBIC.
"""
_robustness_param = 1e-20