Source code for pyddm.model

# Copyright 2018 Max Shinn <maxwell.shinn@yale.edu>
#           2018 Norman Lam <norman.lam@yale.edu>
# 
# This file is part of PyDDM, and is available under the MIT license.
# Please see LICENSE.txt in the root directory for more information.

import logging
import numpy as np
from scipy import sparse
import scipy.sparse.linalg
from .tridiag import TriDiagMatrix

from . import parameters as param
from .analytic import analytic_ddm
from .models.drift import DriftConstant, Drift
from .models.noise import NoiseConstant, Noise
from .models.ic import ICPointSourceCenter, ICPoint, InitialCondition
from .models.bound import BoundConstant, BoundCollapsingLinear, Bound
from .models.overlay import OverlayNone, Overlay
from .models.paranoid_types import Conditions
from .sample import Sample
from .solution import Solution
from .fitresult import FitResult, FitResultEmpty
from .logger import logger as _logger

from paranoid.types import Numeric, Number, Self, List, Generic, Positive, Positive0, String, Boolean, Natural1, Natural0, Dict, Set, Integer, NDArray, Maybe, Nothing
from paranoid.decorators import accepts, returns, requires, ensures, paranoidclass, paranoidconfig
import dis

try:
    from . import csolve
    HAS_CSOLVE = True
except ImportError:
    HAS_CSOLVE = False

# "Model" describes how a variable is dependent on other variables.
# Principally, we want to know how drift and noise depend on x and t.
# `name` is the type of dependence (e.g. "linear") for methods which
# implement the algorithms, and any parameters these algorithms could
# need should be passed as kwargs.  Compared to legacy code, the
# `name` used to be `f_mu_setting` or `f_sigma_setting`, and the
# kwargs now encompass what used to be (e.g.) `param_mu_t_temp`.



## Pre-defined list of models that can be used, and the corresponding default parameters
@paranoidclass
class Model(object):
    """A full simulation of a single DDM-style model.

    Each model simulation depends on five key components:

    - A description of how drift rate (drift) changes throughout the simulation.
    - A description of how variability (noise) changes throughout the simulation.
    - A description of how the boundary changes throughout the simulation.
    - Starting conditions for the model
    - Specific details of a task which cause dynamic changes in the model (e.g. a stimulus intensity change)

    This class manages these, and also provides the affiliated
    services, such as analytical or numerical simulations of the
    resulting reaction time distribution.
    """
    # TODO it would be nice to have explicit "save" and "load"
    # functions, which just provide safe wrappers around saving and
    # loading text files with "repr" and "exec".
    @staticmethod
    def _test(v):
        assert v.get_dependence("drift") in Generic(Drift)
        assert v.get_dependence("noise") in Generic(Noise)
        assert v.get_dependence("bound") in Generic(Bound)
        assert v.get_dependence("IC") in Generic(InitialCondition)
        assert v.get_dependence("overlay") in Generic(Overlay)
        assert v.dx in Positive()
        assert v.dt in Positive()
        assert v.T_dur in Positive()
        assert v.name in String()
        assert v.required_conditions in List(String)
        assert v.fitresult in Generic(FitResult)
    @staticmethod
    def _generate():
        # TODO maybe generate better models?
        # TODO test a model where the bound size changes based on a parameter
        #yield Model(dx=.01, dt=.01, T_dur=2)
        yield Model() # Default model
        yield Model(dx=.05, dt=.01, T_dur=3) # Non-default numerics
        #yield Model(dx=.005, dt=.005, T_dur=.5)
    def __init__(self, drift=DriftConstant(drift=0),
                 noise=NoiseConstant(noise=1),
                 bound=BoundConstant(B=1),
                 IC=ICPointSourceCenter(),
                 overlay=OverlayNone(), name="",
                 dx=param.dx, dt=param.dt,
                 T_dur=param.T_dur, fitresult=None,
                 choice_names=("correct", "error")):
        """Construct a Model object from the five key components.

        The five key components of our DDM-style models describe how
        the drift rate (`drift`), noise (`noise`), and bounds
        (`bound`) change over time, the initial conditions (`IC`),
        and any dynamic task details (`overlay`).  These five
        components are given by the parameters `drift`, `noise`,
        `bound`, `IC`, and `overlay`, respectively.  They should be
        types which inherit from the types Drift, Noise, Bound,
        InitialCondition, and Overlay, respectively.  They default to
        constant unitary values.

        Additionally, simulation parameters can be set, such as time
        and spatial precision (`dt` and `dx`) and the simulation
        duration `T_dur`.  If not specified, they will be taken from
        the defaults specified in the parameters file.

        If you are creating a model, `fitresult` should always be
        None.  This is provided as an optional parameter because when
        models are output as a string (using "str" or "repr"), they
        must save fitting information.  Thus, this allows you to fit
        a model, convert it to a string, save that string to a text
        file, and then run "exec" on that file in a new script to
        load the model.

        By default, the choice associated with the upper boundary is
        "correct responses" and the lower boundary is "error
        responses".  To change these, set the `choice_names` argument
        to be a tuple containing two strings, with the names of the
        boundaries.  So the default is ("correct", "error"), but it
        could be anything, e.g. ("left", "right"), ("high value",
        "low value"), etc.  This is sometimes referred to as
        "accuracy coding" and "stimulus coding".  When fitting data,
        this must match the choice names of the sample.

        The `name` parameter is exclusively for convenience, and may
        be used in plotting or in debugging.
        """
        assert isinstance(name, str)
        self.name = name
        assert isinstance(drift, Drift)
        self._driftdep = drift
        assert isinstance(noise, Noise)
        self._noisedep = noise
        assert isinstance(bound, Bound)
        self._bounddep = bound
        assert isinstance(IC, InitialCondition)
        self._IC = IC
        assert isinstance(overlay, Overlay)
        self._overlay = overlay
        self.dependencies = [self._driftdep, self._noisedep, self._bounddep, self._IC, self._overlay]
        self.required_conditions = list(set([x for l in self.dependencies for x in l.required_conditions]))
        assert isinstance(choice_names, tuple) and len(choice_names) == 2, "choice_names must be a tuple of length 2"
        self.choice_names = choice_names
        self.dx = dx
        self.dt = dt
        if self.dx > .01:
            _logger.warning("dx is large.  Estimated pdfs may be imprecise.  Decrease dx to 0.01 or less.")
        if self.dt > .01:
            _logger.warning("dt is large.  Estimated pdfs may be imprecise.  Decrease dt to 0.01 or less.")
        self.T_dur = T_dur
        self.fitresult = FitResultEmpty() if fitresult is None else fitresult # If the model was fit, store the status here

    def __eq__(self, other):
        for i in range(0, len(self.dependencies)):
            if self.dependencies[i] != other.dependencies[i]:
                return False
        if self.dx != other.dx:
            return False
        if self.dt != other.dt:
            return False
        if self.fitresult != other.fitresult:
            return False
        if self.name != other.name:
            return False
        if self.choice_names != other.choice_names:
            return False
        return True

    # Get a string representation of the model
    def __repr__(self, pretty=False):
        # Use a list so they can be sorted
        allobjects = [("name", self.name), ("drift", self.get_dependence('drift')),
                      ("noise", self.get_dependence('noise')), ("bound", self.get_dependence('bound')),
                      ("IC", self.get_dependence('ic')),
                      ("overlay", self.get_dependence('overlay')), ("dx", self.dx),
                      ("dt", self.dt), ("T_dur", self.T_dur)]
        params = ""
        for n,o in allobjects:
            params += n + "=" + o.__repr__()
            if (n,o) != allobjects[-1]:
                if pretty:
                    params += ",\n" + " "*(len(type(self).__name__)+1)
                else:
                    params += ", "
        if not isinstance(self.fitresult, FitResultEmpty):
            if pretty:
                params += ",\n  fitresult=" + repr(self.fitresult)
            else:
                params += ", fitresult=" + repr(self.fitresult)
        if self.choice_names != ("correct", "error"):
            if pretty:
                params += ",\n  choice_names=" + repr(self.choice_names)
            else:
                params += ", choice_names=" + repr(self.choice_names)
        return type(self).__name__ + "(" + params + ")"

    def __str__(self):
        return self.__repr__(pretty=True)
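
    # Example (an illustrative sketch, not part of the original source): a
    # model with a fittable drift rate could be constructed like so, using
    # only names defined or imported in this module:
    #
    #     model = Model(name="example",
    #                   drift=DriftConstant(drift=Fittable(minval=0, maxval=4)),
    #                   noise=NoiseConstant(noise=1),
    #                   bound=BoundConstant(B=1),
    #                   dx=.005, dt=.005, T_dur=2)
    #     print(model)  # __str__ gives the pretty multi-line repr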

    def parameters(self):
        """Return all parameters in the model

        This will return a dictionary of dictionaries.  The keys of
        this dictionary will be "drift", "noise", "bound", "IC", and
        "overlay".  The values will be dictionaries, each containing
        the parameters used by these.  Note that this includes both
        fixed parameters and Fittable parameters.  If a parameter is
        fittable, it will return either a Fittable object or a Fitted
        object in place of the parameter, depending on whether or not
        the model has been fit to data yet.
        """
        ret = {}
        for depname in ["drift", "noise", "bound", "IC", "overlay"]:
            ret[depname] = {}
            dep = self.get_dependence(depname)
            for param_name in dep.required_parameters:
                param_value = getattr(dep, param_name)
                ret[depname][param_name] = param_value
        return ret
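
    # Example (hedged sketch): for the default Model(), parameters() returns
    # a nested dict along the lines of
    #     {"drift": {"drift": 0}, "noise": {"noise": 1}, "bound": {"B": 1},
    #      "IC": {}, "overlay": {}}
    # (the exact keys depend on each component's required_parameters), with
    # Fittable/Fitted objects appearing in place of any fittable values.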

    def get_model_parameters(self):
        """Get an ordered list of all model parameters.

        Returns a list of each model parameter which can be varied
        during a fitting procedure.  The ordering is arbitrary but is
        guaranteed to be in the same order as
        get_model_parameter_names() and set_model_parameters().  If
        multiple parameters refer to the same "Fittable" object, then
        that object will only be listed once.
        """
        params = []
        for dep in self.dependencies:
            for param_name in dep.required_parameters:
                param_value = getattr(dep, param_name)
                # If this can be fit to data (i.e. via optimization)
                if isinstance(param_value, Fittable):
                    # If we have the same Fittable object in two
                    # different components inside the model, we only
                    # want to list it once.
                    if id(param_value) not in map(id, params):
                        params.append(param_value)
        return params

    def get_model_parameter_names(self):
        """Get an ordered list of the names of all parameters in the model.

        Returns the name of each model parameter.  The ordering is
        arbitrary, but is guaranteed to be in the same order as
        get_model_parameters() and set_model_parameters().  If
        multiple parameters refer to the same "Fittable" object, then
        that object will only be listed once, however the names of
        the parameters will be separated by a "/" character.
        """
        params = []
        param_names = []
        for dep in self.dependencies:
            for param_name in dep.required_parameters:
                param_value = getattr(dep, param_name)
                # If this can be fit to data
                if isinstance(param_value, Fittable):
                    # If we have the same Fittable object in two
                    # different components inside the model, we only
                    # want to list it once.
                    if id(param_value) not in map(id, params):
                        param_names.append(param_name)
                        params.append(param_value)
                    else:
                        ind = list(map(id, params)).index(id(param_value))
                        if param_name not in param_names[ind].split("/"):
                            param_names[ind] += "/" + param_name
        return param_names
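
    # Example (hedged sketch): if one Fittable object is shared between two
    # components, e.g. shared = Fittable(minval=0, maxval=2) passed as both
    # DriftConstant(drift=shared) and BoundConstant(B=shared), then
    # get_model_parameters() lists it once and get_model_parameter_names()
    # returns the joined name "drift/B".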

    def set_model_parameters(self, params):
        """Set the parameters of the model from an ordered list.

        Takes as an argument a list of parameters in the same order
        as those from get_model_parameters().  Sets the associated
        parameters as a "Fitted" object.  If multiple parameters
        refer to the same "Fittable" object, then that object will
        only be listed once.
        """
        old_params = self.get_model_parameters()
        param_object_ids = list(map(id, old_params))
        assert len(params) == len(param_object_ids), "Invalid number of parameters specified: " \
            "got %i, expected %i" % (len(params), len(param_object_ids))
        new_params = [p if isinstance(p, Fittable) else op.make_fitted(p) \
                      for p,op in zip(params, old_params)]
        for dep in self.dependencies:
            for param_name in dep.required_parameters:
                param_value = getattr(dep, param_name)
                # If this can be fit to data
                if isinstance(param_value, Fittable):
                    i = param_object_ids.index(id(param_value))
                    setattr(dep, param_name, new_params[i])
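
    # Example (hedged sketch): a round trip through the parameter interface,
    # which replaces each Fittable with a Fitted holding the chosen value:
    #
    #     values = [p.default() for p in model.get_model_parameters()]
    #     model.set_model_parameters(values)
    #     # All fittable parameters are now Fitted (a subclass of Fittable)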

    def get_fit_result(self):
        """Returns a FitResult object describing how the model was fit.

        Returns the FitResult object describing the last time this
        model was fit to data, including the loss function, fitting
        method, and the loss function value.  If the model was never
        fit to data, this will return FitResultEmpty.
        """
        return self.fitresult

    def get_dependence(self, name):
        """Return the dependence object given by the string `name`."""
        if name.lower() in ["drift", "driftdep", "_driftdep"]:
            return self._driftdep
        elif name.lower() in ["noise", "noisedep", "_noisedep"]:
            return self._noisedep
        elif name.lower() in ["b", "bound", "bounddep", "_bounddep"]:
            return self._bounddep
        elif name.lower() in ["ic", "initialcondition", "_ic"]:
            return self._IC
        elif name.lower() in ["overlay", "_overlay"]:
            return self._overlay
        raise NameError("Invalid dependence name")

    def check_conditions_satisfied(self, conditions):
        rc = list(sorted(self.required_conditions))
        ck = list(sorted(conditions.keys()))
        assert set(rc) - set(ck) == set(), \
            "Please specify valid conditions for this simulation.\nSpecified: %s\nExpected: %s" % (str(ck), str(rc))

    def get_model_type(self):
        """Return a dictionary which fully specifies the class of the five key model components."""
        tt = lambda x : (x.depname, type(x))
        return dict(map(tt, self.dependencies))

    @accepts(Self, Conditions, Maybe(Positive0))
    def x_domain(self, conditions, t=None):
        """A list which spans from the lower boundary to the upper boundary by increments of dx."""
        # Find the maximum size of the bound across the t-domain in
        # case we have increasing bounds
        if t is None:
            B = max([self.get_dependence("bound").get_bound(t=t, conditions=conditions) for t in self.t_domain()])
        else:
            B = self.get_dependence("bound").get_bound(t=t, conditions=conditions)
        B = np.ceil(B/self.dx)*self.dx # Align the bound to dx borders
        return np.arange(-B, B+0.1*self.dx, self.dx) # +.1*dx is to ensure that the largest number in the array is B

    def t_domain(self):
        """A list of all of the timepoints over which the joint PDF will be defined (increments of dt from 0 to T_dur)."""
        return np.arange(0., self.T_dur+0.1*self.dt, self.dt)
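
    # Example (hedged sketch of the grids): with dx=.01, dt=.01, T_dur=2,
    # and a constant bound B=1, x_domain() is [-1, -.99, ..., .99, 1] (201
    # points) and t_domain() is [0, .01, ..., 2] (201 points); for
    # collapsing bounds, x_domain() is sized to the widest bound over time.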

    def flux(self, x, t, conditions):
        """The flux across the boundary at position `x` at time `t`."""
        drift_flux = self.get_dependence('drift').get_flux(x, t, dx=self.dx, dt=self.dt, conditions=conditions)
        noise_flux = self.get_dependence('noise').get_flux(x, t, dx=self.dx, dt=self.dt, conditions=conditions)
        return drift_flux + noise_flux

    def IC(self, conditions):
        """The initial distribution at t=0.

        Returns a length N ndarray (where N is the size of x_domain())
        which should sum to 1.
        """
        return self.get_dependence('IC').get_IC(self.x_domain(conditions=conditions), dx=self.dx, conditions=conditions)

    @accepts(Self, conditions=Conditions, cutoff=Boolean, seed=Natural0, rk4=Boolean)
    @returns(NDArray(t=Number, d=1))
    @ensures('0 < len(return) <= len(self.t_domain())')
    @ensures('not cutoff --> len(return) == len(self.t_domain())')
    def simulate_trial(self, conditions={}, cutoff=True, rk4=True, seed=0):
        """Simulate the decision variable for one trial.

        Given conditions `conditions`, this function will simulate
        the decision variable for a single trial.  It will cut off
        the simulation when the decision variable crosses the
        boundary unless `cutoff` is set to False.  By default,
        Runge-Kutta is used to simulate the trial, however if `rk4`
        is set to False, the less efficient Euler's method is used
        instead.  This returns a trajectory of the simulated trial
        over time as a numpy array.

        Note that this will return the same trajectory on each run
        unless the random seed `seed` is varied.

        Also note that you shouldn't normally need to use this
        function.  To simulate an entire probability distribution,
        call Model.solve() and the results of the simulation will be
        in the returned Solution object.  This is only useful for
        finding individual trajectories instead of the probability
        distribution as a whole.
        """
        self.check_conditions_satisfied(conditions)
        h = self.dt
        T = self.t_domain()
        # Choose a starting position from the IC
        rng = np.random.RandomState(seed)
        ic = self.IC(conditions=conditions)
        x0 = rng.choice(self.x_domain(conditions=conditions), p=ic)
        pos = [x0]
        # Convenience functions
        _driftdep = self.get_dependence("drift")
        _noisedep = self.get_dependence("noise")
        fm = lambda x,t : _driftdep.get_drift(t=t, x=x, conditions=conditions)
        fs = lambda x,t : _noisedep.get_noise(t=t, x=x, conditions=conditions)
        for i in range(1, len(T)):
            # Stochastic Runge-Kutta order 4.  See "Introduction to
            # Stochastic Differential Equations" by Thomas C. Gard
            rn = rng.randn()
            dw = np.sqrt(h)*rn
            drift1 = fm(t=T[i-1], x=pos[i-1])
            s1 = fs(t=T[i-1], x=pos[i-1])
            if rk4: # Use Runge-Kutta order 4
                drift2 = fm(t=(T[i-1]+h/2), x=(pos[i-1] + drift1*h/2 + s1*dw/2)) # Should be dw/sqrt(2)?
                s2 = fs(t=(T[i-1]+h/2), x=(pos[i-1] + drift1*h/2 + s1*dw/2))
                drift3 = fm(t=(T[i-1]+h/2), x=(pos[i-1] + drift2*h/2 + s2*dw/2))
                s3 = fs(t=(T[i-1]+h/2), x=(pos[i-1] + drift2*h/2 + s2*dw/2))
                drift4 = fm(t=(T[i-1]+h), x=(pos[i-1] + drift3*h + s3*dw)) # Should this be 1/2*s3*dw?
                s4 = fs(t=(T[i-1]+h), x=(pos[i-1] + drift3*h + s3*dw)) # Should this be 1/2*s3*dw?
                dx = h*(drift1 + 2*drift2 + 2*drift3 + drift4)/6 + dw*(s1 + 2*s2 + 2*s3 + s4)/6
            else: # Use Euler's method
                dx = h*drift1 + dw*s1
            pos.append(pos[i-1] + dx)
            B = self.get_dependence("bound").get_bound(t=T[i], conditions=conditions)
            if cutoff and (pos[i] > B or pos[i] < -B):
                break
        traj = self.get_dependence("overlay").apply_trajectory(trajectory=np.asarray(pos), model=self, seed=seed, rk4=rk4, conditions=conditions)
        if cutoff is False and len(traj) < len(T):
            traj = np.append(traj, [traj[-1]]*(len(T)-len(traj)))
        if len(traj) > len(T):
            traj = traj[0:len(T)]
        return traj
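
    # Example (hedged sketch): simulating a few individual trajectories,
    # e.g. for plotting the decision variable over time:
    #
    #     for seed in range(3):
    #         traj = model.simulate_trial(seed=seed)
    #         # plot traj against model.t_domain()[:len(traj)]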

    @accepts(Self, Conditions, Natural1, Boolean, Natural0)
    @returns(Sample)
    @paranoidconfig(max_runtime=.1)
    def simulated_solution(self, conditions={}, size=1000, rk4=True, seed=0):
        """Simulate individual trials to obtain a distribution.

        Given conditions `conditions` and the number `size` of trials
        to simulate, this will run the function "simulate_trial"
        `size` times, and use the result to find a histogram
        analogous to solve.  Returns a Sample object.

        Note that in practice you should never need to use this
        function.  This function uses an outdated method to simulate
        the model and should be used for comparison purposes only.
        To produce a probability density function of boundary
        crosses, use Model.solve().  To sample from the probability
        distribution (e.g. for finding confidence intervals for
        limited amounts of data), call Model.solve() and then use the
        Solution.resample() function of the resulting Solution.
        """
        _logger.warning("To generate a sample from a model, please use Solution.resample().  The only practical purpose of the simulated_solution function is debugging the simulate_trial function for custom Overlays.")
        choice_upper_times = []
        choice_lower_times = []
        undec_count = 0
        T = self.t_domain()
        for s in range(0, size):
            if s % 200 == 0:
                _logger.info("Simulating trial %i" % s)
            timecourse = self.simulate_trial(conditions=conditions, seed=(hash((s, seed)) % 2**32), cutoff=True, rk4=rk4)
            T_finish = T[len(timecourse) - 1]
            B = self.get_dependence("bound").get_bound(t=T_finish, conditions=conditions)
            # Correct for the fact that the particle could have
            # crossed at any point between T_finish-dt and T_finish.
            dt_correction = self.dt/2
            # Determine whether the simulated trial crossed the upper or lower bound.
            if timecourse[-1] > B:
                choice_upper_times.append(T_finish - dt_correction)
            elif timecourse[-1] < -B:
                choice_lower_times.append(T_finish - dt_correction)
            elif len(timecourse) == len(T):
                undec_count += 1
            else:
                raise SystemError("Internal error: Invalid particle simulation")
        aa = lambda x : np.asarray(x)
        conds = {k:(aa(len(choice_upper_times)*[v]), aa(len(choice_lower_times)*[v]), aa(undec_count*[v])) for k,v in conditions.items() if k and v}
        return Sample(aa(choice_upper_times), aa(choice_lower_times), undec_count, **conds)
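
    # Example (hedged sketch, useful only for debugging custom Overlays):
    # compare the simulated histogram against the numerical solution:
    #
    #     samp = model.simulated_solution(size=2000, seed=0)
    #     sol = model.solve()
    #     # compare samp's RT histogram to the pdfs held by sol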

    @accepts(Self)
    @returns(Boolean)
    def has_analytical_solution(self):
        """Is it possible to find an analytic solution for this model?"""
        # First check to make sure drift doesn't vary with time or
        # particle location
        if self.get_dependence("drift")._uses_t() or self.get_dependence("drift")._uses_x():
            return False
        # Check noise to make sure it doesn't vary with time or particle location
        if self.get_dependence("noise")._uses_t() or self.get_dependence("noise")._uses_x():
            return False
        # Check to make sure bound is one that we can solve for
        if self.get_dependence("bound")._uses_t() and self.get_dependence("bound").__class__ != BoundCollapsingLinear:
            return False
        # Make sure initial condition is a single point
        if not issubclass(self.get_dependence("IC").__class__, (ICPointSourceCenter, ICPoint)):
            return False
        # Assuming none of these is the case, return True.
        return True

    @accepts(Self, conditions=Conditions)
    @returns(Boolean)
    def can_solve_explicit(self, conditions={}):
        """Check the explicit method stability criterion"""
        self.check_conditions_satisfied(conditions)
        noise_max = max((self._noisedep.get_noise(x=0, t=t, dx=self.dx, dt=self.dt, conditions=conditions) for t in self.t_domain()))
        return noise_max**2 * self.dt/(self.dx**2) < 1
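
    # Example (hedged sketch): the criterion as implemented, computed by
    # hand for constant noise: with noise=1, dt=.01, dx=.1, we get
    # 1**2 * .01/.1**2 == 1.0, which fails the `< 1` check, so
    # can_solve_explicit() returns False; halving dt to .005 gives 0.5,
    # which passes.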

    @accepts(Self, conditions=Conditions)
    @returns(Boolean)
    def can_solve_cn(self, conditions={}):
        """Check whether this model is compatible with the Crank-Nicolson solver.

        All bound functions which do not depend on time are compatible."""
        # TODO in the future, instead of looking for parameters this
        # way, we should use "t in [i.argrepr for i in
        # dis.get_instructions(get_bound)]" to see if it is used in
        # the function rather than looking to see if it is passed to
        # the function.
        if self.get_dependence("bound")._uses_t():
            return False
        return True

    @accepts(Self, conditions=Conditions, return_evolution=Boolean, force_python=Boolean)
    @returns(Solution)
    def solve(self, conditions={}, return_evolution=False, force_python=False):
        """Solve the model using an analytic solution if possible, and a numeric solution if not.

        First, it tries to use Crank-Nicolson as the solver, and then
        backward Euler.  See documentation of Model.solve_numerical()
        for more information.

        The return_evolution argument should be set to True if you
        need to use the Solution.get_evolution() function from the
        returned Solution.

        Return a Solution object describing the joint PDF distribution
        of reaction times.
        """
        # TODO solve this using the dis module as described in the
        # comment for can_solve_cn
        self.check_conditions_satisfied(conditions)
        if self.has_analytical_solution() and return_evolution is False:
            return self.solve_analytical(conditions=conditions)
        elif isinstance(self.get_dependence("bound"), BoundConstant) and return_evolution is False and (force_python or not HAS_CSOLVE):
            return self.solve_numerical_cn(conditions=conditions)
        else:
            return self.solve_numerical_implicit(conditions=conditions, return_evolution=return_evolution, force_python=force_python)
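
    # Example (hedged sketch): the usual entry point, which dispatches to
    # the analytical, Crank-Nicolson, or implicit solver as appropriate:
    #
    #     sol = model.solve(conditions={})  # pass task conditions if required
    #     # `sol` is a Solution holding the joint RT pdfs for both choices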

    @accepts(Self, conditions=Conditions, force_python=Boolean)
    @returns(Solution)
    def solve_analytical(self, conditions={}, force_python=False):
        """Solve the model with an analytic solution, if possible.

        Analytic solutions are only possible in a select number of
        special cases; in particular, it works for simple DDM and for
        linearly collapsing bounds and arbitrary single-point initial
        conditions.  (See Anderson (1960) for implementation
        details.)  For most reasonably complex models, the method
        will fail.  Check whether a solution is possible with
        has_analytical_solution().

        If successful, this returns a Solution object describing the
        joint PDF.  If unsuccessful, this will raise an exception.
        """
        assert self.has_analytical_solution(), "Cannot solve for this model analytically"
        self.check_conditions_satisfied(conditions)
        # Calculate shift in initial conditions if present
        if isinstance(self.get_dependence('IC'), ICPoint):
            ic = self.IC(conditions=conditions)
            assert np.count_nonzero(ic) == 1, "Cannot solve analytically for models with non-point initial conditions"
            shift = np.flatnonzero(ic) / (len(ic) - 1) # Rescale to proportion of total bound height
        else:
            shift = None
        # The analytic_ddm function does the heavy lifting.
        if isinstance(self.get_dependence('bound'), BoundCollapsingLinear): # Linearly collapsing bound
            anal_pdf_choice_upper, anal_pdf_choice_lower = analytic_ddm(self.get_dependence("drift").get_drift(t=0, x=0, conditions=conditions),
                                                                        self.get_dependence("noise").get_noise(t=0, x=0, conditions=conditions),
                                                                        self.get_dependence("bound").get_bound(t=0, x=0, conditions=conditions),
                                                                        self.t_domain(), shift,
                                                                        -self.get_dependence("bound").t, force_python=force_python) # TODO why must this be negative? -MS
        else: # Constant bound DDM
            anal_pdf_choice_upper, anal_pdf_choice_lower = analytic_ddm(self.get_dependence("drift").get_drift(t=0, x=0, conditions=conditions),
                                                                        self.get_dependence("noise").get_noise(t=0, x=0, conditions=conditions),
                                                                        self.get_dependence("bound").get_bound(t=0, x=0, conditions=conditions),
                                                                        self.t_domain(), shift, force_python=force_python)
        ## Remove some abnormalities such as NaN due to trivial reasons.
        anal_pdf_choice_upper[np.isnan(anal_pdf_choice_upper)] = 0. # The previous `== np.NaN` comparison never matched, so use np.isnan
        anal_pdf_choice_upper[0] = 0.
        anal_pdf_choice_lower[np.isnan(anal_pdf_choice_lower)] = 0.
        anal_pdf_choice_lower[0] = 0.
        # Fix numerical errors
        anal_pdf_choice_upper *= self.dt
        anal_pdf_choice_lower *= self.dt
        pdfsum = np.sum(anal_pdf_choice_upper) + np.sum(anal_pdf_choice_lower)
        if pdfsum > 1:
            if pdfsum > 1.01 and param.renorm_warnings:
                _logger.warning("Renormalizing probability density from " + str(pdfsum) + " to 1."
                                + "  Try decreasing dt.  If that doesn't eliminate this warning, it may be due"
                                + " to extreme parameter values and/or bugs in your model specification.")
                _logger.debug(self.parameters())
            anal_pdf_choice_upper /= pdfsum
            anal_pdf_choice_lower /= pdfsum
        sol = Solution(anal_pdf_choice_upper, anal_pdf_choice_lower, self, conditions=conditions)
        return self.get_dependence('overlay').apply(sol)
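
    # Example (hedged sketch): explicitly taking the analytical path when
    # the model permits it:
    #
    #     if model.has_analytical_solution():
    #         sol = model.solve_analytical()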

    def solve_numerical_c(self, conditions={}):
        """Solve the DDM model using the implicit method with C extensions.

        This function should give near identical results to
        solve_numerical_implicit.  However, it uses compiled C code
        instead of Python code to do so, which should make it much
        (10-100x) faster.

        This does not currently work with non-Gaussian diffusion
        matrices (a currently undocumented feature).
        """
        get_drift = self.get_dependence("drift").get_drift
        drift_uses_t = self.get_dependence("drift")._uses_t()
        drift_uses_x = self.get_dependence("drift")._uses_x()
        if not drift_uses_t and not drift_uses_x:
            drifttype = 0
            drift = np.asarray([get_drift(conditions=conditions, x=0, t=0)])
        elif drift_uses_t and not drift_uses_x:
            drifttype = 1
            drift = np.asarray([get_drift(t=t, conditions=conditions, x=0) for t in self.t_domain()])
        elif not drift_uses_t and drift_uses_x:
            drifttype = 2
            drift = np.asarray(get_drift(x=self.x_domain(conditions=conditions), t=0, conditions=conditions))
        elif drift_uses_t and drift_uses_x:
            drifttype = 3
            # TODO: Right now this calculates and passes the maximum x
            # domain, even if it is not necessary to do so.  Performance
            # could be improved by only calculating the parts of the x
            # domain that are needed and then finding a way to index that
            # within C.  Alternatively, it could only calculate the ones
            # it needs, and then fill the extra space with nonsense values
            # just to form a rectangular matrix, since they will never be
            # read within the C code.  maxt is a workaround so we don't
            # have to find the maximum in the t domain on each iteration.
            maxt = self.t_domain()[np.argmax([self.get_dependence("bound").get_bound(t=t, conditions=conditions) for t in self.t_domain()])]
            xdomain = self.x_domain(t=maxt, conditions=conditions)
            drift = np.concatenate([get_drift(t=t, x=xdomain, conditions=conditions) for t in self.t_domain()])
        get_noise = self.get_dependence("noise").get_noise
        noise_uses_t = self.get_dependence("noise")._uses_t()
        noise_uses_x = self.get_dependence("noise")._uses_x()
        if not noise_uses_t and not noise_uses_x:
            noisetype = 0
            noise = np.asarray([get_noise(conditions=conditions, x=0, t=0)])
        elif noise_uses_t and not noise_uses_x:
            noisetype = 1
            noise = np.asarray([get_noise(t=t, conditions=conditions, x=0) for t in self.t_domain()])
        elif not noise_uses_t and noise_uses_x:
            noisetype = 2
            noise = np.asarray(get_noise(x=self.x_domain(conditions=conditions), conditions=conditions, t=0))
        elif noise_uses_t and noise_uses_x:
            noisetype = 3
            # See the comment for drifttype == 3 above
            maxt = self.t_domain()[np.argmax([self.get_dependence("bound").get_bound(t=t, conditions=conditions) for t in self.t_domain()])]
            xdomain = self.x_domain(t=maxt, conditions=conditions)
            noise = np.concatenate([get_noise(t=t, x=xdomain, conditions=conditions) for t in self.t_domain()])
        bound_uses_t = self.get_dependence("bound")._uses_t()
        if not bound_uses_t:
            boundtype = 0
            bound = np.asarray([self.get_dependence("Bound").get_bound(conditions=conditions, t=0)])
        elif bound_uses_t:
            boundtype = 1
            bound = np.asarray([self.get_dependence("Bound").get_bound(t=t, conditions=conditions) for t in self.t_domain()])
        res = csolve.implicit_time(drift, drifttype, noise, noisetype, bound, boundtype,
                                   self.get_dependence("IC").get_IC(self.x_domain(conditions=conditions), self.dx, conditions=conditions),
                                   self.T_dur, self.dt, self.dx, len(self.t_domain()))
        # TODO: Handle the pdf going below zero, returning pdfcurr, and fix numerical errors
        choice_upper = (res[0]*self.dt)
        choice_upper[choice_upper<0] = 0
        choice_lower = (res[1]*self.dt)
        choice_lower[choice_lower<0] = 0
        undec = res[2]
        undec[undec<0] = 0
        return self.get_dependence('overlay').apply(Solution(choice_upper, choice_lower, self, conditions=conditions, pdf_undec=undec))

    @accepts(Self, method=Set(["explicit", "implicit", "cn"]), conditions=Conditions, return_evolution=Boolean, force_python=Boolean)
    @returns(Solution)
    @requires("method == 'explicit' --> self.can_solve_explicit(conditions=conditions)")
    @requires("method == 'cn' --> self.can_solve_cn()")
    def solve_numerical(self, method="cn", conditions={}, return_evolution=False, force_python=False):
        """Solve the DDM model numerically.

        Use `method` to solve the DDM.  `method` can either be
        "explicit", "implicit", or "cn" (for Crank-Nicolson).  This
        is the core DDM solver of this library.

        Crank-Nicolson is the default and works for any model with
        constant bounds.

        Implicit is the fallback method.  It should work well in most
        cases and is generally stable.

        Normally, the explicit method should not be used.  Also note
        the stability criterion for the explicit method is:

        | noise^2/2 * dt/dx^2 < 1/2

        It returns a Solution object describing the joint PDF.  This
        method should not fail for any model type.

        `return_evolution` (default=False) governs whether or not the
        function returns the full evolution of the pdf as part of the
        Solution object.  This only works with methods "explicit" or
        "implicit", not with "cn".

        `force_python` makes PyDDM use the solver written in Python
        instead of the optimized solver written in C.
        """
        self.check_conditions_satisfied(conditions)
        if method == "cn":
            if return_evolution == False:
                return self.solve_numerical_cn(conditions=conditions)
            else:
                _logger.warning("return_evolution is not supported with the Crank-Nicolson solver, using implicit (backward Euler) instead.")
                method = "implicit"
        if method == "implicit" and HAS_CSOLVE and not force_python and not return_evolution:
            return self.solve_numerical_c(conditions=conditions)

        # Initial condition of decision variable
        pdf_curr = self.IC(conditions=conditions)
        # Output pdfs for the upper and lower bounds.  If
        # pdf_choice_upper + pdf_choice_lower + the undecided
        # probability are summed, they equal 1.  So these are
        # components of the joint pdf.
        pdf_choice_upper = np.zeros(len(self.t_domain())) # Not a proper pdf on its own (doesn't sum to 1)
        pdf_choice_lower = np.zeros(len(self.t_domain())) # Not a proper pdf on its own (doesn't sum to 1)
        x_list = self.x_domain(conditions=conditions)

        # If evolution of pdf should be returned, preallocate
        # np.array pdf_evolution for performance reasons
        if return_evolution:
            pdf_evolution = np.zeros((len(x_list), len(self.t_domain())))
            pdf_evolution[:,0] = pdf_curr

        # Find maximum bound for increasing bounds
        _bound_func = self.get_dependence("bound").get_bound
        bmax = max([_bound_func(t=t, conditions=conditions) for t in self.t_domain()])
        # Looping through time and updating the pdf.
        for i_t, t in enumerate(self.t_domain()[:-1]): # -1 because nothing will happen at t=0 so each step computes the value for the next timepoint
            # Alias pdf_prev to pdf_curr for clarity
            pdf_prev = pdf_curr
            # For efficiency only do diffusion if there's at least
            # some density remaining in the channel.
            if np.sum(pdf_curr[:]) < 0.0001:
                break
            # Boundary at current time-step.
            bound = self.get_dependence('bound').get_bound(t=t, conditions=conditions)

            # Now figure out which x positions are still within
            # the (collapsing) bound.
            assert bmax >= bound, "Invalid change in bound" # Ensure the bound didn't expand
            bound_shift = bmax - bound
            # Note that we linearly approximate the bound by the two surrounding grids sandwiching it.
            x_index_inner = int(np.ceil(bound_shift/self.dx)) # Index for the inner bound (smaller matrix)
            x_index_outer = int(np.floor(bound_shift/self.dx)) # Index for the outer bound (larger matrix)

            # We weight the upper and lower matrices according to
            # how far away the bound is from each.  The weight of
            # each matrix is approximated linearly.  The inner
            # bound is 0 when the bound lies exactly on a grid point.
            weight_inner = (bound_shift - x_index_outer*self.dx)/self.dx
            weight_outer = 1. - weight_inner # The weight of the lower bound matrix, approximated linearly.
            x_list_inbounds = x_list[x_index_outer:len(x_list)-x_index_outer] # List of x-positions still within bounds.

            # Diffusion matrix for the implicit method.  Here defined
            # as the outer matrix; the inner matrix is either trivial
            # or an extracted submatrix.
            drift_matrix = self.get_dependence('drift').get_matrix(x=x_list_inbounds, t=t, dt=self.dt, dx=self.dx, conditions=conditions, implicit=(method!="explicit"))
            noise_matrix = self.get_dependence('noise').get_matrix(x=x_list_inbounds, t=t, dt=self.dt, dx=self.dx, conditions=conditions, implicit=(method!="explicit"))
            if method == "implicit":
                diffusion_matrix = TriDiagMatrix.eye(len(x_list_inbounds)) + drift_matrix + noise_matrix
            elif method == "explicit":
                # Explicit method flips sign except for the identity matrix
                diffusion_matrix_explicit = TriDiagMatrix.eye(len(x_list_inbounds)) - drift_matrix - noise_matrix

            ### Compute probability density functions (pdf)
            # PDF for outer matrix
            if method == "implicit":
                pdf_outer = diffusion_matrix.spsolve(pdf_prev[x_index_outer:len(x_list)-x_index_outer])
            elif method == "explicit":
                pdf_outer = diffusion_matrix_explicit.dot(pdf_prev[x_index_outer:len(x_list)-x_index_outer]).squeeze()

            # If the bounds are the same (i.e. the bound perfectly
            # aligns with the grid), we don't need to solve the
            # diffusion matrix again since we don't need a linear
            # approximation.
            if x_index_inner == x_index_outer:
                pdf_inner = pdf_outer
            else:
                # Need a separate matrix here to get the proper corrections
                drift_matrix = self.get_dependence('drift').get_matrix(x=x_list_inbounds[1:-1], t=t, dt=self.dt, dx=self.dx, conditions=conditions, implicit=(method!="explicit"))
                noise_matrix = self.get_dependence('noise').get_matrix(x=x_list_inbounds[1:-1], t=t, dt=self.dt, dx=self.dx, conditions=conditions, implicit=(method!="explicit"))
                if method == "implicit":
                    diffusion_matrix = TriDiagMatrix.eye(len(x_list_inbounds)-2) + drift_matrix + noise_matrix
                elif method == "explicit":
                    # Explicit method flips sign except for the identity
                    # matrix.  (The identity must match the size of the
                    # inner submatrix, i.e. len(x_list_inbounds)-2.)
                    diffusion_matrix_explicit = TriDiagMatrix.eye(len(x_list_inbounds)-2) - drift_matrix - noise_matrix
                if method == "implicit":
                    pdf_inner = diffusion_matrix.spsolve(pdf_prev[x_index_inner:len(x_list)-x_index_inner])
                elif method == "explicit":
                    pdf_inner = diffusion_matrix_explicit.dot(pdf_prev[x_index_inner:len(x_list)-x_index_inner])

            # Pdf mass out of bounds is considered a decision made.
            pdf_choice_lower[i_t+1] += weight_outer * np.sum(pdf_prev[:x_index_outer]) \
                                     + weight_inner * np.sum(pdf_prev[:x_index_inner])
            pdf_choice_upper[i_t+1] += weight_outer * np.sum(pdf_prev[len(x_list)-x_index_outer:]) \
                                     + weight_inner * np.sum(pdf_prev[len(x_list)-x_index_inner:])

            # Reconstruct current probability density function,
            # adding outer and inner contribution to it.  Use the
            # .fill() method to avoid allocating memory with
            # np.zeros().
            pdf_curr.fill(0) # Reset
            pdf_curr[x_index_outer:len(x_list)-x_index_outer] += weight_outer*pdf_outer # For explicit, should be pdf_outer.T?
            pdf_curr[x_index_inner:len(x_list)-x_index_inner] += weight_inner*pdf_inner

            # Increase current, transient probability of crossing
            # either bound, as flux.  choice_upper corresponds to the
            # upper bound and choice_lower to the lower bound.
            _inner_B_choice_upper = x_list[len(x_list)-1-x_index_inner]
            _outer_B_choice_upper = x_list[len(x_list)-1-x_index_outer]
            _inner_B_choice_lower = x_list[x_index_inner]
            _outer_B_choice_lower = x_list[x_index_outer]
            if len(pdf_inner) == 0: # Otherwise we get an error when bounds collapse to 0
                pdf_inner = np.array([0])
            pdf_choice_upper[i_t+1] += weight_outer * pdf_outer[-1] * self.flux(_outer_B_choice_upper, t, conditions=conditions) \
                                     + weight_inner * pdf_inner[-1] * self.flux(_inner_B_choice_upper, t, conditions=conditions)
            pdf_choice_lower[i_t+1] += weight_outer * pdf_outer[0] * self.flux(_outer_B_choice_lower, t, conditions=conditions) \
                                     + weight_inner * pdf_inner[0] * self.flux(_inner_B_choice_lower, t, conditions=conditions)

            # Renormalize when the channel size has <1 grid, although
            # all hell breaks loose in this regime.
            if bound < self.dx:
                pdf_choice_upper[i_t+1] *= (1+ (1-bound/self.dx))
                pdf_choice_lower[i_t+1] *= (1+ (1-bound/self.dx))

            # If evolution of pdf should be returned, append pdf_curr to pdf_evolution
            if return_evolution:
                pdf_evolution[:,i_t+1] = pdf_curr

        # Detect and fix below zero errors
        pdf_undec = pdf_curr
        minval = np.min((np.min(pdf_choice_upper), np.min(pdf_choice_lower), np.min(pdf_undec)))
        if minval < 0:
            sum_negative_strength = np.sum(pdf_choice_upper[pdf_choice_upper<0]) + np.sum(pdf_choice_lower[pdf_choice_lower<0])
            sum_negative_strength_undec = np.sum(pdf_undec[pdf_undec<0])
            if sum_negative_strength < -.01 and param.renorm_warnings:
                _logger.warning(("Probability density included values less than zero (minimum=%f, "
                                 + "total=%f).  Please decrease dt and/or avoid extreme parameter values.") % (minval, sum_negative_strength))
                _logger.debug(self.parameters())
            if sum_negative_strength_undec < -.01 and param.renorm_warnings:
                _logger.warning(("Remaining FP distribution included values less than zero "
                                 + "(minimum=%f, total=%f).  Please decrease dt and/or avoid extreme parameter "
                                 + "values.") % (minval, sum_negative_strength_undec))
                _logger.debug(self.parameters())
            pdf_choice_upper[pdf_choice_upper < 0] = 0
            pdf_choice_lower[pdf_choice_lower < 0] = 0
            pdf_undec[pdf_undec < 0] = 0
        # Fix numerical errors
        pdfsum = np.sum(pdf_choice_upper) + np.sum(pdf_choice_lower) + np.sum(pdf_undec)
        if pdfsum > 1:
            if pdfsum > 1.01 and param.renorm_warnings:
                _logger.warning("Renormalizing probability density from " + str(pdfsum) + " to 1."
                                + "  Try decreasing dt or using the implicit (backward Euler) method instead.  "
                                + "If that doesn't eliminate this warning, it may be due to extreme parameter "
                                + "values and/or bugs in your model specification.")
                _logger.debug(self.parameters())
            pdf_choice_upper /= pdfsum
            pdf_choice_lower /= pdfsum
            pdf_undec /= pdfsum

        if return_evolution:
            return self.get_dependence('overlay').apply(Solution(pdf_choice_upper, pdf_choice_lower, self, conditions=conditions, pdf_undec=pdf_undec, pdf_evolution=pdf_evolution))
        return self.get_dependence('overlay').apply(Solution(pdf_choice_upper, pdf_choice_lower, self, conditions=conditions, pdf_undec=pdf_undec))
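
    # Example (hedged sketch): choosing a solver method explicitly.  "cn" is
    # the default, "implicit" is the general fallback, and "explicit" is
    # only valid when the stability criterion holds:
    #
    #     sol = model.solve_numerical(method="implicit", return_evolution=True)
    #     # Solution.get_evolution() then holds the full pdf over x and t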

    @accepts(Self, Conditions)
    @returns(Solution)
    @requires("self.can_solve_explicit(conditions=conditions)")
    def solve_numerical_explicit(self, conditions={}, **kwargs):
        """Solve the model using the explicit method (Forward Euler).

        See documentation for the solve_numerical method.
        """
        return self.solve_numerical(method="explicit", conditions=conditions, **kwargs)

    @accepts(Self, Conditions)
    @returns(Solution)
    def solve_numerical_implicit(self, conditions={}, **kwargs):
        """Solve the model using the implicit method (Backward Euler).

        See documentation for the solve_numerical method.
        """
        return self.solve_numerical(method="implicit", conditions=conditions, **kwargs)

    @accepts(Self, conditions=Conditions)
    @requires("self.can_solve_cn(conditions=conditions)")
    @returns(Solution)
    def solve_numerical_cn(self, conditions={}):
        """Solve the DDM model numerically using Crank-Nicolson.

        This uses the Crank-Nicolson method to solve the DDM at each
        timepoint.  Results are then compiled together.  This is the
        core DDM solver of this library.

        It returns a Solution object describing the joint PDF.
        """
        ### Initialization: Lists
        self.check_conditions_satisfied(conditions)
        pdf_curr = self.IC(conditions=conditions) # Initial condition
        pdf_outer = self.IC(conditions=conditions)
        pdf_inner = self.IC(conditions=conditions)
        # pdf_prev = np.zeros((len(pdf_curr)))
        # If pdf_choice_upper + pdf_choice_lower + the undecided
        # probability are summed, they equal 1.  So these are
        # components of the joint pdf.
        pdf_choice_upper = np.zeros(len(self.t_domain())+1) # Not a proper pdf on its own (doesn't sum to 1)
        pdf_choice_lower = np.zeros(len(self.t_domain())+1) # Not a proper pdf on its own (doesn't sum to 1)
        x_list = self.x_domain(conditions=conditions)

        bound_shift = 0.
        # Note that we linearly approximate the bound by the two surrounding grids sandwiching it.
        x_index_inner = int(np.ceil(bound_shift/self.dx)) # Index for the inner bound (smaller matrix)
        x_index_outer = int(np.floor(bound_shift/self.dx)) # Index for the outer bound (larger matrix)

        # We weight the upper and lower matrices according to how far
        # away the bound is from each.  The weight of each matrix is
        # approximated linearly.  The inner bound is 0 when the bound
        # lies exactly on a grid point.
        weight_inner = (bound_shift - x_index_outer*self.dx)/self.dx
        weight_outer = 1. - weight_inner # The weight of the lower bound matrix, approximated linearly.
        x_list_inbounds = x_list[x_index_outer:len(x_list)-x_index_outer] # List of x-positions still within bounds.

        prev_t = 0
        prev_i_t = 0
        # Looping through time and updating the pdf.
        for i_t, t in enumerate(self.t_domain()):
            # Update previous state.
            pdf_outer_prev = pdf_outer.copy()
            pdf_inner_prev = pdf_inner.copy()

            # For efficiency only do diffusion if there's at least
            # some density remaining in the channel.
            if np.sum(pdf_curr[:]) > 0.0001:
                ## Define the boundaries at current time.
                bound = self.get_dependence('bound').get_bound(t=t, conditions=conditions) # Boundary at current time-step.

                # Now figure out which x positions are still within
                # the (collapsing) bound.
                assert self.get_dependence("bound").get_bound(t=0, conditions=conditions) >= bound, "Invalid change in bound" # Ensure the bound didn't expand
                bound_shift = self.get_dependence("bound").get_bound(t=0, conditions=conditions) - bound
                # Note that we linearly approximate the bound by the two surrounding grids sandwiching it.
                x_index_inner_prev = x_index_inner
                x_index_outer_prev = x_index_outer
                x_index_inner = int(np.ceil(bound_shift/self.dx)) # Index for the inner bound (smaller matrix)
                x_index_outer = int(np.floor(bound_shift/self.dx)) # Index for the outer bound (larger matrix)
                x_index_inner_shift = x_index_inner - x_index_inner_prev
                x_index_outer_shift = x_index_outer - x_index_outer_prev
                x_index_io_shift = x_index_inner - x_index_outer
                x_index_io_shift_prev = x_index_inner_prev - x_index_outer_prev

                # We weight the upper and lower matrices according to
                # how far away the bound is from each.  The weight of
                # each matrix is approximated linearly.  The inner
                # bound is 0 when the bound lies exactly on a grid point.
                weight_inner_prev = weight_inner
                weight_outer_prev = weight_outer
                weight_inner = (bound_shift - x_index_outer*self.dx)/self.dx
                weight_outer = 1. - weight_inner # The weight of the lower bound matrix, approximated linearly.
                x_list_inbounds_prev = x_list_inbounds.copy() # List of x-positions still within bounds at the previous step.
                x_list_inbounds = x_list[x_index_outer:len(x_list)-x_index_outer] # List of x-positions still within bounds.

                # Diffusion matrix for the Crank-Nicolson method.
                # Here defined as the outer matrix; the inner matrix
                # is either trivial or an extracted submatrix.
                # diffusion_matrix_prev = 2.* np.diag(np.ones(len(x_list_inbounds_prev))) - diffusion_matrix
                local_dt = t - prev_t if t != 0 else self.t_domain()[1]
                drift_matrix = self.get_dependence('drift').get_matrix(x=x_list_inbounds, t=t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                drift_matrix *= .5
                noise_matrix = self.get_dependence('noise').get_matrix(x=x_list_inbounds, t=t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                noise_matrix *= .5
                diffusion_matrix = TriDiagMatrix.eye(len(x_list_inbounds))
                diffusion_matrix += drift_matrix
                diffusion_matrix += noise_matrix
                drift_matrix_prev = self.get_dependence('drift').get_matrix(x=x_list_inbounds_prev, t=prev_t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                drift_matrix_prev *= .5
                noise_matrix_prev = self.get_dependence('noise').get_matrix(x=x_list_inbounds_prev, t=prev_t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                noise_matrix_prev *= .5
                diffusion_matrix_prev = TriDiagMatrix.eye(len(x_list_inbounds_prev))
                diffusion_matrix_prev -= drift_matrix_prev
                diffusion_matrix_prev -= noise_matrix_prev

                ### Compute probability density functions (pdf)
                # Probability density function for the outer matrix.
                # Considers the whole space in the previous step for
                # the matrix multiplication, then restricts to the
                # current space when solving the diffusion matrix,
                # separating the outer and inner pdf_prev.
                #
                # For constant bounds pdf_inner is unnecessary.
                # For changing bounds pdf_inner is always needed,
                # even if the current inner and outer bounds
                # coincide.  I currently compute this generally, but
                # we could constrain it to changing-bound
                # simulations only.
                so_from = x_index_outer_shift
                so_to = len(x_list_inbounds)+x_index_outer_shift
                si_from = x_index_io_shift
                si_to = len(x_list_inbounds)-x_index_io_shift
                si2_from = x_index_io_shift_prev
                si2_to = len(x_list_inbounds_prev)-x_index_io_shift_prev
                si3_from = x_index_inner_shift
                si3_to = len(x_list)-2*x_index_inner+x_index_inner_shift
                pdf_outer = diffusion_matrix.spsolve(diffusion_matrix_prev.dot(pdf_outer_prev)[so_from:so_to])
                if x_index_inner == x_index_outer: # Should always be the case, since we removed CN changing bounds support
                    pdf_inner = pdf_outer
                else:
                    # Need a separate matrix here to get the proper corrections
                    drift_matrix = self.get_dependence('drift').get_matrix(x=x_list_inbounds[si_from:si_to], t=t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                    drift_matrix *= .5
                    noise_matrix = self.get_dependence('noise').get_matrix(x=x_list_inbounds[si_from:si_to], t=t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                    noise_matrix *= .5
                    diffusion_matrix = TriDiagMatrix.eye(len(x_list_inbounds[si_from:si_to]))
                    diffusion_matrix += drift_matrix
                    diffusion_matrix += noise_matrix
                    drift_matrix_prev = self.get_dependence('drift').get_matrix(x=x_list_inbounds_prev[si2_from:si2_to], t=prev_t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                    drift_matrix_prev *= .5
                    noise_matrix_prev = self.get_dependence('noise').get_matrix(x=x_list_inbounds_prev[si2_from:si2_to], t=prev_t, dt=local_dt, dx=self.dx, conditions=conditions, implicit=True)
                    noise_matrix_prev *= .5
                    diffusion_matrix_prev = TriDiagMatrix.eye(len(x_list_inbounds_prev[si2_from:si2_to]))
                    diffusion_matrix_prev -= drift_matrix_prev
                    diffusion_matrix_prev -= noise_matrix_prev
                    pdf_inner = diffusion_matrix.spsolve(diffusion_matrix_prev.dot(pdf_inner_prev)[si3_from:si3_to])

                # Pdf mass out of bounds is considered a decision made.
                pdf_choice_lower[i_t+1] += weight_outer_prev * np.sum(pdf_outer_prev[:x_index_outer_shift]) \
                                         + weight_inner_prev * np.sum(pdf_inner_prev[:x_index_inner_shift])
                pdf_choice_upper[i_t+1] += weight_outer_prev * np.sum(pdf_outer_prev[len(pdf_outer_prev)-x_index_outer_shift:]) \
                                         + weight_inner_prev * np.sum(pdf_inner_prev[len(pdf_inner_prev)-x_index_inner_shift:])

                # Reconstruct current probability density function,
                # adding outer and inner contribution to it.  Use the
                # .fill() method to avoid allocating memory with
                # np.zeros().
                pdf_curr.fill(0) # Reset
                pdf_curr[x_index_outer:len(x_list)-x_index_outer] += weight_outer*pdf_outer
                pdf_curr[x_index_inner:len(x_list)-x_index_inner] += weight_inner*pdf_inner
            else:
                break # Break if the remaining densities are too small.

            # Increase current, transient probability of crossing
            # either bound, as flux.  choice_upper corresponds to the
            # upper bound and choice_lower to the lower bound.
            _inner_B_choice_upper = x_list[len(x_list)-1-x_index_inner]
            _outer_B_choice_upper = x_list[len(x_list)-1-x_index_outer]
            _inner_B_choice_lower = x_list[x_index_inner]
            _outer_B_choice_lower = x_list[x_index_outer]
            flux_outer_B_choice_upper = self.flux(_outer_B_choice_upper, t, conditions=conditions)
            flux_inner_B_choice_upper = self.flux(_inner_B_choice_upper, t, conditions=conditions)
            flux_outer_B_choice_lower = self.flux(_outer_B_choice_lower, t, conditions=conditions)
            flux_inner_B_choice_lower = self.flux(_inner_B_choice_lower, t, conditions=conditions)
            if len(pdf_inner) == 0: # Otherwise we get an error when bounds collapse to 0
                pdf_inner = np.array([0])
            pdf_choice_upper[i_t+1] += 0.5*weight_outer * pdf_outer[-1] * flux_outer_B_choice_upper \
                                     + 0.5*weight_inner * pdf_inner[-1] * flux_inner_B_choice_upper
            pdf_choice_lower[i_t+1] += 0.5*weight_outer * pdf_outer[0] * flux_outer_B_choice_lower \
                                     + 0.5*weight_inner * pdf_inner[0] * flux_inner_B_choice_lower
            pdf_choice_upper[i_t] += 0.5*weight_outer * pdf_outer[-1] * flux_outer_B_choice_upper \
                                   + 0.5*weight_inner * pdf_inner[-1] * flux_inner_B_choice_upper
            pdf_choice_lower[i_t] += 0.5*weight_outer * pdf_outer[0] * flux_outer_B_choice_lower \
                                   + 0.5*weight_inner * pdf_inner[0] * flux_inner_B_choice_lower

            # Renormalize when the channel size has <1 grid, although
            # all hell breaks loose in this regime.
            if bound < self.dx:
                pdf_choice_upper[i_t+1] *= (1+ (1-bound/self.dx))
                pdf_choice_lower[i_t+1] *= (1+ (1-bound/self.dx))
            prev_t = t
            prev_i_t = i_t

        # Fix the time-offset error of dt/2
        pdf_choice_upper = np.concatenate([[0], np.mean([pdf_choice_upper[1:], pdf_choice_upper[:-1]], axis=0)])
        pdf_choice_lower = np.concatenate([[0], np.mean([pdf_choice_lower[1:], pdf_choice_lower[:-1]], axis=0)])
        # Fix the truncation error at the end
        pdf_choice_upper = pdf_choice_upper[:-1]
        pdf_choice_lower = pdf_choice_lower[:-1]

        # Detect and fix below zero errors.  Here, we don't worry
        # about undecided probability as we did with the implicit
        # method, because CN tends to oscillate around zero,
        # especially when noise (sigma) is large.  The user would be
        # directed to decrease dt.
        pdf_undec = pdf_curr
        minval = np.min((np.min(pdf_choice_upper), np.min(pdf_choice_lower)))
        if minval < 0:
            sum_negative_strength = np.sum(pdf_choice_upper[pdf_choice_upper<0]) + np.sum(pdf_choice_lower[pdf_choice_lower<0])
            # For small errors, don't bother alerting the user
            if sum_negative_strength < -.01 and param.renorm_warnings:
                _logger.warning(("Probability density included values less than zero (minimum=%f, "
                                 + "total=%f).  Please decrease dt and/or avoid extreme parameter values.") % (minval, sum_negative_strength))
                _logger.debug(self.parameters())
            pdf_choice_upper[pdf_choice_upper < 0] = 0
            pdf_choice_lower[pdf_choice_lower < 0] = 0
        # Fix numerical errors
        pdfsum = np.sum(pdf_choice_upper) + np.sum(pdf_choice_lower)
        if pdfsum > 1:
            # If it is only a small renormalization, don't bother alerting the user.
            if pdfsum > 1.01 and param.renorm_warnings:
                _logger.warning("Renormalizing probability density from " + str(pdfsum) + " to 1."
                                + "  Try decreasing dt.  If that doesn't eliminate this warning, it may be due"
                                + " to extreme parameter values and/or bugs in your model specification.")
                _logger.debug(self.parameters())
            pdf_choice_upper /= pdfsum
            pdf_choice_lower /= pdfsum

        # TODO Crank-Nicolson still has something weird going on with pdf_curr near 0, where it seems to oscillate
        return self.get_dependence('overlay').apply(Solution(pdf_choice_upper, pdf_choice_lower, self, conditions=conditions, pdf_undec=None))

@paranoidclass
class Fittable(float):
    """For parameters that should be adjusted when fitting a model to data.

    A Fittable object does not require any parameters, however
    several optional parameters may improve the ability to fit the
    model.  In particular, `maxval` and `minval` ensure we do not
    choose an invalid parameter value.  `default` is the value to
    start with when fitting; if it is not given, it will be selected
    at random.
    """
    @staticmethod
    def _test(v):
        assert v in Numeric()
        # This if statement technically violates the liskov
        # substitution principle.  However this is only the case
        # because Fittable inherits from float.  If it didn't, passing
        # nan as the "value" for a Fittable wouldn't be necessary and
        # everything would be fine.  Inheriting from float is a
        # convenience so that we don't have to redefine all of
        # python's floating point internal methods for the Fitted
        # class.  In theory you could get around this problem by
        # making Fitted inherit from Fittable (which would inherit
        # only from nothing) and float via multiple inheritance, but
        # this gets complicated quickly since float is a builtin.
        if type(v) is Fittable:
            assert np.isnan(v), "Fittable has already been fitted"
        elif type(v) is Fitted:
            assert v in Number(), "Fitted value is invalid"
    @staticmethod
    def _generate():
        yield Fittable()
        yield Fittable(minval=1)
        yield Fittable(maxval=3)
        yield Fittable(minval=-1, maxval=1)
        yield Fittable(default=.001) # `default`, not `default_value`, is the keyword __init__ accepts
        yield Fittable(minval=3, default=6)
        yield Fittable(minval=10, maxval=100, default=20)
        yield Fitted(0)
        yield Fitted(1)
        yield Fitted(4, minval=0, maxval=10)
    def __new__(cls, val=np.nan, **kwargs):
        if not np.isnan(val):
            raise ValueError("No positional arguments for Fittables.  Received argument: %s." % str(val))
        return float.__new__(cls, np.nan)
    def __init__(self, **kwargs):
        minval = kwargs['minval'] if "minval" in kwargs else -np.inf
        maxval = kwargs['maxval'] if "maxval" in kwargs else np.inf
        default_value = kwargs['default'] if 'default' in kwargs else None
        object.__setattr__(self, 'minval', minval)
        object.__setattr__(self, 'maxval', maxval)
        object.__setattr__(self, 'default_value', default_value)
    def __setattr__(self, name, val):
        """No changing the state."""
        raise AttributeError
    def __delattr__(self, name):
        """No deletions of existing parameters."""
        raise AttributeError
    def __repr__(self):
        components = []
        if not np.isnan(self):
            components.append(str(float(self)))
        if self.minval != -np.inf:
            components.append("minval=" + self.minval.__repr__())
        if self.maxval != np.inf:
            components.append("maxval=" + self.maxval.__repr__())
        if self.default_value is not None:
            components.append("default=" + self.default_value.__repr__())
        return type(self).__name__ + "(" + ", ".join(components) + ")"

    @accepts(Self)
    @returns(Number)
    def default(self):
        """Choose a default value.

        This chooses a value for the Fittable object randomly abiding
        by any constraints.  Note that calling this function multiple
        times will give different results.
        """
        if self.default_value is not None:
            return self.default_value
        else:
            maxval = self.maxval # Makes equations below more readable
            minval = self.minval
            if maxval < np.inf and minval > -np.inf:
                return np.random.beta(2, 2)*(maxval-minval) + minval
            elif maxval == np.inf and minval > -np.inf:
                return np.random.pareto(1) + minval
            elif maxval < np.inf and minval == -np.inf:
                return maxval - np.random.pareto(1)
            elif maxval == np.inf and minval == -np.inf:
                return np.random.standard_cauchy()
            else:
                raise ValueError("Error with the maximum or minimum bounds")
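
    # Example (hedged sketch): default() draws a starting value respecting
    # the bounds, e.g. Fittable(minval=0, maxval=1).default() returns a
    # Beta(2,2)-distributed value in (0, 1), while an unbounded
    # Fittable().default() draws from a standard Cauchy.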

    @accepts(Self, Number)
    @returns(Self)
    def make_fitted(self, val):
        return Fitted(float(val), maxval=self.maxval, minval=self.minval)

class Fitted(Fittable):
    """Parameters which used to be Fittable but now hold a value.

    This extends float so that the parameters for the Fittable can be
    saved in the final model, even though there are no more Fittable
    objects in the final model.
    """
    def __new__(cls, val, **kwargs):
        return float.__new__(cls, val)
    def __init__(self, val, **kwargs):
        Fittable.__init__(self, **kwargs)
    def default(self):
        return float(self)
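
# Example (hedged sketch): a Fitted behaves as a float but remembers its
# bounds, so a fitted model can round-trip through repr()/exec():
#
#     f = Fittable(minval=0, maxval=1)
#     x = f.make_fitted(.3)
#     # x is a Fitted with float(x) == .3, x.minval == 0, x.maxval == 1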