6
\$\begingroup\$

I am trying to write a python implementation of Euler-Maruyama and Milstein schemes for numerically solving stochastic differential equations. The pseudo-code for the algorithms is in the Wikipedia links.

Would you review my code to see if it can be improved and is closer to production-quality? I've tried to use snake_style convention for all variable names.

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")

@dataclass
class SDE:
    """
    An abstraction for a stochastic differential equation
    """

    initial_condition: float
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

@dataclass
class Solver(ABC):
    """
    An abstract base class for all numerical schemes
    """

    t_start: float = 0.0
    t_end: float = 1.0
    step_size: float = 0.001
    num_paths: int = 10

    def __post_init__(self):
        self.iter = 0
        self.x_curr = np.zeros((self.num_paths,))
        self.num_steps = int((self.t_end - self.t_start) / self.step_size)
        self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)
        self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
            size=(self.num_paths, self.num_steps)
        )
        self.brownian = np.cumsum(self.brownian_increments, axis=1)
        self.brownian = np.concatenate(
            [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
        )
        self.solution = []

    @abstractmethod
    def iterate(self):
        """
        Compute the next iterate X(n+1)
        """

    def solve(self, sde: SDE):
        """
        Solve the SDE
        """
        self.x_curr = np.full(shape=(self.num_paths,), fill_value=sde.initial_condition)
        self.solution = [self.x_curr.copy()]

        while self.iter < self.num_steps:
            self.solution.append(self.iterate(sde))
            self.iter += 1

        return np.array(self.solution).transpose()

    def reset(self):
        self.__post_init__()

@dataclass
class Milstein(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))

    """

    def iterate(self, sde: SDE):
        """
        Generate the next iterate X(n+1)
        """

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
        dvol_dx_n = np.array(
            [sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr]
        )

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += (
            mu_n * self.step_size
            + sigma_n * d_brownian
            + 0.5 * sigma_n * dvol_dx_n * (d_brownian**2 - self.step_size)
        )

        return self.x_curr.copy()

@dataclass
class EulerMaruyama(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))

    """

    def iterate(self, sde: SDE):
        """
        Generate the next iterate X(n+1)
        """

        mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
        sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        self.x_curr += mu_n * self.step_size + sigma_n * d_brownian

        return self.x_curr.copy()

if __name__ == "__main__":
    # Let's solve the following SDE

    # dS_t = S_t dB_t + (mu + 1/2) S_t dt

    # where mu = 0. This has the known solution S_t = exp(B_t - t/2)

    gbm_sde = SDE(
        initial_condition=1.0,
        drift = lambda t, s_t : 0.50 * s_t,
        vol = lambda t, s_t : s_t,
        dvol_dx = lambda t, s_t : 1
    )

    t = np.linspace(start=0.0, stop=1.0, num=1001)

    euler = EulerMaruyama()
    solution = euler.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.ylabel(r'$S(t)$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')
    plt.grid(True)
    for i in range(10):
        plt.plot(t, solution[i])

    plt.show()

    milstein = Milstein()
    solution = milstein.solve(gbm_sde)

    plt.xlabel(r'Time $t$')
    plt.ylabel(r'$S(t)$')
    plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')
    plt.grid(True)
    for i in range(10):
        plt.plot(t, solution[i])

    plt.show()
\$\endgroup\$
2
  • 1
    \$\begingroup\$ The immediate red flag is using so many list comprehensions with numpy code, since they are almost never actually needed. \$\endgroup\$
    – qwr
    Commented Jul 12 at 4:00
  • \$\begingroup\$ I think, I abused dataclass as well. dataclass is mainly used to represent data, and a long __post_init__() usually means code smell. Some of my classes are workers. \$\endgroup\$
    – Quasar
    Commented Jul 12 at 16:22

3 Answers 3

7
\$\begingroup\$
  1. Use NumPy vectorized operations instead of list comprehensions whenever possible. This can significantly improve performance, especially when dealing with large data. For instance, in the iterate functions of both EulerMaruyama and Milstein, you can replace the list comprehensions with NumPy vectorized operations to compute mu_n and sigma_n.

  2. Use type hints for function parameters and return types for better code readability and compatibility with statically-typed languages or IDEs. For example, in your current implementation, the drift, vol, and dvol_dx functions are expected to take two arguments (time and state), but you haven't specified this in the type hints.

  3. Use a logging library instead of printing debug information directly to the console. This will make it easier to manage and filter log messages, especially when dealing with large datasets or complex simulations. For instance, you can use Python's built-in logging module to configure different log levels and output formats.

  4. Use appropriate error handling mechanisms for function inputs or simulation scenarios that may raise exceptions. This will help prevent your code from crashing unexpectedly when encountering invalid input values or other errors. For example, you can use try-except blocks to handle potential errors and provide meaningful error messages.

  5. Consider implementing additional methods or functions to analyze the simulation results, such as computing statistics (e.g., mean, standard deviation), visualizing the sample paths, or plotting summary figures (e.g., histograms, density plots). This can help gain a deeper understanding of the simulated SDE and its behavior under different conditions.

Here's an example of how you might modify your EulerMaruyama class to use NumPy vectorized operations:

import numpy as np

class EulerMaruyama(Solver):
    def iterate(self, sde, state):
        mu_n = self.sde.drift(np.repeat(self.times[self.iter], len(state)), state)
        sigma_n = self.sde.vol(np.repeat(self.times[self.iter], len(state)), state)

        d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

        delta_t = np.ones((len(state), 1)) * self.step_size

        self.x_curr += np.hstack((mu_n, sigma_n * d_brownian + delta_t))

        return self.x_curr.copy()

In this example, the EulerMaruyama class now uses NumPy's vectorized operations (repeat, hstack) to compute mu_n and sigma_n for all states in parallel instead of using a list comprehension.

\$\endgroup\$
4
  • 1
    \$\begingroup\$ Re #2: drift, vol, and dvol_dx are type-hinted in OP's implementation. Please clarify what you mean. \$\endgroup\$
    – tdy
    Commented Jul 10 at 16:20
  • \$\begingroup\$ Re #3: I don't see any debugging info being printed in OP's implementation. Please clarify. \$\endgroup\$
    – tdy
    Commented Jul 10 at 16:21
  • \$\begingroup\$ Re #5: Please clarify what you mean that OP should consider visualizing the sample paths. OP already plotted the sample paths as solution[i] vs t. \$\endgroup\$
    – tdy
    Commented Jul 10 at 17:03
  • 3
    \$\begingroup\$ @tdy this smells of an AI generated answer based on the writing style and new user. \$\endgroup\$
    – qwr
    Commented Jul 12 at 3:57
9
\$\begingroup\$

This looks pretty good to me.

The only thing I would comment is that:

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])
sigma_n = np.array([sde.vol(self.times[self.iter], x) for x in self.x_curr])
dvol_dx_n = np.array([sde.dvol_dx(self.times[self.iter], x) for x in self.x_curr])

are not vectorised, and are therefore potentially a little or a lot slower than they could be.

Consider the following example:

def slow():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = np.array([f(x) for x in a])
    return b

%timeit slow()
# 185 μs ± 11.8 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

def fast():
    a = np.ones(1000)
    f = lambda x: 0.5 * x
    b = f(a)
    return b

%timeit fast()
# 3.82 μs ± 261 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

Your class iterate methods enforce that the underlying function is not vectorised and therefore enforces slowness.

Try expecting a vectorised solution and allowing the initialiser to handle cases where vectorisation is not possible, since you gain flexibility this way in the scenario where it is not possible to vectorise.

mu_n = sde.drift(self.times[self.iter], self.x_curr)
sigma_n = sde.vol(self.times[self.iter], self.x_curr)
dvol_dx_n = sde.dvol_dx(self.times[self.iter], self.x_curr)

where

# FAST
gbm_sde = SDE(
    initial_condition=1.0,
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t : np.ones(len(s_t)),
)

# SLOW
gbm_sde = SDE(
    initial_condition=1.0,
    drift = lambda t, s_t : np.array([0.50 * x for x in s_t]),
    vol = lambda t, s_t : np.array([x for x in s_t]),
    dvol_dx = lambda t, s_t : np.array([1.0 for x in s_t]),
)
\$\endgroup\$
8
\$\begingroup\$
self.num_steps = int((self.t_end - self.t_start) / self.step_size)
self.times = np.linspace(self.t_start, self.t_end, self.num_steps + 1)

You may not reach t_end. If the step-size is fixed, why not use num_steps as the parameter?

self.brownian_increments = np.sqrt(self.step_size) * np.random.standard_normal(
    size=(self.num_paths, self.num_steps)
)
self.brownian = np.cumsum(self.brownian_increments, axis=1)
self.brownian = np.concatenate(
    [np.zeros(shape=(self.num_paths, 1)), self.brownian], axis=1
)

You don't really use either of these, only through

d_brownian = self.brownian[:, self.iter + 1] - self.brownian[:, self.iter]

at each time-step. I'd calculate it once at the start, and store that. In fact, the differences are the Gaussians you have already computed:

self.d_brownian = np.sqrt(self.step_size) * np.random.standard_normal(
        size=(self.num_paths, self.num_steps)
)

If the number of steps is known apriori, then there is no reason for self.solution to be a list, instead of a numpy array that you successively fill.

self.solution = np.zeros((self.num_paths, self.num_steps))

It would be faster if your functions like sde.drift were assumed to be vectorized, returning numpy arrays, and then

mu_n = np.array([sde.drift(self.times[self.iter], x) for x in self.x_curr])

would become

mu_n = sde.drift(self.times[self.iter], x)

This is minor, but

for i in range(10):
    plt.plot(t, solution[i])

could be

plt.plot(t, solution.T)

I also see you doing

self.x_curr += ...

right before a

return self.x_curr.copy()

which brings me to suggest that you restructure your code.

I like the SDE class. I'd either remove the initial_condition from it, or also include the t_start, t_end values. All these three are properties of the initial value problem, not really the properties of the stochastic diff.eq. itself.

We are perhaps in the territory of what looks subjectively cleaner, so I'm just going to offer a rewrite.

from dataclasses import dataclass
from abc import ABC, abstractmethod
from typing import Callable, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

sns.set_style("whitegrid")

# for more descriptive type hints
T = np.array
X = np.ndarray

@dataclass
class SIVP:
    """
    An abstraction for a stochastic initial value problem
    """

    t_start: float
    t_end: float
    initial_condition: float
    
    drift: Callable[[float, float], float]
    vol: Callable[[float, float], float]
    dvol_dx: Optional[Callable[[float, float], float]]

@dataclass
class Solver(ABC):
    """
    An abstract base class for all numerical schemes
    """

    num_steps: int = 1000
    num_paths: int = 10

    @staticmethod
    @abstractmethod
    def time_step(dt: float, t: T, x: X, dw: X) -> X:
        """
        Compute the next iterate X(n+1)
        """

    def solve(self, sivp: SIVP) -> (T, X):
        """
        Solve the SDE
        """

        # stepsize
        dt = (sivp.t_end - sivp.t_start) / self.num_steps

        # Gaussian increments
        dws = np.random.normal(
            scale=np.sqrt(dt),
            size=(self.num_steps, self.num_paths)
        )
        
        # times
        ts = np.linspace(sivp.t_start, sivp.t_end, self.num_steps + 1)

        # trajectories
        xs = np.zeros((self.num_steps + 1, self.num_paths))
        xs[0] = sivp.initial_condition

        #time-stepping
        for step_ix in range(self.num_steps):
            xs[step_ix + 1] = self.time_step(sivp, dt, ts[step_ix], xs[step_ix], dws[step_ix])

        return ts, xs


@dataclass
class Milstein(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))
    + 0.5 * sigma(n,X_n) * sigma'(n,X_n) * ((B(n+1) - B(n))**2 - (t_{n+1} - t_n))

    """

    @staticmethod
    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        """
        Generate the next iterate X(n+1)
        """

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = (
            dt * mu_n
            + sigma_n * dw
            + 0.5 * sigma_n * dvol_dx_n * (dw**2 - dt)
        )
        return x + dx

@dataclass
class EulerMaruyama(Solver):
    """
    Numerical solver for a stochastic differential equation(SDE) using
    the Euler-Maruyama method.

    Consider an SDE of the form :

    dX_t = mu(t,X_t) dt + sigma(t,X_t) dB_t

    with initial condition X_0 = x_0

    The solution to the SDE can be computed using the increments

    X_{n+1} - X_n = mu(n,X_n)(t_{n+1}-t_n) + sigma(n,X_n)(B(n+1)-B(n))

    """
    
    @staticmethod
    def time_step(sivp: SIVP, dt: float, t: T, x: X, dw: X) -> X:
        """
        Generate the next iterate X(n+1)
        """

        mu_n = sivp.drift(t, x)
        sigma_n = sivp.vol(t, x)
        dvol_dx_n = sivp.dvol_dx(t, x)

        dx = dt * mu_n + sigma_n * dw
        return x + dx

# Let's solve the following SDE
# dS_t = S_t dB_t + (mu + 1/2) S_t dt
# where mu = 0. This has the known solution S_t = exp(B_t - t/2)

gbm_sde = SIVP(
    t_start = 0.0,
    t_end = 1.0,
    initial_condition = 1.0,
    drift = lambda t, s_t : 0.50 * s_t,
    vol = lambda t, s_t : s_t,
    dvol_dx = lambda t, s_t: 1 + 0*s_t
)

ts, xs = EulerMaruyama().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Euler method')
plt.grid(True)

plt.plot(ts, xs)

plt.show()

ts, xs = Milstein().solve(gbm_sde)

plt.xlabel(r'Time $t$')
plt.ylabel(r'$S(t)$')
plt.title(r'10 sample paths of Geometric Brownian Motion, $\mu=0.0$, Milstein method')
plt.grid(True)

plt.plot(ts, xs)

plt.show()
\$\endgroup\$

Not the answer you're looking for? Browse other questions tagged or ask your own question.