Source code for tempnet.synth_temp_network

"""#
# flow stability
#
# Copyright (C) 2021 Alexandre Bovet <alexandre.bovet@maths.ox.ac.uk>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.


"""
from typing import Union

from queue import Empty, PriorityQueue

import numpy as np
from scipy.sparse import lil_matrix
from scipy.stats import expon


[docs] def make_step_block_probs( deltat1: float, deltat2: float, m1: float = 1.0, p1: float = 1.0, ): """Return a time-dependent block-probability function for 3 groups. The returned function cycles through three phases where different community pairs have elevated cross-community interaction probability. Parameters ---------- deltat1 : float Duration of each *within-community* (identity-block) phase. deltat2 : float Duration of each *cross-community exchange* phase. m1 : float Within-community interaction probability (default 1.0). p1 : float Cross-community interaction probability for the active pair (default 1.0). Returns ------- block_mod_func : callable A function ``block_mod_func(t)`` that accepts a float *t* and returns a 3×3 numpy array of group-level interaction probabilities. """ def block_mod_func(t: float) -> np.ndarray: m2 = (1 - m1) / 2 p2 = (1 - p1) ex12 = np.array([[p2, p1, 0], [p1, p2, 0], [0, 0, 0]]) ex23 = np.array([[0, 0, 0], [0, p2, p1], [0, p1, p2]]) ex13 = np.array([[p2, 0, p1], [0, 0, 0], [p1, 0, p2]]) within = np.array([[m1, m2, m2], [m2, m1, m2], [m2, m2, m1]]) if t >= 0 and t < deltat1: return within elif t >= deltat1 and t < deltat1 + deltat2: return ex12 elif t >= deltat1 + deltat2 and t < 2 * deltat1 + deltat2: return within elif t >= 2 * deltat1 + deltat2 and t < 2 * (deltat1 + deltat2): return ex23 elif (t >= 2 * (deltat1 + deltat2) and t < 2 * (deltat1 + deltat2) + deltat1): return within elif (t >= 2 * (deltat1 + deltat2) + deltat1 and t <= 3 * (deltat1 + deltat2)): return ex13 else: print( "Warning: t must be >=0 and <= 3*(deltat1+deltat2)," f" t is {t}" ) return within return block_mod_func
[docs] class Distro: """Random variables from distributions The loc and scale values are the default values used, but they can also be changed when calling `Distro.draw_val(loc,scale)` """ def __init__(self, loc:Union[float, int]=0.0, scale:Union[float, int]=1, dist_type:str="exponential"): """ Parameters ---------- loc: Location parameter value of a SciPy distribution object scale: Scale parameter value of a SciPy distribution object dist_type: The type of distribution that will be drawn from Currently, only `"exponential"` (the default) is supported """ self.loc=loc self.scale=scale self.dist_type=dist_type if dist_type == "exponential": self.draw = expon.rvs else: raise NotImplementedError("Not implemented distribution")
[docs] def draw_val(self, loc=None, scale=None): if loc is None: loc=self.loc if scale is None: scale=self.scale return self.draw(loc=loc, scale=scale)
[docs] class Individual: """Individual agent class. Parameters ---------- ID: int ID of the individual inter_distro_loc: float Location (mean) of the interactions (i.e. events) durations inter_distro_scale: float Scale (standard-deviation) of the interactions (i.e. events) durations inter_distro_type: string Type of distribution function of the interactions durations. Can be "exponential". inter_distro_mod_func: function function that take an argument `time` and returns a tuple `(loc, scale)` used as parameters for drawing time-dependent interaction durations. activ_distro_loc: float Location (mean) of the inter-activation (i.e. inter-events) durations activ_distro_scale: float Scale (standard-deviation) of the inter-activation (i.e. inter-events) durations activ_distro_type: string Type of distribution function of the inter-activation durations. Can be "exponential". activ_distro_mod_func: function function that take an argument `time` and returns a tuple `(loc, scale)` used as parameters for drawing time-dependent inter-activation durations. group: int ID of the group to which the individual belongs to. """ # these are class attributes used to keep track of parameters of all instances all_IDs = [] all_groups = [] def __init__(self, ID, inter_distro_loc=0, inter_distro_scale=1, inter_distro_type="exponential", inter_distro_mod_func=None, activ_distro_loc=0, activ_distro_scale=5, activ_distro_type="exponential", activ_distro_mod_func=None, group=0): # ID of the individual (must be unique) # group if not isinstance(ID,int): raise ValueError("ID must be an integer.") self.ID = ID Individual.all_IDs.append(ID) # distribution of interaction durations self.inter_distro = Distro(scale=inter_distro_scale, loc=inter_distro_loc, dist_type=inter_distro_type) # distribution of inter-activation times self.activ_distro = Distro(scale=activ_distro_scale, loc=activ_distro_loc, dist_type=activ_distro_type) # loc, scale modulation functions if inter_distro_mod_func is None: self.inter_distro_mod_func = lambda _ : (inter_distro_loc, inter_distro_scale) else: self.inter_distro_mod_func = inter_distro_mod_func if activ_distro_mod_func is None: self.activ_distro_mod_func = lambda _ : (activ_distro_loc, activ_distro_scale) else: self.activ_distro_mod_func = activ_distro_mod_func # group if not isinstance(group,int): raise ValueError("group must be an integer.") self.group = group Individual.all_groups.append(group) # initialize simulation time self.t = 0
[docs] def draw_inter_duration(self, time=None): """Draws a interaction duration time from `inter_distro`. If `time` is provided, computes the `loc` and `scale` parameters using `inter_distro_mod_func(time)`. Otherwise, `loc` and `scale` are taken as the initialized values. """ if time is None: return self.inter_distro.draw_val() else: loc, scale = self.inter_distro_mod_func(time) if scale == 0: raise ValueError("distribution scale cannot be zero.") return self.inter_distro.draw_val(loc=loc, scale=scale)
[docs] def draw_activ_time(self, time=None): """Draws a activation time from `activ_distro`. If `time` is provided, computes the `loc` and `scale` parameters using `activ_distro_mod_func(time)`. Otherwise, `loc` and `scale` are taken as the initialized values. """ if time is None: return self.activ_distro.draw_val() else: loc, scale = self.activ_distro_mod_func(time) if scale == 0: raise ValueError("distribution scale cannot be zero.") return self.activ_distro.draw_val(loc=loc, scale=scale)
[docs] class SynthTempNetwork: """SynthTempNetwork: a class for an agent based model generating a continuous time synthetic temporal network Alexandre Bovet 2019 Parameters ---------- individuals: list List of Individual instances, i.e. the nodes of the network. Individual have a group id. There are N individuals and Ngroups group. t_start: float Starting time of the simulation t_end: float Ending time of the simulation num_interactions_per_activation: int Number of interactions generated each time an individual is activated next_event_method: string Method to choose the individual to interact with. Can be - 'random_uniform' (default): uniform probability to choose any other individuals. - 'block_probs': probabilities given by a block matrix `inter_group_probs`. - 'block_probs_mod': probabilities given by a time-dependent function `block_prob_mod_func`. inter_group_probs: Ngroups x Ngroups numpy array Contains the probabilities of inter-group interactions. block_prob_mod_func: function Functions that depend on t such that `block_prob_mod_func(t)` returns an `inter_group_probs` matrix. Usage: ------ The simulation is run by calling `self.run()`. All the events are stored in 4 lists: `self.indiv_sources`, `self.indiv_targets`, `self.start_times` and `self.end_times`. """ def __init__(self, individuals, t_start: float = 0.0, t_end: float = 200.0, num_interactions_per_activation=1, next_event_method="random_uniform", inter_group_probs=None, block_prob_mod_func=None): # number of individuals self.N = len(individuals) # list of individuals self.individuals = individuals # list of individual IDs self.indiv_ids_list = [ind.ID for ind in individuals] if not all([i < self.N for i in self.indiv_ids_list]): raise ValueError("individual IDs must be from 0 to N-1") # array of individual IDs self.indiv_ids_array = np.array(self.indiv_ids_list, dtype=np.int32) try: assert np.unique(self.indiv_ids_array).size == self.N except Exception as e: print("Individuals ID must be unique.") raise e # individuals group list self.indiv_groups_list = [ind.group for ind in individuals] # individuals group array self.indiv_groups_array = np.array(self.indiv_groups_list, dtype=np.int32) # number of groups self.Ngroups = len(set(self.indiv_groups_list)) if not all([gid < self.Ngroups for gid in self.indiv_groups_list]): raise ValueError("group IDs must be from 0 to Ngroups-1") # group id to indiv ids dict self.group_to_ids = {g : np.where(self.indiv_groups_array == g)[0] \ for g in np.unique(self.indiv_groups_array)} # final and start time of the simulation self.t_end = t_end self.t_start = t_start # number of interactions per activations self.num_interactions_per_activation = num_interactions_per_activation # Priority queue for the discrete event simulation self.queue = PriorityQueue() # dictionary mapping indiv_id to their event in the Priority Queue # Each individual has at most one event in the queue self.event_mapper_activ = {} self.event_mapper_inter = {} # how events are chosen self.next_event_method = next_event_method # inter group interaction probabilities matrix # p[i,j] = prob of group i interacting with j self.inter_group_probs = inter_group_probs if isinstance(inter_group_probs, list): self.inter_group_probs = np.array(inter_group_probs) if next_event_method == "block_probs" and \ self.inter_group_probs.shape != (self.Ngroups, self.Ngroups): raise ValueError("inter_group_probs must have (Ngroups,Ngroups) shape" + \ 'for "block_probs" next_event_method') if next_event_method == "block_probs": # make sure inter_group_probs is properly normalized # i.e. sum_j p[i,j] = 1 for all i self.inter_group_probs = \ (self.inter_group_probs.T/self.inter_group_probs.sum(axis=1)).T if next_event_method == "block_probs_mod": if block_prob_mod_func is None: raise ValueError("`block_prob_mod_func` must be specified for the " + \ "method `block_prob_mod_func`.") else: self.block_prob_mod_func = block_prob_mod_func # events information : start time, length and box number # a list for each individual -> list of lists self.indiv_sources = [] self.indiv_targets = [] self.start_times = [] self.end_times = [] # instantaneous adjacency matrix self._A = lil_matrix((self.N, self.N), dtype=np.int32) # sparse matrix to store last interaction starting events self._last_times = lil_matrix((self.N, self.N), dtype=np.float64)
[docs] @staticmethod def Event(time, indiv_id, event_type="activation", partner=None, is_canceled=False): """Arguments: --------- - time: float, used to order the events in the priority queue - indiv_id: int, id of the individual - event_type: str, type of event, 'activation' (default) or 'interaction' - is_canceled : bool, =True if the event has been replaced and must be discarded This parameter is changed through the event_mapper dict. """ if event_type == "activation": return time, [indiv_id, event_type, is_canceled] if event_type == "interaction": return time, [indiv_id, event_type, partner, is_canceled] else: raise NotImplementedError(f"Unknown event_type {event_type}.")
[docs] def put_event(self, indiv_id, event_type="activation", partner=None, is_instantaneous=False): """Put an event in the priority queue accorind to the rules event_type can be 'activation' or 'interaction' """ if event_type == "activation": next_activation_time = self.individuals[indiv_id].draw_activ_time(self.t) event = self.Event(time=self.t + next_activation_time, indiv_id=indiv_id, event_type=event_type) # map event to indiv_id self.event_mapper_activ[indiv_id] = event if event_type == "interaction": if is_instantaneous: event_length = 0 else: event_length = self.individuals[indiv_id].draw_inter_duration(self.t) event = self.Event(time=self.t + event_length, indiv_id=indiv_id, event_type=event_type, partner=partner) self.event_mapper_inter[indiv_id] = event self.queue.put_nowait(event)
[docs] def get_new_partner(self, indiv_id, current_partner=None): """Compute the next partner""" if self.next_event_method == "random_uniform": # unallowed new partners (self+existing) un_partners = {indiv_id} for n in self._A[indiv_id,:].nonzero()[1]: un_partners.add(n) potential_partners = self.indiv_ids_array if set(potential_partners) == un_partners: new_partner = None else: new_partner = indiv_id while new_partner in un_partners: new_partner = \ np.random.choice(potential_partners) elif self.next_event_method == "random_uniform_within_group": # unallowed new partners (self+existing) un_partners = {indiv_id} for n in self._A[indiv_id,:].nonzero()[1]: un_partners.add(n) potential_partners = self.group_to_ids[self.indiv_groups_array[indiv_id]] if set(potential_partners) == un_partners: new_partner = None else: new_partner = indiv_id while new_partner in un_partners: new_partner = \ np.random.choice(potential_partners) elif self.next_event_method == "block_probs": # block model type interactions indiv_group_id = self.indiv_groups_array[indiv_id] # draw target interacting group #prob of picking groups p = self.inter_group_probs[indiv_group_id,:] if p.sum() == 0: # no possible partners new_partner = None else: target_group = np.random.choice(np.arange(self.Ngroups), p=p) # unallowed new partners (self+existing) un_partners = {indiv_id} for n in self._A[indiv_id,:].nonzero()[1]: un_partners.add(n) potential_partners = self.group_to_ids[target_group] if (set(potential_partners).union(set([indiv_id]))).issubset(un_partners): new_partner = None else: new_partner = indiv_id while new_partner in un_partners: new_partner = \ np.random.choice(potential_partners) elif self.next_event_method == "block_probs_mod": # block model type interactions with time dependence indiv_group_id = self.indiv_groups_array[indiv_id] inter_group_probs = self.block_prob_mod_func(self.t) # draw target interacting group #prob of picking groups p = inter_group_probs[indiv_group_id,:] if p.sum() == 0: # no possible partners new_partner = None else: target_group = np.random.choice(np.arange(self.Ngroups), p=p) # unallowed new partners (self+existing) un_partners = {indiv_id} for n in self._A[indiv_id,:].nonzero()[1]: un_partners.add(n) potential_partners = self.group_to_ids[target_group] if (set(potential_partners).union(set([indiv_id]))).issubset(un_partners): new_partner = None else: new_partner = indiv_id while new_partner in un_partners: new_partner = \ np.random.choice(potential_partners) else: raise NotImplementedError("next_event_method :" +\ f"{self.next_event_method} is not implemented yet.") return new_partner
[docs] def initialize_events(self): """Put one activation event in the queue for each individual""" for indiv_id in self.indiv_ids_list: self.put_event(indiv_id, event_type="activation")
[docs] def run(self, save_all_states=False, save_dt_states=False, dt=10.0, verbose=False): """Run the simulation Parameters ---------- save_all_states : bool saves the positions in `all_states` for each new event save_dt_states : bool saves the positions in `saved_states` at a constant frequency given by `dt` dt : float record period for `saved_states' verbose : bool verbose mode shows the progress """ # time interval at which the data is recorded self.dt= dt # simulation time self.t=self.t_start self._t_next_bin = self.t + self.dt # initialize the simulation self.initialize_events() # save initial conditions if save_all_states: # save positions at every event time self.all_states = dict() self.all_states[self.t] = self._A.copy() if save_dt_states: # save positions every dt self.saved_states = dict() self.saved_states[self.t] = self._A.copy() # advance the simulation while self.t < self.t_end: try: prev_time = self.t (time, event) = self.queue.get_nowait() if verbose: print("treating event : ") print((time, event)) print("--") if not event[-1]: # not is_canceled self.t = time if event[1] == "activation": [indiv_id, _, _] = event # put next event in the queue self.put_event(indiv_id, event_type="activation") for _ in range(self.num_interactions_per_activation): # pick new partner for interaction partner = self.get_new_partner(indiv_id) if verbose>10: print("new interaction between : ", indiv_id, " and ", partner) print("starting at ", self.t) print("--") if partner is not None: #update A and _last_times self._A[indiv_id,partner] = 1 self._last_times[indiv_id,partner] = self.t self.put_event(indiv_id, event_type="interaction", partner=partner) else: pass # if partner is None, continue activations but # there is no interaction elif event[1] == "interaction": # end of an interaction [indiv_id, _, partner, _] = event #update A self._A[indiv_id,partner] = 0 # starting time of this event event_start = self._last_times[indiv_id,partner] # erase last_time self._last_times[indiv_id,partner] = 0 # add new event to lists self.indiv_sources.append(indiv_id) self.indiv_targets.append(partner) self.start_times.append(event_start) self.end_times.append(self.t) # if the simulation time has advanced if self.t > prev_time: if save_all_states: # save positions at every new times self.all_states[self.t] = self._A.copy() if save_dt_states: # save positions every dt if self.t >= self._t_next_bin: # save positions self.saved_states[self.t] = self._A.copy() self._t_next_bin += self.dt if verbose: print("t = ", self.t) except Empty: print("Priority queue is empty") break