Shinn et al. (2021) - Transient neuronal suppression for exploitation of new sensory evidence

Read the paper

Download all code referenced here (shinn2021.py)

Run this model online interactively

Model definition

Let’s import the libraries we’ll need:

import pyddm as ddm
import pyddm.plot
import numpy as np
import scipy.stats

Included below are the three GDDMs included in the paper. For convenience of implementation, each is given a “diptype” code: the “pause model” is diptype=1, the “reset model” is diptype=2, and the “motor suppression model” is diptype=3. For no dip, we use the convention diptype=-1.

First, we define several helper functions which we will use throughout in the model:

def coh_transform(coh, max_coh):
    """Convert coherence from 0-100 to -1-1"""
    return (coh-50)/(max_coh-50)

def urgency(t, base, t1, slope):
    """Urgency signal"""
    return base + ((t-t1)*slope if t>=t1 else 0)

def get_detect_prob(coh, param):
    """Probability of detection given coherence"""
    return 2/(1+np.exp(-param*(coh-50)/50))-1

Now we define the drift rate:

class DriftDip(ddm.models.Drift):
    name = "Piecewise urgency signal, reward bias, and coherence change transient"
    required_parameters = ["snr", "noise", "t1", "t1slope", "maxcoh", "leak", "leaktarget",
                           "leaktargramp", "dipstart", "dipstop", "diptype", "dipparam"]
    required_conditions = ["coherence", "presample", "highreward"]
    default_parameters = {"leaktargramp": 0, "dipparam": 0, "diptype": -1}

    def get_drift(self, t, x, conditions, **kwargs):
        dipstart = min(self.dipstart, self.dipstop) + conditions["presample"]/1000
        dipstop = max(self.dipstart, self.dipstop) + conditions["presample"]/1000
        if self.diptype == 1 and dipstart < t and t < dipstop:
            return 0
        # Coherence coefficient == coherence with a non-linear transform
        coh_coef = coh_transform(conditions["coherence"], self.maxcoh)
        is_past_delay = 1 if t > conditions["presample"]/1000 else 0
        cur_urgency = self.snr * urgency(t, self.noise, self.t1, self.t1slope)
        leaktarg = self.leaktarget if conditions["highreward"] else -self.leaktarget
        leak = self.leak
        leaktargramp = self.leaktargramp if conditions["highreward"] else -self.leaktargramp
        if self.diptype == 2 and dipstart < t and t < dipstop:
            leak += self.dipparam
            leaktarg = 0
            leaktargramp = 0
        return coh_coef * (cur_urgency * is_past_delay) - leak*(x-(leaktarg+leaktargramp*(t)))

And the noise, which corresponds to the drift rate we just defined:

class NoiseDip(ddm.models.Noise):
    name = "Noise with piecewise linear urgency signal"
    required_parameters = ["noise", "t1", "t1slope", "dipstart", "dipstop", "diptype"]

    def get_noise(self, t, conditions, **kwargs):
        dipstart = min(self.dipstart, self.dipstop) + conditions["presample"]/1000
        dipstop = max(self.dipstart, self.dipstop) + conditions["presample"]/1000
        if self.diptype == 1 and dipstart < t and t < dipstop:
            return 0.001 # Not 0 to avoid numerical problems
        return urgency(t, self.noise, self.t1, self.t1slope) + .001

And the starting position:

class ICPoint(ddm.models.InitialCondition):
    """Initial condition: a dirac delta function in the center of the domain."""
    name = "point_source"
    required_parameters = ["x0"]
    required_conditions = ["highreward"]
    def get_IC(self, x, dx, conditions={}):
        start = np.round(self.x0/dx)
        if not conditions['highreward']:
            start = -start
        shift_i = int(start + (len(x)-1)/2)
        assert shift_i >= 0 and shift_i < len(x), "Invalid initial conditions"
        pdf = np.zeros(len(x))
        pdf[shift_i] = 1. # Initial condition at x=self.x0.
        return pdf

For motor suppression, we use an increasing bound as an equivalent formulation of a motor decision variable. It increases smoothly (according to a Beta(3,3) function) to avoid numerical transients:

class BoundDip(ddm.Bound):
    name = "Increasing bound for motor suppression"
    required_parameters = ["B", "dipstart", "dipstop", "diptype"]
    required_conditions = ["presample"]
    def get_bound(self, t, conditions, *args, **kwargs):
        dipstart = min(self.dipstart, self.dipstop) + conditions["presample"]/1000
        dipstop = max(self.dipstart, self.dipstop) + conditions["presample"]/1000
        if self.diptype == 3 and dipstart < t and t < dipstop:
            return self.B + 4*scipy.stats.beta.pdf(t, a=3, b=3, loc=dipstart, scale=dipstop-dipstart)
        return self.B

Finally, we have an overlay which simulates the detection probability, i.e. the fraction of trials on which the dip is actually exhibited. This overlay achieves this by eliminating the dip mechanism (setting diptype=-1), re-simulating the model, and then blending the resulting histogram with the actual model’s simulation with the given ratio. This also implements a trial-by-trial method, whereby the choice between the two models is probabilistic:

class OverlayDipRatio(ddm.Overlay):
    name = "Probability of detecting the change"
    required_parameters = ["detect", "diptype"]
    required_conditions = ["coherence"]
    def apply(self, solution):
        if self.diptype not in [1, 2, 3]:
            return solution
        corr = solution.corr
        err = solution.err
        m = solution.model
        cond = solution.conditions
        undec = solution.undec
        evolution = solution.evolution
        diptype = m.get_dependence("drift").diptype
        def set_dip_type(m, diptype):
            m.get_dependence("drift").diptype = diptype
            m.get_dependence("noise").diptype = diptype
            m.get_dependence("bound").diptype = diptype
            m.get_dependence("overlay").diptype = diptype
        set_dip_type(m, -1)
        ratio = get_detect_prob(cond['coherence'], self.detect)
        s = m.solve_numerical_implicit(conditions=cond, return_evolution=True)
        newcorr = corr * ratio + s.corr * (1-ratio)
        newerr = err * ratio + s.err * (1-ratio)
        newevo = evolution
        #newevo = evolution * ratio + s.evolution * (1-ratio)
        set_dip_type(m, diptype)
        return ddm.Solution(newcorr, newerr, m, cond, undec, newevo)
    def apply_trajectory(self, trajectory, model, rk4, seed, conditions={}):
        if self.diptype not in [1, 2, 3]:
            return trajectory
        prob = get_detect_prob(conditions['coherence'], self.detect)
        # We have a `prob` probability of detecting the dip.  If we
        # detected the dip, just use the given trajectory.  Otherwise,
        # simulate a new trajectory without the dip.
        if prob > np.random.rand():
            return trajectory
        diptype = model.get_dependence("drift").diptype
        def set_dip_type(m, diptype):
            m.get_dependence("drift").diptype = diptype
            m.get_dependence("noise").diptype = diptype
            m.get_dependence("bound").diptype = diptype
            m.get_dependence("overlay").diptype = diptype
        set_dip_type(model, -1)
        traj = model.simulate_trial(conditions=conditions, rk4=rk4, seed=seed, cutoff=True)
        set_dip_type(model, diptype)
        return traj

Running the model

Now that we have defined all of the pieces, let’s test the model with the GUI:

    DIPTYPE = 1 # Change to 1, 2, or 3 depending on which model you want
    snr = ddm.Fittable(minval=0.5, maxval=20, default=9.243318909157688)
    leak = ddm.Fittable(minval=-10, maxval=30, default=9.46411355874963)
    x0 = ddm.Fittable(minval=-.5, maxval=.5, default=0.1294632585920082)
    leaktargramp = ddm.Fittable(minval=0, maxval=3, default=0)
    noise = ddm.Fittable(minval=.2, maxval=2, default=1.1520906498077081)
    t1 = ddm.Fittable(minval=0, maxval=1, default=0.34905555600815663)
    t1slope = ddm.Fittable(minval=0, maxval=3, default=1.9643425020687162)

    dipstart = ddm.Fittable(minval=-.4, maxval=0, default=-.2)
    dipstop = ddm.Fittable(minval=0, maxval=.5, default=.05)
    nondectime = ddm.Fittable(minval=0, maxval=.3, default=.1)
    detect = ddm.Fittable(minval=2, maxval=50, default=10)
    diptype = DIPTYPE
    dipparam = ddm.Fittable(minval=0, maxval=50) if diptype == 2 else 0
    pmixturecoef = ddm.Fittable(minval=0, maxval=.2, default=.03)
    rate = ddm.Fittable(minval=.1, maxval=10, default=1)
    m = ddm.Model(drift = DriftDip(snr=snr,
                                   noise=noise,
                                   t1=t1,
                                   t1slope=t1slope,
                                   leak=leak,
                                   maxcoh=70,
                                   leaktarget=x0,
                                   leaktargramp=leaktargramp,
                                   dipstart=dipstart,
                                   dipstop=dipstop,
                                   diptype=diptype,
                                   dipparam=dipparam,
                                   ),
                  noise = NoiseDip(noise=noise,
                                   t1=t1,
                                   t1slope=t1slope,
                                   dipstart=dipstart,
                                   dipstop=dipstop,
                                   diptype=diptype,
                                   ),
                  IC =     ICPoint(x0=x0),
                  bound = BoundDip(B=1,
                                   dipstart=dipstart,
                                   dipstop=dipstop,
                                   diptype=diptype
                                   ),
                  overlay=ddm.OverlayChain(overlays=[
                                    ddm.OverlayNonDecision(nondectime=nondectime),
                                           OverlayDipRatio(detect=detect,
                                                           diptype=diptype),
                                 ddm.OverlayPoissonMixture(pmixturecoef=pmixturecoef,
                                                           rate=rate)
                                            ]),
                  dx=0.002, dt=0.002, T_dur=3.0)

Optionally, we can run the model in parallel with 4 CPUs using:

ddm.set_N_cpus(4)

Finally, plot the model in a GUI interface:

pyddm.plot.model_gui(model=m, conditions={"coherence": [50, 53, 60, 70],
                                        "presample": [0, 400, 800],
                                        "highreward": [0, 1]})

Or, if running a Jupyter notebook:

pyddm.plot.model_gui_jupyter(model=m, conditions={"coherence": [50, 53, 60, 70],
                                                "presample": [0, 400, 800],
                                                "highreward": [0, 1]})

Voila, look at that! It’s a dip!