Source code for qtt.algorithms.markov_chain

""" Class to generate signals with continous-time Markov chains


@author: pieter.eendebak@gmail.com
"""

import itertools
import random
from dataclasses import dataclass
from typing import List, Optional, Union

import numpy as np
import scipy.linalg


def _solve_least_squares(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    rcond = None
    solution = np.linalg.lstsq(a, b, rcond=rcond)[0]
    return solution


[docs]@dataclass class ChoiceGenerator: """ Class to generate random elements with weighted selection This is a replacement for random.choices that is efficient when a large number of choices has to be generated. Args: number_of_states: number of choices that has to be generated cum_weights (array[float]): cumulative probabilities of the choices block_size: size of blocks of choices to generate """ number_of_states: int cum_weights: np.ndarray block_size: int = 5000 def __post_init__(self): if not self.number_of_states == len(self.cum_weights): raise Exception( f'specification of cumulative weights (len {len(self.cum_weights)})' + ' does not match number of states {self.number_of_states}') self.rng = np.random.Generator(np.random.SFC64()) self._idx = 0 self._block: List[int] = self._generate_block().tolist() def _generate_block(self, size: Optional[int] = None) -> np.ndarray: if size is None: size = self.block_size else: self.block_size = size weights = np.concatenate(([self.cum_weights[0]], np.diff(self.cum_weights))) # type: ignore counts = np.random.multinomial(size, weights) block = np.concatenate(tuple(choice_idx * np.ones(c, dtype=int) for choice_idx, c in enumerate(counts))) self.rng.shuffle(block) return block
[docs] def generate_choice(self) -> int: """ Generate a single choice Returns: Integer in the range 0 to the number of states """ self._idx = self._idx + 1 if self._idx == self.block_size: self._idx = 0 self._block = self._generate_block().tolist() return self._block[self._idx]
[docs] def generate_choices(self, size: int) -> np.ndarray: """ Generate a specified number of choice Returns: Array with elements in the range 0 to the number of states """ data = self._generate_block(size) return data
[docs]class ContinuousTimeMarkovModel: def __init__(self, states: List[str], holding_parameters: Union[List[float], np.ndarray], jump_chain: np.ndarray): """ Class that represents a continuous-time Markov chain Args: states: list with names for the states holding_parameters: List with the holding parameters. The holding parameters determine the average time before the system will make a jump to a new state jump_chain: The jump chain or transition matrix. This matrix gives the probability for the system to jump from a state to one of the other states. The sum of the probabilities in each column must equal one. For an introduction to Markov chains see https://www.probabilitycourse.com/chapter11/11_3_1_introduction.php Also see: https://vknight.org/unpeudemath/code/2015/08/01/simulating_continuous_markov_chains.html """ self.states = states self.update_model(np.asarray(holding_parameters), jump_chain)
[docs] def update_model(self, holding_parameters: np.ndarray, jump_chain: np.ndarray): """ Update the model of the markov chain Args: holding_parameters: List with the holding parameters jump_chain: The jump chain or transition matrix For a detailed description of the parameters see the class documentation. """ self.holding_parameters = np.array(holding_parameters).flatten().reshape((-1, 1)) self.jump_chain = jump_chain self.generator_matrix = self._create_generator_matrix(self.holding_parameters, self.jump_chain) self._validity_check()
def _validity_check(self): if len(self.states) != len(self.jump_chain): raise AssertionError('States do not equal jump chain!') if not np.allclose(np.sum(self.jump_chain, axis=0), 1): raise AssertionError('Jump chain matrix should represent probabilities!') if np.all(self.holding_parameters <= 0): raise AssertionError('Not all holding parameter are bigger than zero!') @staticmethod def _create_generator_matrix(holding_parameters: np.ndarray, jump_chain: np.ndarray) -> np.ndarray: generator_matrix = np.array(jump_chain, copy=True) for ii in range(generator_matrix.shape[0]): generator_matrix[:, ii] = holding_parameters[ii] * jump_chain[:, ii] for ii in range(generator_matrix.shape[0]): generator_matrix[ii, ii] = -holding_parameters[ii] return generator_matrix
[docs] def number_of_states(self) -> int: """ Return the number of states in the model """ return len(self.states)
[docs] def transition_matrix(self, delta_time: float) -> np.ndarray: """ Return the transition matrix for a specified amount of time """ transition_matrix = scipy.linalg.expm(delta_time * self.generator_matrix) return transition_matrix
def __repr__(self): return "%s(id=0x%x, states=%s, generator=%s)" % (self.__class__.__name__, id(self), self.states, self.generator_matrix)
[docs] def stationary_distribution_direct(self) -> np.ndarray: """ Return the stationary distribution of the model The calculation method is taken from: https://www.probabilitycourse.com/chapter11/11_3_2_stationary_and_limiting_distributions.php, Theorem 11.3 """ pi_tilde = self.stationary_distribution_discrete(self.jump_chain) norm = np.sum(pi_tilde / self.holding_parameters) stationary_distribution = (pi_tilde / self.holding_parameters) / norm return stationary_distribution
[docs] def stationary_distribution(self) -> np.ndarray: """ Return the stationary distribution of the model The calculation method is taken from: https://vknight.org/unpeudemath/code/2015/08/01/simulating_continuous_markov_chains.html """ Q = self.generator_matrix n = Q.shape[0] A = np.vstack((Q, np.ones((1, n)))) B = np.zeros((n + 1, 1)) B[-1] = 1 stationary_distribution = _solve_least_squares(A, B) return stationary_distribution
[docs] @staticmethod def stationary_distribution_discrete(jump_chain) -> np.ndarray: """ Return the stationary distrubution for a Markov chain """ n = jump_chain.shape[0] A = np.vstack((jump_chain - np.eye(n), np.ones((1, n)))) B = np.zeros((n + 1, 1)) B[-1] = 1 pi = _solve_least_squares(A, B) return pi
[docs] def generate_sequence(self, length: int, delta_time: float, initial_state: Union[None, int, np.ndarray] = None, generators: Optional[List[ChoiceGenerator]] = None) -> np.ndarray: """ Generate a random sequence with the model Args: length: number of elements in the sequence delta_time: time step to be used. This is equal to one over the samplerate. initial_state: This parameter determines how the first element of the generated sequence is chosen. If an int, then use that state is initial state. If None then take a random state weighted by the stationary distribution. If the initial_state is a list, then the list is interpreted as a probability distribution and the first element is sampled from all possible states according to the distribution specified. generators: Optional list of generators to use Returns: Array with generated sequence """ number_of_states = self.number_of_states() if initial_state is None: initial_state = self.stationary_distribution().flatten().tolist() initial_state = random.choices(range(number_of_states), weights=initial_state, k=1)[0] # type: ignore elif isinstance(initial_state, (list, np.ndarray, tuple)): initial_state = np.asarray(initial_state).flatten().tolist() initial_state = random.choices(range(number_of_states), weights=initial_state, k=1)[0] # type: ignore if generators is None: generators = self._create_generators(delta_time) sequence = np.zeros(length, dtype=int) sequence[0] = value = initial_state for i in range(1, sequence.size): # value points to the genererator of the previous element in the sequence, and is then updated directly sequence[i] = value = generators[value].generate_choice() return sequence
def _create_generators(self, delta_time: float): number_of_states = self.number_of_states() P = self.transition_matrix(delta_time) generators: List[Optional[ChoiceGenerator]] = [None] * number_of_states # pre-calculate cumulative weights for jj in range(number_of_states): cum_weights = np.array(list(itertools.accumulate(P[:, jj]))) generators[jj] = ChoiceGenerator(number_of_states, cum_weights) return generators
[docs] def generate_sequences(self, length: int, delta_time: float = 1, initial_state: Union[None, int, np.ndarray] = None, number_of_sequences: int = 1) -> np.ndarray: """ Generate multiple random sequences with the model Args: length: number of elements in the sequence delta_time: time step to be used initial_state: This parameter determines how the first element of the generated sequences are chosen. The parameter is passed to the :func:`generate_sequence` method. number_of_sequences : Specified the number of sequences to generate Returns: Array with generated sequences """ if initial_state is None: initial_state = self.stationary_distribution() sequences = np.zeros((number_of_sequences, length), dtype=int) generators = self._create_generators(delta_time) for n in range(number_of_sequences): sequences[n] = self.generate_sequence(length, delta_time, initial_state, generators=generators) return sequences
[docs]def generate_traces(markov_model: ContinuousTimeMarkovModel, std_gaussian_noise: float = 1, state_mapping: Optional[np.ndarray] = None, *args, **kwargs): """ Generate traces for a continuous-time Markov model with added noise Args: markov_model: model to use for generation of traces std_gaussian_noise: standard deviation of Gaussian noise to add to the output signal state_mapping: If not None, replace each state in the generated trace by the corresponding element in the array *args, **kwargs: passed to the `generate_sequences` function of the model The traces are generated by the `generate_sequences` method from the model. """ traces = np.array(markov_model.generate_sequences(*args, **kwargs)) if state_mapping is not None: traces = np.array(state_mapping)[traces] if std_gaussian_noise != 0: traces = traces + np.random.normal(0, std_gaussian_noise, traces.size).reshape(traces.shape) return traces