Source code for qtt.algorithms.markov_chain

# -*- coding: utf-8 -*-
""" Class to generate signals with continous-time Markov chains


@author: pieter.eendebak@gmail.com
"""

# %%
import numpy as np
import random
import scipy.linalg
import itertools


def _solve_least_squares(a, b):
    rcond = None
    solution = np.linalg.lstsq(a, b, rcond=rcond)[0]
    return solution


[docs]class ChoiceGenerator: """ Class to generate random elements with weighted selection """ def __init__(self, number_of_states, cum_weights, block_size=5000): """ Class to generate random elements with weighted selection This is a replacement for random.choices that is efficient when a large number of choices has to be generated. Args: number_of_states (int): number of choices that has to be generated cum_weights (array[float]): cummulative probabilities of the choices block_size (int): size of blocks of choices to generate """ if not number_of_states == len(cum_weights): raise Exception('specification of cummulative weights does not match number of states') self.number_of_states = number_of_states self._idx = 0 self._block_size = block_size self.cum_weights = cum_weights self._generate_block() def _generate_block(self): values = np.random.rand(self._block_size, ) counts, _ = np.histogram(values, [0] + list(self.cum_weights)) self._block = np.hstack(tuple([choice_idx * np.ones(c, dtype=int) for choice_idx, c in enumerate(counts)])) np.random.shuffle(self._block)
[docs] def generate_choice(self): """ Generate a choice Returns: int: integer in the range 0 to the number of states """ self._idx = self._idx + 1 if self._idx == self._block_size: self._idx = 0 self._generate_block() return self._block[self._idx]
[docs]class ContinuousTimeMarkovModel: def __init__(self, states, holding_parameters, jump_chain): """ Class that represents a continous-time Markov chain Args: states (str[]): list with names for the states holding_parameters(float[]): List with the holding parameters. The holding parameters determine the average time before the system will make a jump to a new state jump_chain (array): The jump chain or transition matrix. This matrix gives the probability for the system to jump from a state to one of the other states. The sum of the probabilities in each column must equal one. For an introduction to Markov chains see https://www.probabilitycourse.com/chapter11/11_3_1_introduction.php Also see: https://vknight.org/unpeudemath/code/2015/08/01/simulating_continuous_markov_chains.html """ self.states = states self.update_model(holding_parameters, jump_chain)
[docs] def update_model(self, holding_parameters, jump_chain): """ Update the model of the markov chain Args: holding_parameters: List with the holding parameters jump_chain: The jump chain or transition matrix For a detailed description of the parameters see the class documentation. """ self.holding_parameters = np.array(holding_parameters).flatten().reshape((-1, 1)) self.jump_chain = jump_chain self.generator_matrix = self._create_generator_matrix(self.holding_parameters, self.jump_chain) self._validity_check()
def _validity_check(self): if len(self.states) != len(self.jump_chain): raise AssertionError('States do not equal jump chain!') if not np.allclose(np.sum(self.jump_chain, axis=0), 1): raise AssertionError('Jump chain matrix should represent probabilities!') if np.all(self.holding_parameters <= 0): raise AssertionError('Not all holding parameter are bigger than zero!') @staticmethod def _create_generator_matrix(holding_parameters, jump_chain): generator_matrix = np.array(jump_chain, copy=True) for ii in range(generator_matrix.shape[0]): generator_matrix[:, ii] = holding_parameters[ii] * jump_chain[:, ii] for ii in range(generator_matrix.shape[0]): generator_matrix[ii, ii] = -holding_parameters[ii] return generator_matrix
[docs] def number_of_states(self): """ Return the number of states in the model """ return len(self.states)
[docs] def transition_matrix(self, delta_time): """ Return the transition matrix for a specified amount of time """ transition_matrix = scipy.linalg.expm(delta_time * self.generator_matrix) return transition_matrix
def __repr__(self): return "%s(id=0x%x, states=%s, generator=%s)" % (self.__class__.__name__, id(self), self.states, self.generator_matrix)
[docs] def stationary_distribution_direct(self): """ Return the stationary distribution of the model The calculation method is taken from: https://www.probabilitycourse.com/chapter11/11_3_2_stationary_and_limiting_distributions.php, Theorem 11.3 """ pi_tilde = self.stationary_distribution_discrete(self.jump_chain) norm = np.sum((pi_tilde / self.holding_parameters)) stationary_distribution = (pi_tilde / self.holding_parameters) / norm return stationary_distribution
[docs] def stationary_distribution(self): """ Return the stationary distribution of the model The calculation method is taken from: https://vknight.org/unpeudemath/code/2015/08/01/simulating_continuous_markov_chains.html """ Q = self.generator_matrix n = Q.shape[0] A = np.vstack((Q, np.ones((1, n)))) B = np.zeros((n + 1, 1)) B[-1] = 1 stationary_distribution = _solve_least_squares(A, B) return stationary_distribution
[docs] @staticmethod def stationary_distribution_discrete(jump_chain): """ Return the stationary distrubution for a Markov chain """ n = jump_chain.shape[0] A = np.vstack((jump_chain - np.eye(n), np.ones((1, n)))) B = np.zeros((n + 1, 1)) B[-1] = 1 pi = _solve_least_squares(A, B) return pi
[docs] def generate_sequence(self, length, delta_time, initial_state=None): """ Generate a random sequence with the model Args: length (int): number of elements in the sequence delta_time (float): time step to be used. This is equal to one over the samplerate. initial_state (None or int or list): This parameter determines how the first element of the generated sequence is chosen. If an int, then use that state is initial state. If None then take a random state weighted by the stationary distribution. If the initial_state is a list, then the list is interpreted as a probability distribution and the first element is samples from all possible states according to the distribution specified. Returns: array : generated sequence """ number_of_states = self.number_of_states() if initial_state is None: initial_state = self.stationary_distribution() initial_state = random.choices(range(number_of_states), weights=initial_state, k=1)[0] elif isinstance(initial_state, (list, np.ndarray, tuple)): initial_state = random.choices(range(number_of_states), weights=initial_state, k=1)[0] P = self.transition_matrix(delta_time) sequence = np.zeros(length, dtype=int) sequence[0] = initial_state # pre-calculate cummulative weights generators = [None] * number_of_states for jj in range(number_of_states): cum_weights = list(itertools.accumulate(P[:, jj])) generators[jj] = ChoiceGenerator(number_of_states, cum_weights) for i in range(1, sequence.size): sequence[i] = generators[sequence[i - 1]].generate_choice() return sequence
[docs] def generate_sequences(self, length, delta_time=1, initial_state=None, number_of_sequences=1): """ Generate multiple random sequences with the model Args: length (int): number of elements in the sequence delta_time (float): time step to be used initial_state (None or int or list): This parameter determines how the first element of the generated sequences are chosen. The parameter is passed to the :func:`generate_sequence` method. number_of_sequences (int): Specified the number of sequences to generate Returns: array : generated sequences """ if initial_state is None: initial_state = self.stationary_distribution() sequences = np.zeros((number_of_sequences, length), dtype=int) for n in range(number_of_sequences): sequences[n] = self.generate_sequence(length, delta_time, initial_state) return sequences
[docs]def generate_traces(markov_model, std_gaussian_noise=1, state_mapping=None, *args, **kwargs): """ Generate traces for a continuous-time Markov model with added noise Args: markov_model (ContinuousTimeMarkovModel): model to use for generation of traces std_gaussian_noise (float): standard deviation of Gaussian noise to add to the output signal state_mapping (None or array): If not None, replace each state in the generated trace by the corresponding element in the array *args, **kwargs: passed to the `generate_sequences` function of the model The traces are generated by the generate_sequences method from the model. """ traces = np.array(markov_model.generate_sequences(*args, **kwargs)) if state_mapping is not None: traces = np.array(state_mapping)[traces] if std_gaussian_noise != 0: traces = traces + np.random.normal(0, std_gaussian_noise, traces.size).reshape(traces.shape) return traces