"""#
# flow stability
#
# Copyright (C) 2021 Alexandre Bovet <alexandre.bovet@maths.ox.ac.uk>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import gzip
import os
import pickle
import time
from dataclasses import dataclass
from functools import partial
import numpy as np
import pandas as pd
from pathlib import Path
from scipy.sparse import (
coo_matrix,
csc_matrix,
csr_matrix,
diags,
dok_matrix,
eye,
isspmatrix,
isspmatrix_csr,
lil_matrix,
)
from scipy.sparse.csgraph import connected_components
from scipy.sparse.linalg import eigsh, expm
from .parallel_expm import compute_subspace_expm_parallel
from stochmat import inplace_csr_row_normalize, SparseStochMat
from .logger import get_logger
# get the logger
logger = get_logger()
@dataclass
class _LaplacianState:
"""Mutable buffers shared by ``compute_laplacian_matrices`` and its
dynamics-specific hooks (``_laplacian_prewarm``,
``_laplacian_on_event_end``, ``_laplacian_step_end``).
"""
A: object # adjacency buffer (lil_matrix or dok_matrix)
S: object # self-loop diagonal (csc)
Dm1: object # inverse-degree diagonal (csc)
degrees: np.ndarray
[docs]
class ContTempNetwork:
"""Continuous time temporal network
Parameters
----------
source_nodes: Python list
List of source nodes of each event, ordered according to
`starting_times`.
target_nodes: Python list
List of target nodes of each event.
starting_times: Python list
List of starting times of each event.
ending_times: Python list
List of ending times of each event.
extra_attrs: Dict
Extra event attributes. Must be given in a dict with
{attr_name: list_of_values}, where list_of_values has the same order
and length as `source_nodes`.
relabel_nodes: boolean
Relabel nodes from 0 to num_nodes and save original labels in
self.node_to_label_dict. Default is `True`
reset_event_table_index: boolean
Reset the index of the `events_table` DataFrame. Default is `True`.
node_to_label_dict: Python dict
If `relabel_nodes` is `False, this can be used to save the original
labels of the nodes.
merge_overlapping_events: boolean
Check for overlapping events (between the same pair of nodes)
and merges them. Default is `False`.
events_table: Pandas Dataframe
Dataframe with columns 'source_nodes', 'target_nodes', 'starting_times'
and 'ending_times' and index corresponding to event index.
Used for instantiating a new ConTempNetwork from the event_table of
an other one.
"""
# parametrize the column names > single place to change them:
_SOURCES = "source_nodes"
_TARGETS = "target_nodes"
_STARTS = "starting_times"
_ENDINGS = "ending_times"
_MANDATORY = [_SOURCES, _TARGETS, _STARTS]
_ESSENTIAL = [_SOURCES, _TARGETS, _STARTS, _ENDINGS]
# to hold endings - starts
_DURATIONS = "durations"
# for instantaneous event this is the duration to use
_DEFAULT_DURATION = 1
def _build_label_maps(self, source_iter, target_iter):
"""Build label<->node id dicts from two iterables of node labels.
Sets `self.label_to_node_dict` (original label -> contiguous
0..N-1 node id) and `self.node_to_label_dict` (its inverse).
Does not modify the input iterables.
"""
all_nodes = set()
all_nodes.update(source_iter)
all_nodes.update(target_iter)
self.label_to_node_dict = {
m: n for n, m in enumerate(sorted(all_nodes))
}
self.node_to_label_dict = {
n: m for m, n in self.label_to_node_dict.items()
}
def __init__(self, *,
source_nodes=[],
target_nodes=[],
starting_times=[],
ending_times=[],
extra_attrs=None,
relabel_nodes=True,
reset_event_table_index=True,
node_to_label_dict=None,
merge_overlapping_events=False,
events_table=None,
**kwargs):
if events_table is None:
if (ending_times is None or len(ending_times) == 0) \
and len(starting_times) > 0:
raise ValueError(
"ContTempNetwork requires 'ending_times' for each event."
" For instantaneous temporal networks use"
" ContTempInstNetwork."
)
assert len(source_nodes) == len(target_nodes) == \
len(starting_times) == len(ending_times)
if relabel_nodes:
# relabel nodes from 0 to num_nodes and save
# original labels in self.node_to_label_dict
self._build_label_maps(source_nodes, target_nodes)
source_nodes = [self.label_to_node_dict[n]
for n in source_nodes]
target_nodes = [self.label_to_node_dict[n]
for n in target_nodes]
else:
self.node_to_label_dict = node_to_label_dict
data = {"source_nodes": source_nodes,
"target_nodes": target_nodes,
"starting_times": starting_times,
"ending_times": ending_times}
columns = ["source_nodes", "target_nodes",
"starting_times", "ending_times"]
if extra_attrs is not None:
assert isinstance(extra_attrs, dict)
for attr_name, val_list in extra_attrs.items():
assert len(val_list) == len(source_nodes)
data[attr_name] = val_list
columns.append(attr_name)
self.events_table = pd.DataFrame(data=data,
columns=columns)
self.events_table.sort_values(by=["starting_times", "ending_times"],
inplace=True)
else:
if isinstance(events_table, (str, Path)):
try:
# Convert Path to string if it's a Path object
self.events_table = pd.read_csv(str(events_table), **kwargs)
logger.debug("Loading events from csv file.")
except FileNotFoundError:
raise ValueError(
f"The file at {events_table} was not found."
)
except pd.errors.EmptyDataError:
raise ValueError(
f"The file at {events_table} is empty."
)
except pd.errors.ParserError:
raise ValueError(
f"The file at {events_table} could not be parsed."
)
elif isinstance(events_table, pd.DataFrame):
# copy to avoid mutating caller's DataFrame when relabeling
self.events_table = events_table.copy() if relabel_nodes \
else events_table
else:
raise ValueError(
"`events_table` must be a pandas DataFrame or the"
"path to a CSV file. "
f"'{type(events_table)} is not acceptable."
)
reset_event_table_index = False
if self._ENDINGS not in self.events_table.columns:
raise ValueError(
f"events_table is missing required column"
f" '{self._ENDINGS}'. For instantaneous temporal"
" networks use ContTempInstNetwork."
)
if relabel_nodes:
self._build_label_maps(
self.events_table[self._SOURCES],
self.events_table[self._TARGETS],
)
self.events_table[self._SOURCES] = self.events_table[
self._SOURCES
].map(self.label_to_node_dict)
self.events_table[self._TARGETS] = self.events_table[
self._TARGETS
].map(self.label_to_node_dict)
else:
self.node_to_label_dict = node_to_label_dict
if reset_event_table_index:
self.events_table.reset_index(inplace=True, drop=True)
self.node_array = np.sort(pd.unique(
self.events_table[["source_nodes",
"target_nodes"]].values.ravel("K")
))
self.num_nodes = self.node_array.shape[0]
self.num_events = self.events_table.shape[0]
self.start_time = self.events_table.starting_times.min()
self.end_time = self.events_table.ending_times.max()
self.events_table[
"durations"
] = self.events_table.ending_times - self.events_table.starting_times
# to record compute times
self._compute_times = {}
self._overlapping_events_merged = False
if merge_overlapping_events:
num_merged = 1
while num_merged != 0:
num_merged = self._merge_overlapping_events()
self._overlapping_events_merged = True
self.is_directed = False
self.instantaneous_events = False
def __repr__(self):
return str(self.__class__) + \
f" with {self.num_nodes} nodes and {self.num_events} events"
@property
def nodes(self):
"""Sorted list of original node labels."""
return sorted(self.label_to_node_dict.keys())
[docs]
def save(self, filename,
matrices_list=None,
attributes_list=None):
"""Save graph event_table and matrices as pickle file
Parameters
----------
filename: str
Filename where to save. The extension is automatically added.
matrices_list: list of strings
List of names of matrices to save.
The default list is:
`matrices_list = ['laplacians',
'adjacencies',
'inter_T',
'inter_T_lin',
'T',
'T_lin',
'_stationary_trans',
'delta_inter_T',
'delta_inter_T_lin',]`
attributes_list: list of strings
List of attribute names to save.
The default list is:
`attributes_list = ['node_to_label_dict',
'events_table',
'times',
'time_grid',
'num_nodes',
'_compute_times',
'_t_start_laplacians',
'_k_start_laplacians',
'_t_stop_laplacians',
'_k_stop_laplacians',
'_overlapping_events_merged',]`
"""
save_dict = dict()
matrices = ["laplacians",
"adjacencies",
"inter_T",
"inter_T_lin",
"T",
"T_lin",
"_stationary_trans",
"delta_inter_T",
"delta_inter_T_lin"]
if matrices_list is None:
matrices_list = matrices
attributes = ["node_to_label_dict",
"events_table",
"times",
"time_grid",
"num_nodes",
"_compute_times",
"_t_start_laplacians",
"_k_start_laplacians",
"_t_stop_laplacians",
"_k_stop_laplacians",
"_overlapping_events_merged",
"is_directed"]
if attributes_list is None:
attributes_list = attributes
for attr in attributes_list:
if hasattr(self, attr):
save_dict[attr] = getattr(self, attr)
for mat in matrices_list:
if hasattr(self, mat):
save_dict[mat] = getattr(self, mat)
with open(os.path.splitext(filename)[0] + ".pickle", "wb") as fopen:
pickle.dump(save_dict, fopen)
[docs]
@classmethod
def load(cls, filename,
matrices_list=None,
attributes_list=None):
"""Load network from file and returns a ContTempNetwork instance.
Parameters
----------
filename: str
Filename where the network is saved save. The extension is
automatically added.
matrices_list: list of strings
List of names of matrices to load.
The default list is:
`matrices_list = ['laplacians',
'adjacencies',
'inter_T',
'inter_T_lin',
'T',
'T_lin',
'_stationary_trans',
'delta_inter_T',
'delta_inter_T_lin',]`
attributes_list: list of strings
List of attribute names to load.
The default list is:
`attributes_list = ['node_to_label_dict',
'events_table',
'times',
'time_grid',
'num_nodes',
'_compute_times',
'_t_start_laplacians',
'_k_start_laplacians',
'_t_stop_laplacians',
'_k_stop_laplacians',
'_overlapping_events_merged',]`
"""
matrices = ["laplacians",
"adjacencies",
"inter_T",
"inter_T_lin",
"T",
"T_lin",
"_stationary_trans",
"delta_inter_T",
"delta_inter_T_lin"]
if matrices_list is None:
matrices_list = matrices
attributes = ["node_to_label_dict",
"events_table",
"times",
"time_grid",
"num_nodes",
"_compute_times",
"_t_start_laplacians",
"_k_start_laplacians",
"_t_stop_laplacians",
"_k_stop_laplacians",
"_overlapping_events_merged",
"is_directed"]
if attributes_list is None:
attributes_list = attributes
# all in a pickle file
graph_dict = pd.read_pickle(os.path.splitext(filename)[0] + ".pickle")
events_table = graph_dict.pop("events_table")
net = cls(events_table=events_table,
relabel_nodes=False,
node_to_label_dict=graph_dict.pop("node_to_label_dict"))
for k, val in graph_dict.items():
if k in matrices_list:
setattr(net, k, val)
if k in attributes_list:
setattr(net, k, val)
return net
[docs]
def save_inter_T(self,
filename,
lamda=None,
round_zeros=True,
tol=1e-8,
compressed=False,
save_delta=False,
replace_existing=False):
"""Saves all the inter transition matrices.
This method saves all matrixes in `self.inter_T[lamda]` in a pickle
file togheter with a dictionary including parameters:
- `_k_start_laplacians`
- `_k_stop_laplacians`
- `_t_start_laplacians`
- `_t_stop_laplacians`
- `_t_stop_laplacians`
- `times_k_start_to_k_stop + 1`
given by
```
self.times.values[
self._k_start_laplacians: self._k_stop_laplacians + 1
]
```
`num_nodes` and `_compute_times`.
if `save_delta` is True, creates delta_inter_T if it is
not already present and saves it together with
inter_T[lamda][0] in a pickle file.
otherwise, saves inter_T[lamda] directly (good if used with
SparseStochMat instances).
Parameters
----------
filename: str
Filename where the data is saved (.pickle or .gz).
lamda: float or int, optional.
Use to save to results for a specific lamba. If None, the results
for all the computed lambdas will be saved. Default is None.
round_zeros: bool.
If True, values smaller than tol*max(abs(inter_T_k)) will be set to
zero to preserve sparsity. Default is True.
tol: float
See round_zeros. Default is 1e-8.
compressed: bool
Used to compress the file with gzip. Default is False.
save_delta: bool
If True, creates delta_inter_T if it is not already present and
saves it together with inter_T[lamda][0].
Only the differences between two consecutives inter_Ts are saved in
order to minimize file size.
Must not be used if `use_sparse_stoch` was used in
`compute_inter_transition_matrices`.
replace_existing: bool
If True, erase and replace files if they already exists.
Default is False.
Returns
-------
None
"""
assert hasattr(self, "inter_T"), f"PID {os.getpid()} : nothing " \
"saved, compute inter_T first."
ext = os.path.splitext(filename)[-1]
file = filename
if compressed:
if ext != ".gz":
file += ".gz"
elif ext != ".pickle":
file += ".pickle"
skipping = False
if os.path.exists(file):
if replace_existing:
# TODO: move to logger usage
print("PID ", os.getpid(), f" : , file {file} already exists, replacing it.")
else:
# TODO: move to logger usage
print("PID ", os.getpid(), f" : , file {file} already exists, skipping.")
skipping = True
if not skipping:
save_dict = {}
save_dict["_k_start_laplacians"] = self._k_start_laplacians
save_dict["_k_stop_laplacians"] = self._k_stop_laplacians
save_dict["_t_start_laplacians"] = self._t_start_laplacians
save_dict["_t_stop_laplacians"] = self._t_stop_laplacians
save_dict["times_k_start_to_k_stop+1"] = self.times.values[
self._k_start_laplacians:self._k_stop_laplacians + 1
]
save_dict["num_nodes"] = self.num_nodes
save_dict["_compute_times"] = self._compute_times
if save_delta:
assert not isinstance(
self.inter_T[lamda][0], SparseStochMat
), "inter_T must not be SparseStochMat"
if not hasattr(
self, "delta_inter_T"
) and hasattr(self, "inter_T"):
if lamda is not None:
self._compute_delta_trans_mat(lamda,
round_zeros=round_zeros,
tol=tol)
else:
# computes it for all lamda
for lamda in self.inter_T.keys():
self._compute_delta_trans_mat(
lamda,
round_zeros=round_zeros,
tol=tol
)
if (
lamda is not None
) and (lamda not in self.delta_inter_T.keys()):
self._compute_delta_trans_mat(lamda,
round_zeros=round_zeros,
tol=tol)
if hasattr(self, "delta_inter_T"):
save_dict["inter_T"] = dict()
save_dict["is_delta_trans"] = True
if lamda is None:
lamdas = self.delta_inter_T.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
save_dict["inter_T"][lamda] = dict()
save_dict["inter_T"][lamda][
"delta_inter_T"] = self.delta_inter_T[lamda]
save_dict["inter_T"][lamda][
"trans_mat0"] = self.inter_T[lamda][0].copy()
if round_zeros:
set_to_zeroes(
save_dict["inter_T"][lamda]["trans_mat0"],
tol=tol
)
text = "delta trans mats"
else:
save_dict["inter_T"] = dict()
save_dict["is_sparse_stoch"] = True
if lamda is None:
lamdas = self.inter_T.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
assert isinstance(
self.inter_T[lamda][0], SparseStochMat
), "inter_T needs to be SparseStochMat"
save_dict["inter_T"][lamda] = []
for interT in self.inter_T[lamda]:
if round_zeros:
interT.set_to_zeroes(tol)
save_dict["inter_T"][lamda].append(interT.to_dict())
text = "sparse stoch trans mats"
if compressed:
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with gzip.open(file, "wb", compresslevel=2) as fopen:
pickle.dump(save_dict, fopen)
else:
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with open(file, "wb") as fopen:
pickle.dump(save_dict, fopen)
[docs]
def save_inter_T_lin(self,
filename,
lamda=None,
round_zeros=True,
tol=1e-8,
compressed=False,
replace_existing=True,
save_delta=False):
"""Creates delta_inter_T_lin if it is not already present.
The delta_inter_T_lin is saves together with
inter_T_lin[lamda][t_s][0] in a pickle file.
"""
assert hasattr(
self, "inter_T_lin"
), f"PID {os.getpid()} : nothing saved, compute inter_T_lin first."
ext = os.path.splitext(filename)[-1]
file = filename
if compressed:
if ext != ".gz":
file += ".gz"
elif ext != ".pickle":
file += ".pickle"
skipping = False
if os.path.exists(file):
if replace_existing:
# TODO: switch to logger
print("PID ", os.getpid(), f" : , file {file} already exists, replacing it.")
else:
# TODO: switch to logger
print("PID ", os.getpid(), f" : , file {file} already exists, skipping.")
skipping = True
if not skipping:
save_dict = {}
save_dict["_k_start_laplacians"] = self._k_start_laplacians
save_dict["_k_stop_laplacians"] = self._k_stop_laplacians
save_dict["_t_start_laplacians"] = self._t_start_laplacians
save_dict["_t_stop_laplacians"] = self._t_stop_laplacians
save_dict["times_k_start_to_k_stop+1"] = self.times.values[
self._k_start_laplacians:self._k_stop_laplacians + 1
]
save_dict["num_nodes"] = self.num_nodes
save_dict["_compute_times"] = self._compute_times
if save_delta:
if not hasattr(self, "delta_inter_T_lin") and \
hasattr(self, "inter_T_lin"):
if lamda is not None:
self._compute_delta_trans_mat(lamda,
round_zeros=round_zeros,
tol=tol)
else:
# computes it for all lamda
for lamda in self.inter_T_lin.keys():
self._compute_delta_trans_mat(
lamda,
round_zeros=round_zeros,
tol=tol
)
if (
lamda is not None
) and (
lamda not in self.delta_inter_T_lin.keys()
):
self._compute_delta_trans_mat(lamda,
round_zeros=round_zeros,
tol=tol)
if hasattr(self, "delta_inter_T_lin"):
save_dict["inter_T_lin"] = dict()
save_dict["is_delta_trans"] = True
if lamda is None:
lamdas = self.delta_inter_T_lin.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
save_dict["inter_T_lin"][lamda] = dict()
for t_s in self.inter_T_lin[lamda].keys():
save_dict["inter_T_lin"][lamda][t_s] = dict()
save_dict["inter_T_lin"][lamda][t_s][
"delta_inter_T_lin"
] = self.delta_inter_T_lin[lamda][t_s]
save_dict["inter_T_lin"][lamda][t_s][
"trans_mat_lin0"
] = self.inter_T_lin[lamda][t_s][0].copy()
if round_zeros:
set_to_zeroes(
save_dict["inter_T_lin"][lamda][t_s][
"trans_mat_lin0"],
tol=tol
)
text = "delta trans mats"
else:
save_dict["inter_T_lin"] = dict()
save_dict["is_sparse_stoch"] = True
if lamda is None:
lamdas = self.delta_inter_T_lin.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
save_dict["inter_T_lin"][lamda] = dict()
for t_s in self.inter_T_lin[lamda].keys():
assert isinstance(
self.inter_T_lin[lamda][t_s][0], SparseStochMat
), "inter_T needs to be SparseStochMat"
save_dict["inter_T_lin"][lamda][t_s] = []
for interT in self.inter_T_lin[lamda][t_s]:
if round_zeros:
interT.set_to_zeroes(tol=tol)
save_dict["inter_T_lin"][lamda][t_s].append(
interT.to_dict()
)
text = "sparse stoch trans mats"
if compressed:
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with gzip.open(file, "wb", compresslevel=2) as fopen:
pickle.dump(save_dict, fopen)
else:
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with open(file, "wb") as fopen:
pickle.dump(save_dict, fopen)
[docs]
@staticmethod
def load_inter_T(filename):
"""
Loads inter_T and inter_T_lin from 'filename'.
The file must have been generated with `save_inter_T`.
Returns a dictionary with the inter_T restored.
"""
ext = os.path.splitext(filename)[-1]
if ext not in [".pickle", ".gz"]:
# detect extension
if os.path.exists(filename + ".pickle"):
ext = ".pickle"
filename += ".pickle"
elif os.path.exists(filename + ".gz"):
ext = ".gz"
filename += ".gz"
elif os.path.exists(filename + ".pickle.gz"):
ext = ".pickle.gz"
filename += ".pickle.gz"
else:
raise FileNotFoundError(filename)
if ext == ".gz" or ext == ".pickle.gz":
with gzip.open(filename,
"rb") as fopen:
load_dict = pickle.load(fopen)
else:
with open(filename, "rb") as fopen:
load_dict = pickle.load(fopen)
return_dict = {
"_k_start_laplacians": load_dict["_k_start_laplacians"],
"_k_stop_laplacians": load_dict["_k_stop_laplacians"],
"_t_start_laplacians": load_dict["_t_start_laplacians"],
"_t_stop_laplacians": load_dict["_t_stop_laplacians"],
"num_nodes": load_dict["num_nodes"],
"times_k_start_to_k_stop+1": load_dict["times_k_start_to_k_stop+1"]
}
# rebuild inter_T from delta_inter_T
if "inter_T" in load_dict.keys():
return_dict["inter_T"] = dict()
if load_dict.get("is_sparse_stoch", False):
for lamda in load_dict["inter_T"].keys():
return_dict["inter_T"][lamda] = [
SparseStochMat(**mat_dict)
for mat_dict in load_dict["inter_T"][lamda]
]
else:
for lamda in load_dict["inter_T"].keys():
return_dict["inter_T"][lamda] = \
[load_dict["inter_T"][lamda]["trans_mat0"]]
for dT in load_dict["inter_T"][lamda]["delta_inter_T"]:
return_dict["inter_T"][lamda].append(
return_dict["inter_T"][lamda][-1] + dT
)
if "inter_T_lin" in load_dict.keys():
return_dict["inter_T_lin"] = dict()
if load_dict.get("is_sparse_stoch", False):
for lamda in load_dict["inter_T_lin"].keys():
return_dict["inter_T_lin"][lamda] = dict()
for t_s in load_dict["inter_T_lin"][lamda].keys():
return_dict["inter_T_lin"][lamda][t_s] = [
SparseStochMat(**mat_dict)
for mat_dict in load_dict[
"inter_T_lin"][lamda][t_s]
]
else:
for lamda in load_dict["inter_T_lin"].keys():
return_dict["inter_T_lin"][lamda] = dict()
for t_s in load_dict["inter_T_lin"][lamda].keys():
return_dict["inter_T_lin"][lamda][t_s] = [
load_dict[
"inter_T_lin"][lamda][t_s]["trans_mat_lin0"]
]
for dT in load_dict["inter_T_lin"][lamda][t_s][
"delta_inter_T_lin"]:
return_dict["inter_T_lin"][lamda][t_s].append(
return_dict["inter_T_lin"][lamda][t_s][-1] + dT
)
del load_dict
return return_dict
[docs]
def save_T(self,
filename,
lamda=None,
round_zeros=True,
tol=1e-8,
compressed=False):
"""
Save a dict with 'T' as key and net.T as item with other attributes.
This also works with SparseStochMat.
It only works if net.T[lamda] is a matrix and not a list of matrices,
i.e. if compute_transition_matrices was ran without save_intermediate.
"""
assert hasattr(
self, "T"
), f"PID {os.getpid()} : nothing saved, compute inter_T first."
save_dict = {}
save_dict["_k_start_laplacians"] = self._k_start_laplacians
save_dict["_k_stop_laplacians"] = self._k_stop_laplacians
save_dict["_t_start_laplacians"] = self._t_start_laplacians
save_dict["_t_stop_laplacians"] = self._t_stop_laplacians
save_dict["times_k_start_to_k_stop+1"] = self.times.values[
self._k_start_laplacians:self._k_stop_laplacians + 1
]
save_dict["num_nodes"] = self.num_nodes
save_dict["_compute_times"] = self._compute_times
save_dict["T"] = dict()
if lamda is None:
lamdas = self.T.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
if isinstance(self.T[lamda], SparseStochMat):
save_dict["is_sparse_stoch"] = True
if round_zeros:
T = self.T[lamda].copy()
T.set_to_zeroes(tol)
else:
T = self.T[lamda]
save_dict["T"][lamda] = T.to_dict()
text = "SparseStochMat T"
elif isspmatrix_csr(self.T[lamda]):
if round_zeros:
T = self.T[lamda].copy()
set_to_zeroes(T, tol)
else:
T = self.T[lamda]
save_dict["T"][lamda] = T
text = "csr T"
else:
raise TypeError("T must be csr or SparseStochMat.")
ext = os.path.splitext(filename)[-1]
file = filename
if compressed:
if ext != ".gz":
file += ".gz"
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with gzip.open(file, "wb", compresslevel=2) as fopen:
pickle.dump(save_dict, fopen)
else:
ext = os.path.splitext(filename)[-1]
if ext != ".pickle":
file += ".pickle"
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with open(file, "wb") as fopen:
pickle.dump(save_dict, fopen)
[docs]
def save_T_lin(self,
filename,
lamda=None,
round_zeros=True,
tol=1e-8,
compressed=False):
"""
Saves a dict with 'T_lin' as key and net.T_lin and other attributes.
This also works with SparseStochMat instance.
It only works if net.T_lin[lamda][t_s] is a matrix and not a list of matrices,
i.e. if compute_transition_matrices was ran without save_intermediate.
"""
assert hasattr(
self, "T_lin"
), f"PID {os.getpid()} : nothing saved, compute inter_T_lin first."
save_dict = {}
save_dict["_k_start_laplacians"] = self._k_start_laplacians
save_dict["_k_stop_laplacians"] = self._k_stop_laplacians
save_dict["_t_start_laplacians"] = self._t_start_laplacians
save_dict["_t_stop_laplacians"] = self._t_stop_laplacians
save_dict["times_k_start_to_k_stop+1"] = self.times.values[
self._k_start_laplacians:self._k_stop_laplacians + 1
]
save_dict["num_nodes"] = self.num_nodes
save_dict["_compute_times"] = self._compute_times
save_dict["T_lin"] = dict()
if lamda is None:
lamdas = self.T_lin.keys()
else:
lamdas = [lamda]
for lamda in lamdas:
save_dict["T_lin"][lamda] = dict()
for t_s in self.T_lin[lamda].keys():
if isinstance(self.T_lin[lamda][t_s], SparseStochMat):
save_dict["is_sparse_stoch"] = True
if round_zeros:
T = self.T_lin[lamda][t_s].copy()
T.set_to_zeroes(tol)
else:
T = self.T_lin[lamda][t_s]
save_dict["T_lin"][lamda][t_s] = T.to_dict()
text = "SparseStochMat T_lin"
elif isspmatrix_csr(self.T_lin[lamda][t_s]):
if round_zeros:
T = self.T_lin[lamda][t_s].copy()
set_to_zeroes(T, tol)
else:
T = self.T_lin[lamda][t_s]
save_dict["T_lin"][lamda][t_s] = T
text = "csr T_lin"
else:
raise TypeError("T must be csr or SparseStochMat.")
ext = os.path.splitext(filename)[-1]
file = filename
if compressed:
if ext != ".gz":
file += ".gz"
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with gzip.open(file, "wb", compresslevel=2) as fopen:
pickle.dump(save_dict, fopen)
else:
ext = os.path.splitext(filename)[-1]
if ext != ".pickle":
file += ".pickle"
# TODO: switch to logging
print("PID ", os.getpid(), " : "," saving " + text + " to " + file)
with open(file, "wb") as fopen:
pickle.dump(save_dict, fopen)
[docs]
@staticmethod
def load_T(filename):
"""Loads T and T_lin from 'filename' that was saved with save_T.
Returns a dictionary with the T restored.
"""
ext = os.path.splitext(filename)[-1]
if ext not in [".pickle", ".gz"]:
# detect extension
if os.path.exists(filename + ".pickle"):
ext = ".pickle"
filename += ".pickle"
elif os.path.exists(filename + ".gz"):
ext = ".gz"
filename += ".gz"
elif os.path.exists(filename + ".pickle.gz"):
ext = ".pickle.gz"
filename += ".pickle.gz"
else:
raise FileNotFoundError(filename)
if ext == ".gz" or ext == ".pickle.gz":
with gzip.open(filename, "rb") as fopen:
load_dict = pickle.load(fopen)
else:
with open(filename, "rb") as fopen:
load_dict = pickle.load(fopen)
return_dict = {
"_k_start_laplacians": load_dict["_k_start_laplacians"],
"_k_stop_laplacians": load_dict["_k_stop_laplacians"],
"_t_start_laplacians": load_dict["_t_start_laplacians"],
"_t_stop_laplacians": load_dict["_t_stop_laplacians"],
"num_nodes": load_dict["num_nodes"],
"times_k_start_to_k_stop+1": load_dict["times_k_start_to_k_stop+1"]
}
if "T" in load_dict.keys():
return_dict["T"] = dict()
if load_dict.get("is_sparse_stoch", False):
for lamda in load_dict["T"].keys():
return_dict["T"][lamda] = \
SparseStochMat(**load_dict["T"][lamda])
else:
for lamda in load_dict["T"].keys():
return_dict["T"][lamda] = load_dict["T"][lamda]
if "T_lin" in load_dict.keys():
return_dict["T_lin"] = dict()
if load_dict.get("is_sparse_stoch", False):
for lamda in load_dict["T_lin"].keys():
return_dict["T_lin"][lamda] = dict()
for t_s in load_dict["T_lin"][lamda].keys():
return_dict["T_lin"][lamda][t_s] = \
SparseStochMat(**load_dict["T_lin"][lamda][t_s])
else:
for lamda in load_dict["T_lin"].keys():
return_dict["T_lin"][lamda] = dict()
for t_s in load_dict["T_lin"][lamda].keys():
return_dict["T_lin"][lamda][t_s] = \
load_dict["T_lin"][lamda][t_s]
del load_dict
return return_dict
[docs]
def compute_static_adjacency_matrix(self, start_time=None, end_time=None):
"""Returns the adjacency matrix of the static network built from the
aggregagted edge activity between `start_time` and `end_time`.
Parameters
----------
start_time : float or int, optional
Starting time for the aggregation. The default is None, i.e. the
start time of the entire temporal network.
end_time : float or int, optional
Ending time for the aggregation. The default is None, i.e. the
end time of the entire temporal network.
Returns
-------
CSR sparse matrix
Symmetric adjacency matrix, where element ij is equal to the
aggregated time during which egde ij was active after `start_time`
and before `end_time`.
"""
if start_time is None:
start_time = self.start_time
if end_time is None:
end_time = self.end_time
mask = np.logical_and(self.events_table.starting_times < end_time,
self.events_table.ending_times > start_time)
# loop on events
data = []
cols = []
rows = []
for ev in self.events_table.loc[mask].itertuples():
data.append(
min(ev.ending_times,
end_time) - max(ev.starting_times, start_time)
)
rows.append(ev.source_nodes)
cols.append(ev.target_nodes)
A = coo_matrix((data, (rows, cols)),
shape=(self.num_nodes, self.num_nodes))
return A + A.T
def _compute_time_grid(self):
"""Create `self.time_grid`, a dataframe with ('times', 'id') as index,
wre `id` is the index of the corresponding event in `events_table`,
and column 'is_start' which is True is the ('times', 'id')
corresponds to a starting event.
Also creates `self.times`, an array with all the times values.
"""
self.time_grid = pd.DataFrame(
columns=["times", "id", "is_start"],
index=range(self.events_table.shape[0]*2)
)
self.time_grid.iloc[:self.events_table.shape[0], [0, 1]] = \
self.events_table.reset_index()[["starting_times", "index"]]
self.time_grid["is_start"] = False
self.time_grid.loc[0:self.events_table.shape[0]-1, "is_start"] = True
self.time_grid.iloc[self.events_table.shape[0]:, [0, 1]] = \
self.events_table.reset_index()[["ending_times", "index"]]
self.time_grid.times = pd.to_numeric(self.time_grid.times)
self.time_grid.sort_values("times", inplace=True)
# group events with same times
self.time_grid.set_index(["times", "id"], inplace=True)
self.time_grid.sort_index(inplace=True)
self.times = self.time_grid.index.levels[0]
def _get_closest_time(self, t):
"""Return closest smaller or equal time in `times` and its index"""
if not hasattr(self, "times"):
self._compute_time_grid()
if t not in self.times:
# take the largest smaller time
if t <= self.times[0]:
t = self.times[0]
else:
t = self.times[self.times <= t].max()
k = int(np.where(self.times == t)[0][0])
return t, k
[docs]
def compute_laplacian_matrices(self,
*,
t_start=None,
t_stop=None,
save_adjacencies=False):
"""Computes the laplacian matrices and saves them in `self.laplacians`
Computes from the first event time (in `self.times`) before or
equal to `t_start` until the event time index before `t_stop`.
Laplacians are computed from `self.times[self._k_start_laplacians]`
until `self.times[self._k_stop_laplacians-1]`.
The laplacian at step k, is the random walk laplacian
between `times[k]`. and `times[k+1]`.
Parameters
----------
t_start : float or int, optional
The default is None, i.e. starts at the beginning of times.
The computation starts from the first time index before or equal
to t_start. The corresponding starting time index is saved in
`self._k_start_laplacians` and the real starting time is
`self.times[self._k_start_laplacians]` which is saved in
`self._t_start_laplacians`.
t_stop : float or int, optional
Same than `t_start` but for the ending time of computations.
Default is end of times.
Computations stop at self.times[self._k_stop_laplacians-1].
Similarily to `t_start`, the corresponding times are saved in
`self._k_stop_laplacians` and `self._t_stop_laplacians`.
save_adjacencies : bool, optional
Default is False. Use to save adjacency matrices in
`self.adjacencies`.
Returns
-------
None.
"""
logger.info("Computing Laplacians")
if not hasattr(self, "time_grid"):
self._compute_time_grid()
# instantaneous adjacency matrix (subclasses may override the
# buffer type, e.g. dok_matrix for pulse dynamics)
A = self._make_adjacency_buffer(self.num_nodes)
# identity
I = eye(self.num_nodes,
dtype=np.float64).tocsc()
# degree array
degrees = np.zeros(self.num_nodes, dtype=np.float64)
# inverse degrees diagonal matrix
Dm1 = I.copy()
# self loop matrix
S = I.copy()
state = _LaplacianState(A=A, S=S, Dm1=Dm1, degrees=degrees)
self.laplacians = []
if save_adjacencies:
self.adjacencies = []
# set boundary conditions : L_k is the laplacian during t_k and t_k+1
if t_start is None:
self._t_start_laplacians = self.times[0]
self._k_start_laplacians = 0
else:
t, k = self._get_closest_time(t_start)
self._t_start_laplacians = t
self._k_start_laplacians = k
if t_stop is None:
self._t_stop_laplacians = self.times[-1]
self._k_stop_laplacians = len(self.times)-1
else:
t, k = self._get_closest_time(t_stop)
self._t_stop_laplacians = t
self._k_stop_laplacians = k
t0 = time.time()
# dynamics-specific pre-warm (interval: seed adjacency from events
# alive just before _k_start_laplacians; pulse: no-op).
self._laplacian_prewarm(state)
# time grid for this time range
time_grid_range = self.time_grid.loc[(
self.time_grid.index.get_level_values(
"times") >= self._t_start_laplacians
) & (
self.time_grid.index.get_level_values(
"times") < self._t_stop_laplacians
)]
for k, (tk, time_ev) in enumerate(
time_grid_range.groupby(level="times")):
if not k % 1000:
logger.info(
f"{k} over "
f"{self._k_stop_laplacians - self._k_start_laplacians}"
)
logger.info(f"{time.time()-t0:.2f}s")
meet_id = time_ev.index.get_level_values("id")
# starting or ending events
is_starts = time_ev.is_start.values
events_k = [self.events_table.loc[
mid,
["source_nodes", "target_nodes"]
].astype(np.int64) for mid in meet_id.values]
# update instantaneous matrices
for event, is_start in zip(events_k, is_starts):
# unweighted, undirected
if is_start:
# if they are not already connected (can happen if the
# opposite event overlap)
if state.A[event.source_nodes, event.target_nodes] != 1:
state.A[event.source_nodes, event.target_nodes] = 1
state.A[event.target_nodes, event.source_nodes] = 1
state.degrees[event.source_nodes] += 1
state.degrees[event.target_nodes] += 1
else:
# dynamics-specific end-of-event handling
# (interval: clear A entry & decrement degrees;
# pulse: no-op).
self._laplacian_on_event_end(state, event)
# update self loops from current degrees
self._laplacian_update_self_loops(state, event)
# Laplacian L(tk)
Acsc = state.A.tocsc()
# T_D = Dm1 @ (Acsc + S)
# L = I - T_D
self.laplacians.append(I - state.Dm1 @ (Acsc + state.S))
if save_adjacencies:
self.adjacencies.append(state.A.copy())
# dynamics-specific end-of-step handling (interval: persist
# state across steps; pulse: reset A, S, Dm1, degrees).
self._laplacian_step_end(state)
t_end = time.time()-t0
self._compute_times["laplacians"] = t_end
logger.info(f"Finished in {t_end}")
# ------------------------------------------------------------------
# Laplacian-loop extension hooks.
#
# Default implementations encode *interval* dynamics (events occupy
# a finite [start, end) interval, state persists across time steps).
# ``ContTempInstNetwork`` overrides them to encode *pulse* dynamics
# (events are instantaneous; state is reset every step).
# ------------------------------------------------------------------
def _make_adjacency_buffer(self, n):
"""Allocate the mutable adjacency buffer used by the laplacian loop.
Default is ``lil_matrix`` (interval dynamics). Pulse dynamics
overrides this with ``dok_matrix`` because it needs ``.clear()``
in ``_laplacian_step_end``.
"""
return lil_matrix((n, n), dtype=np.float64)
def _laplacian_prewarm(self, state):
"""Seed ``state`` with events that are already active at the time
step just before ``self._k_start_laplacians``.
Interval default: replays the matching events to populate ``A``,
``degrees``, ``S``, ``Dm1``. Pulse override: no-op.
"""
if self._k_start_laplacians <= 0:
return
t_km1 = self.times[self._k_start_laplacians - 1]
# find events that have started before or at t_k-1
# and were still occuring at t_k-1
mask_ini = (
self.events_table.starting_times <= t_km1
) & (
self.events_table.ending_times > t_km1
)
for event in self.events_table.loc[mask_ini][
["source_nodes", "target_nodes"]
].itertuples():
if state.A[event.source_nodes, event.target_nodes] != 1:
state.A[event.source_nodes, event.target_nodes] = 1
state.A[event.target_nodes, event.source_nodes] = 1
state.degrees[event.source_nodes] += 1
state.degrees[event.target_nodes] += 1
self._laplacian_update_self_loops(state, event)
def _laplacian_on_event_end(self, state, event):
"""Handle an event whose ``is_start`` flag is ``False``.
Interval default: clear the corresponding adjacency entry and
decrement degrees (when still set). Pulse override: no-op
(events are instantaneous and state is reset each step).
"""
if state.A[event.source_nodes, event.target_nodes] > 0:
state.A[event.source_nodes, event.target_nodes] = 0
state.A[event.target_nodes, event.source_nodes] = 0
state.degrees[event.source_nodes] -= 1
state.degrees[event.target_nodes] -= 1
def _laplacian_update_self_loops(self, state, event):
"""Sync ``state.S`` and ``state.Dm1`` with ``state.degrees`` for
the two nodes touched by ``event``.
"""
for node in (event.source_nodes, event.target_nodes):
if state.degrees[node] == 0:
state.S.data[node] = 1
state.Dm1.data[node] = 1
else:
state.S.data[node] = 0
state.Dm1.data[node] = 1 / state.degrees[node]
def _laplacian_step_end(self, state):
"""Hook called after each time-step's laplacian is appended.
Interval default: no-op (state persists). Pulse override:
resets ``A``, ``S``, ``Dm1``, ``degrees`` to identity/zero so
the next step starts from a clean slate.
"""
pass
[docs]
def compute_inter_transition_matrices(self, *, lamda=None, t_start=None,
t_stop=None, fix_tau_k=False,
use_sparse_stoch=False,
dense_expm=True):
"""
Computes interevent transition matrices.
T_k(lamda) = expm(-tau_k*lamda*L_k).
The transition matrix T_k is saved in `self.inter_T[lamda][k]`, where
self.inter_T is a dictionary with lamda as keys and lists of transition
matrices as values.
will compute from self.times[self._k_start_laplacians] until
self.times[self._k_stop_laplacians-1]
the transition matrix at step k, is the probability transition matrix
between times[k] and times[k+1]
Parameters
----------
lamda : float, optional
Random walk rate, dynamical resolution parameter.
The default (None) is 1 over the median inter event time.
t_start : float or int, optional
Starting time, passed to `compute_laplacian_matrices` if the
Laplacians have not yet been computed.
Otherwise is not used.
The computation starts at self.times[self._k_start_laplacians].
The default is None, i.e. starts at the beginning of times.
t_stop : float or int, optional
Same than `t_start` but for the ending time of computations.
Computations stop at self.times[self._k_stop_laplacians-1].
Default is end of times.
fix_tau_k : bool, optional
If true, all interevent times (tau_k) in the formula above are set
to 1. This decouples the dynamic scale from the length of event
which is useful for temporal networks with instantaneous events.
The default is False.
use_sparse_stoch : bool, optional
Whether to use custom sparse stochastic matrix format to save the
inter transition matrices. Especially useful for large networks as
the matrix exponential is then computed on each connected component
separately (more memory efficient). The default is False.
dense_expm : bool, optional
Whether to use the dense version of the matrix exponential
algorithm at each time steps.
Recommended for not too large networks.
The inter trans. matrices are still saved as sparse scipy matrices
as they usually have many zero values. The default is True. Has no
effect is use_sparse_stoch is True.
Returns
-------
None.
"""
# NOTE: We might drop such tests if we check for process consistency
# on a higher leverl (i.e. FlowStability)
if not hasattr(self, "laplacians"):
self.compute_laplacian_matrices(t_start=t_start, t_stop=t_stop)
if not hasattr(self, "inter_T"):
self.inter_T = dict()
if lamda is None:
logger.info("Taking lamda as 1/tau_w with tau_w = median "
"interevent time")
lamda = 1/np.median(np.diff(self.times))
# new value of lamda, we need to recompute
if lamda not in self.inter_T.keys():
logger.debug(
f"Computing interevent transition matrices for lamda={lamda}"
)
self.inter_T[lamda] = []
t0 = time.time()
for k, tk in enumerate(self.times[self._k_start_laplacians:
self._k_stop_laplacians]):
if not k % 1000:
logger.debug(
f"{k} over "
f"{self._k_stop_laplacians-1-self._k_start_laplacians}"
)
logger.debug(f"{time.time()-t0:.2f}s")
if fix_tau_k:
tau_k = 1.0
else:
tau_k = self.times[self._k_start_laplacians+k+1] - tk
if use_sparse_stoch:
self.inter_T[lamda].append(
sparse_lapl_expm(self.laplacians[k],
tau_k*lamda,
dense_expm=dense_expm)
)
elif self.laplacians[k].getnnz() == 0:
# expm of zero = identity
self.inter_T[lamda].append(
eye(self.num_nodes, format="csr")
)
elif dense_expm:
self.inter_T[lamda].append(csr_matrix(
expm(-tau_k * lamda * self.laplacians[k].toarray())
))
else:
self.inter_T[lamda].append(expm(
-tau_k * lamda * self.laplacians[k]
).tocsr())
if len(self.inter_T[lamda]) == 0:
logger.info("no events, trans. matrix = identity")
# is there was no event, the transition is identity
if use_sparse_stoch:
self.inter_T[lamda].append(
SparseStochMat.create_diag(size=self.num_nodes)
)
else:
self.inter_T[lamda].append(eye(self.num_nodes,
dtype=np.float64,
format="csr"))
t_end = time.time()-t0
self._compute_times["inter_T_" + str(lamda)] = t_end
logger.debug(
f"Finished computing interevent transition matrices in {t_end}"
)
logger.debug(
f"Interevent transition matrices already computed for {lamda=}"
)
[docs]
def compute_lin_inter_transition_matrices(self,
lamda=None,
t_start=None,
t_stop=None,
t_s=10, fix_tau_k=False,
use_sparse_stoch=False):
"""Compute interevent transition matrices as a linear approximation.
Approximated it expm(-tau_k*lamda*L_k) based on the discrete time
transition matrix.
`t_s` is the time value for which the linear approximation reaches the
stationary transition matrix (default is `t_s=10`).
The transition matrix T_k_lin is saved in
`self.inter_T_lin[lamda][t_s][k]`,
where `self.inter_T_lin` is a dictionary with lamda as keys and
lists of transition matrices as values.
will compute from self.times[self._k_start_laplacians]
until self.times[self._k_stop_laplacians-1]
the transition matrix at step k, is the probability transition matrix
between times[k] and times[k+1]
"""
I = eye(self.num_nodes,
dtype=np.float64, format="csr")
if not hasattr(self, "_stationary_trans"):
self._compute_stationary_transition(
t_start=t_start, t_stop=t_stop,
use_sparse_stoch=use_sparse_stoch
)
if not hasattr(self, "inter_T_lin"):
self.inter_T_lin = dict()
if lamda is None:
logger.info(
"taking lamda as 1/tau_w with tau_w = median interevent time"
)
lamda = 1/np.median(np.diff(self.times))
compute = True
if lamda in self.inter_T_lin.keys():
if t_s in self.inter_T_lin[lamda].keys():
compute = False
logger.debug(
"Interevent transition matrices already computed "
f"for lamda={lamda}, t_s={t_s}"
)
if compute:
if lamda not in self.inter_T_lin.keys():
self.inter_T_lin[lamda] = dict()
self.inter_T_lin[lamda][t_s] = []
else:
self.inter_T_lin[lamda][t_s] = []
logger.debug(
"Computing interevent linear transition matrices for "
f"lamda={lamda}, t_s={t_s}"
)
t0 = time.time()
for k, tk in enumerate(self.times[self._k_start_laplacians:
self._k_stop_laplacians]):
if not k % 1000:
logger.debug(
f"{k} over "
f"{self._k_stop_laplacians-1-self._k_start_laplacians}"
)
logger.debug(f"{time.time()-t0:.2f}s")
if fix_tau_k:
tau_k = 1.0
else:
tau_k = self.times[self._k_start_laplacians+k+1] - tk
Lcsr = self.laplacians[k].tocsr()
if use_sparse_stoch:
# get non zero indices
nonzerosum_rowcols = ~np.logical_and(Lcsr.getnnz(1) == 0,
Lcsr.getnnz(0) == 0)
nz_rowcols, = (nonzerosum_rowcols).nonzero()
self.inter_T_lin[lamda][t_s].append(
sparse_lin_approx(I - Lcsr,
tau_k * lamda,
Pi=self._stationary_trans[k],
t_s=t_s,
nz_rowcols=nz_rowcols)
)
else:
self.inter_T_lin[lamda][t_s].append(
lin_approx_trans_matrix(I - Lcsr,
tau_k * lamda,
Pi=self._stationary_trans[k],
t_s=t_s))
if len(self.inter_T_lin[lamda][t_s]) == 0:
logger.debug("no events, lin. trans. matrix = identity")
# is there was no event, the transition is identity
if use_sparse_stoch:
self.inter_T_lin[lamda].append(
SparseStochMat.create_diag(size=self.num_nodes)
)
else:
self.inter_T_lin[lamda][t_s].append(
eye(self.num_nodes,
dtype=np.float64,
format="csr")
)
t_end = time.time()-t0
self._compute_times[f"tran_matrices_lin_{lamda}_{t_s}"] = t_end
logger.debug("Finished computing linear interevent transition "
f"matrices in {t_end}")
[docs]
def compute_transition_matrices(self,
lamda=None,
t_start=None,
t_stop=None,
save_intermediate=True,
reverse_time=False,
force_csr=False,
tol=None):
"""Compute transition matrices and saves them in a dict of lists.
The matrices are saved as `self.T[lamda]` where `self.T[lamda][k]` is
the product of all interevent transition matrices from t_0 to t_k
computed with lamda.
"""
if not hasattr(self, "inter_T") or \
lamda not in self.inter_T.keys():
raise Exception("Compute inter_T first.")
if not hasattr(self, "T"):
self.T = dict()
if reverse_time:
k_init = len(self.inter_T[lamda])-1
k_range = reversed(range(k_init))
logger.info("Reversed time computation.")
else:
k_init = 0
k_range = range(1, len(self.inter_T[lamda]))
if lamda not in self.T.keys():
if save_intermediate:
if force_csr:
# forcing the first matrix to csr, will ensure that
# all products are done in csr format,
# since CSR @ SparseStochMat t is not implemented
self.T[lamda] = [self.inter_T[lamda][k_init].tocsr()]
else:
self.T[lamda] = [self.inter_T[lamda][k_init]]
if tol is not None:
set_to_zeroes(self.T[lamda][0], tol)
inplace_csr_row_normalize(self.T[lamda][0])
else:
if force_csr:
self.T[lamda] = self.inter_T[lamda][k_init].tocsr()
else:
self.T[lamda] = self.inter_T[lamda][k_init]
if tol is not None:
set_to_zeroes(self.T[lamda], tol)
inplace_csr_row_normalize(self.T[lamda])
logger.info("Computing transition matrix")
t0 = time.time()
for k in k_range:
if not k % 1000:
logger.info(f"{k} over {len(self.inter_T[lamda])}")
logger.info(f"{time.time()-t0:.2f}s")
Tk = self.inter_T[lamda][k]
if tol is not None:
set_to_zeroes(Tk, tol)
inplace_csr_row_normalize(Tk)
if save_intermediate:
self.T[lamda].append(self.T[lamda][-1] @ Tk)
# normalize T to correct precision errors
if tol is not None:
set_to_zeroes(self.T[lamda][-1], tol)
inplace_csr_row_normalize(self.T[lamda][-1])
else:
self.T[lamda] = self.T[lamda] @ Tk
if tol is not None:
set_to_zeroes(self.T[lamda], tol)
# normalize T to correct precision errors
inplace_csr_row_normalize(self.T[lamda])
t_end = time.time()-t0
self._compute_times[
f"trans_matrix_{lamda}_rev{reverse_time}"] = t_end
logger.info(f"Finished in {t_end:.2f}s")
logger.info(
f"Transition matrices already computed for lamda={lamda}"
)
[docs]
def compute_lin_transition_matrices(self,
lamda=None,
t_start=None,
t_stop=None,
t_s=10, save_intermediate=True,
reverse_time=False):
"""Compute transition matrices and saves them in a dict of lists.
The transition matrices are save as `self.T_lin[lamda][t_s]` where
`self.T_lin[lamda][t_s][k]` is the product of all interevent
transition matrices from t_0 to t_k computed with lamda and t_s.
"""
if not hasattr(self, "inter_T_lin") \
or lamda not in self.inter_T_lin.keys() \
or t_s not in self.inter_T_lin[lamda].keys():
raise Exception("Compute inter_T_lin first.")
if not hasattr(self, "T_lin"):
self.T_lin = dict()
compute = True
if lamda in self.T_lin.keys():
if t_s in self.T_lin[lamda].keys():
compute = False
logger.info("Transition matrices already computed "
f"for lamda={lamda}, t_s={t_s}")
if compute:
if reverse_time:
k_init = len(self.inter_T_lin[lamda][t_s])-1
k_range = reversed(range(k_init))
logger.info("Reversed time computation.")
else:
k_init = 0
k_range = range(1, len(self.inter_T_lin[lamda][t_s]))
# initial conditions
if lamda not in self.T_lin.keys():
self.T_lin[lamda] = dict()
if save_intermediate:
self.T_lin[lamda][t_s] = [
self.inter_T_lin[lamda][t_s][k_init]]
else:
self.T_lin[lamda][t_s] = self.inter_T_lin[
lamda][t_s][k_init]
if t_s not in self.T_lin[lamda].keys():
if save_intermediate:
self.T_lin[lamda][t_s] = [self.inter_T_lin[
lamda][t_s][k_init]]
else:
self.T_lin[lamda][t_s] = self.inter_T_lin[
lamda][t_s][k_init]
logger.info(
f"Computing transition matrix for lamda={lamda}, t_s={t_s}"
)
t0 = time.time()
for k in k_range:
if not k % 1000:
logger.info(
f"{k} over {len(self.inter_T_lin[lamda][t_s])}")
logger.info(f"{time.time()-t0:.2f}s")
if save_intermediate:
self.T_lin[lamda][t_s].append(
self.T_lin[lamda][t_s][-1] @ self.inter_T_lin[
lamda][t_s][k]
)
# normalize T to correct precision errors
inplace_csr_row_normalize(self.T_lin[lamda][t_s][-1])
else:
self.T_lin[lamda][t_s] = self.T_lin[lamda][t_s] @ \
self.inter_T_lin[lamda][t_s][k]
# normalize T to correct precision errors
inplace_csr_row_normalize(self.T_lin[lamda][t_s])
t_end = time.time()-t0
self._compute_times[
f"trans_matrix_lin_{lamda}_{t_s}_rev{reverse_time}"] = t_end
logger.info(f"Finished in {t_end}s")
logger.info(
f"Transition matrices already computed for lamda={lamda}"
)
def _compute_stationary_transition(self,
t_start=None,
t_stop=None,
use_sparse_stoch=True):
if not hasattr(self, "laplacians"):
self.compute_laplacian_matrices(t_start=t_start,
t_stop=t_stop)
logger.info("Computing stationary transition matrices")
self._stationary_trans = []
I = eye(self.num_nodes,
dtype=np.float64, format="csc")
# I = np.eye(self.num_nodes, dtype=np.float64)
t0 = time.time()
for k in range(len(self.laplacians)):
if not k % 1000:
logger.info(f"{k} over {len(self.laplacians)}")
logger.info(f"{time.time()-t0:.2f}s")
if use_sparse_stoch:
self._stationary_trans.append(
sparse_stationary_trans(I - self.laplacians[k])
)
else:
self._stationary_trans.append(
compute_stationary_transition(I - self.laplacians[k])
)
t_end = time.time() - t0
self._compute_times["_stationary_trans"] = t_end
logger.info(f"Stationary transition matrices computation took {t_end}s")
def _merge_overlapping_events(self):
"""
Merge temporally overlapping undir. event between each pair of nodes.
"""
events_to_keep = np.ones(self.events_table.shape[0], dtype=bool)
A = self.compute_static_adjacency_matrix()
# loop over nodes
for i, n1 in enumerate(self.node_array):
for n2 in (A[n1, :] > 0).nonzero()[1]:
mask_12 = np.logical_and(
self.events_table.source_nodes.values == n1,
self.events_table.target_nodes.values == n2
)
mask_21 = np.logical_and(
self.events_table.source_nodes.values == n2,
self.events_table.target_nodes.values == n1
)
# sort by starting times
evs = self.events_table.loc[
np.logical_or(mask_12, mask_21)
].sort_values(by=["starting_times", "ending_times"])
evs_list = list(evs.itertuples())
# event to compare
ev1 = evs_list[0]
merged = 0
for k in range(1, len(evs_list)):
ev2 = evs_list[k]
# if ev2 overlaps with ev1, merge them
# otherwise ev2 becomes ev1
if ev2.starting_times < ev1.ending_times:
# merge
events_to_keep[ev2.Index] = False
self.events_table.loc[
ev1.Index, "ending_times"] = ev2.ending_times
ev1._replace(ending_times=ev2.ending_times)
merged += 1
else:
ev1 = ev2
logger.info(f"n1,n2 ({n1},{n2}): {merged} merged")
num_merged = (events_to_keep == False).sum()
logger.info(f"Merged {num_merged} events.")
self.events_table = self.events_table.loc[events_to_keep]
self.events_table.reset_index(inplace=True, drop=True)
self.num_nodes = self.node_array.shape[0]
self.num_events = self.events_table.shape[0]
self.start_time = self.events_table.starting_times.min()
self.end_time = self.events_table.ending_times.max()
self._compute_time_grid()
return num_merged
def _compute_delta_trans_mat(self, lamda, round_zeros=True, tol=1e-8):
"""Comptes and matrix differences between each consecutive inter_T.
The computed difference is put in a attribute `delta_inter_T` the
self.delta_inter_T[
lamda][k] = self.inter_T[lamda][k+1] - self.inter_T[lamda][k]
The length of self.delta_inter_T[lamda] is len(self.inter_T[lamda]) - 1
"""
if hasattr(self, "inter_T") and lamda in self.inter_T.keys():
if not hasattr(self, "delta_inter_T"):
self.delta_inter_T = dict()
if lamda not in self.delta_inter_T.keys():
self.delta_inter_T[lamda] = [
self.inter_T[lamda][k+1] - self.inter_T[lamda][k]
for k in range(len(self.inter_T[lamda])-1)
]
if round_zeros:
for M in self.delta_inter_T[lamda]:
set_to_zeroes(M, tol=tol)
else:
# TODO: switch to logger
print("PID ", os.getpid(), " : ",
f"delta_inter_T has already been computed with lamda={lamda}")
else:
# TODO: switch to logger
print("PID ", os.getpid(), " : ", "delta_inter_T has not been computed")
if hasattr(self, "inter_T_lin") and lamda in self.inter_T_lin.keys():
if not hasattr(self, "delta_inter_T_lin"):
self.delta_inter_T_lin = dict()
if lamda not in self.delta_inter_T_lin.keys():
self.delta_inter_T_lin[lamda] = dict()
for t_s in self.inter_T_lin[lamda].keys():
self.delta_inter_T_lin[lamda][t_s] = [
self.inter_T_lin[
lamda][t_s][k+1] - self.inter_T_lin[lamda][t_s][k]
for k in range(len(self.inter_T_lin[lamda][t_s])-1)
]
if round_zeros:
for M in self.delta_inter_T_lin[lamda][t_s]:
set_to_zeroes(M, tol=tol)
else:
# TODO: switch to logger
print("PID ", os.getpid(), " : ",
f"delta_inter_T_lin has already been computed with lamda={lamda}")
else:
# TODO: switch to logger
print("PID ", os.getpid(), " : ", "delta_inter_T_lin has not been computed")
[docs]
def active_nodes(self, t_start, t_end):
"""Return the nodes active within the given time window."""
assert t_start < t_end , \
"t_end should be bigger than t_start"
t_start=max(self.start_time, t_start)
t_end=min(self.end_time, t_end)
mask = (self.events_table["starting_times"] < t_end) & (self.events_table["ending_times"] > t_start)
edges = self.events_table[mask]
nodes = set(edges["source_nodes"]).union(set(edges["target_nodes"]))
return np.array(list(nodes))
[docs]
def num_active_nodes(self, t_start, t_end):
"""Return the number of nodes active within the given time window."""
nodes=self.active_nodes(t_start, t_end)
return len(nodes)
[docs]
def num_active_edges(self, t_start, t_end):
"""Return the number of edges active within the given time window."""
assert t_start < t_end , \
"t_end should be bigger than t_start"
t_start=max(self.start_time, t_start)
t_end=min(self.end_time, t_end)
mask = (self.events_table["starting_times"] < t_end) & (self.events_table["ending_times"] > t_start)
return mask.sum()
[docs]
class ContTempInstNetwork(ContTempNetwork):
"""Continuous time temporal network with instantaneous events.
This is a subclass of ContTempNetwork for continuous time temporal
networks where events do not have a duration.
In practice, it is implemented as a ContTempNetwork where
ending_times_k = starting_times_k+1 and where durations (tau_k) = 1
for all events for the computation of the transition matrices.
Parameters
----------
source_nodes: Python list
List of source nodes of each event, ordered according to
`starting_times`.
target_nodes: Python list
List of target nodes of each event
starting_times: Python list
List of starting times of each event
relabel_nodes: boolean
Relabel nodes from 0 to num_nodes and save original labels in
self.node_to_label_dict. Default is `True`
reset_event_table_index: boolean
Reset the index of the `events_table` DataFrame. Default is `True`.
node_to_label_dict: Python dict
If `relabel_nodes` is `False, this can be used to save the original labels
of the nodes.
events_table: Pandas Dataframe
Dataframe with columns 'source_nodes', 'target_nodes', 'starting_times'
and 'ending_times' and index corresponding to event index. Used for
instantiating a new ConTempNetwork from the event_table of an other one.
"""
def __init__(self,
source_nodes=[],
target_nodes=[],
starting_times=[],
relabel_nodes=True,
reset_event_table_index=True,
node_to_label_dict=None,
events_table=None):
if events_table is None:
ending_times = [t + self._DEFAULT_DURATION
for t in starting_times]
else:
# Instant networks store events_tables without an ending_times
# column. The parent constructor's events_table branch requires
# ending_times, so we synthesize it here as start + default
# duration. For CSV inputs we read the file first, then forward a
# DataFrame to the parent.
if isinstance(events_table, (str, Path)):
events_table = pd.read_csv(str(events_table))
if isinstance(events_table, pd.DataFrame) and \
self._ENDINGS not in events_table.columns:
events_table = events_table.copy()
events_table[self._ENDINGS] = (
events_table[self._STARTS] + self._DEFAULT_DURATION
)
ending_times = [] # ignored when events_table is provided
super().__init__(source_nodes=source_nodes,
target_nodes=target_nodes,
starting_times=starting_times,
ending_times=ending_times,
relabel_nodes=relabel_nodes,
reset_event_table_index=reset_event_table_index,
node_to_label_dict=node_to_label_dict,
merge_overlapping_events=False,
events_table=events_table)
self.events_table["durations"] = [1.0]*self.events_table.shape[0]
self.instantaneous_events = True
[docs]
def compute_laplacian_matrices(self,
*,
t_start=None,
t_stop=None,
save_adjacencies=False):
"""Compute all laplacian matrices and saves them in self.laplacians.
Computes from the first time index before or equal to t_start until
the time index before t_stop.
laplacians are computed from self.times[self._k_start_laplacians]
until self.times[self._k_stop_laplacians-1]
The laplacian at step k, is the random walk laplacian
between times[k] and times[k+1]
NOTE: This subclass implements *pulse dynamics* (state ``A``,
``S``, ``Dm1``, ``degrees`` are reset to zero at every time step,
and event ends are no-ops). This is intentionally distinct from
``ContTempNetwork.compute_laplacian_matrices`` which implements
*interval dynamics* (persistent state across time steps, with
event ends clearing the corresponding adjacency entry). The
behavior here mirrors upstream ``TemporalNetwork.py`` at commit
f99bca3, so the two classes are not expected to produce equal
laplacians even when ending_times are aligned to start + 1.
The pulse semantics are encoded entirely via the
``_make_adjacency_buffer``, ``_laplacian_prewarm``,
``_laplacian_on_event_end`` and ``_laplacian_step_end`` hooks
below; the loop body itself lives in the parent class.
"""
return super().compute_laplacian_matrices(
t_start=t_start,
t_stop=t_stop,
save_adjacencies=save_adjacencies,
)
# --- pulse-dynamics hook overrides --------------------------------
def _make_adjacency_buffer(self, n):
# dok_matrix supports .clear() which is needed in
# _laplacian_step_end below.
return dok_matrix((n, n), dtype=np.float64)
def _laplacian_prewarm(self, state):
# Pulse dynamics: no events persist across step boundaries, so
# nothing to seed.
pass
def _laplacian_on_event_end(self, state, event):
# Pulse dynamics: end events are no-ops (state is reset every
# step in _laplacian_step_end below).
pass
def _laplacian_step_end(self, state):
state.A.clear()
state.S.data.fill(1.0)
state.Dm1.data.fill(1.0)
state.degrees.fill(0.0)
[docs]
def compute_inter_transition_matrices(self,
lamda=None,
t_start=None,
t_stop=None,
use_sparse_stoch=False,
dense_expm=True):
"""Compute interevent transition matrices.
T_k(lamda) = expm(-lamda*L_k).
Here, for instantaneous events, all events are assumed to have the
same duration of unit time (i.e. tau_k =1 for all k).
The transition matrix T_k is saved in `self.inter_T[lamda][k]`,
where self.inter_T is a dictionary with lamda as keys and
lists of transition matrices as values.
will compute from self.times[self._k_start_laplacians]
until self.times[self._k_stop_laplacians-1]
the transition matrix at step k, is the probability transition matrix
between times[k] and times[k+1].
"""
super().compute_inter_transition_matrices(
lamda=lamda,
t_start=t_start,
t_stop=t_stop,
fix_tau_k=True,
use_sparse_stoch=use_sparse_stoch,
dense_expm=dense_expm
)
[docs]
def compute_lin_inter_transition_matrices(self,
lamda=None,
t_start=None,
t_stop=None,
t_s=10,
use_sparse_stoch=False):
"""Compute interevent transition matrices as a linear approximation.
Approximation is done for expm(-lamda*L_k) based on the discrete time
transition matrix.
Here, for instantaneous events, all events are assumed to have the
same duration of unit time (i.e. tau_k =1 for all k).
`t_s` is the time value for which the linear approximation reaches the
stationary transition matrix (default is `t_s=10`).
The transition matrix T_k_lin is saved in
`self.inter_T_lin[lamda][t_s][k]`,
where `self.inter_T_lin` is a dictionary with lamda as keys and
lists of transition matrices as values.
will compute from self.times[self._k_start_laplacians]
until self.times[self._k_stop_laplacians-1]
the transition matrix at step k, is the probability transition matrix
between times[k] and times[k+1]
"""
super().compute_lin_inter_transition_matrices(
lamda=lamda,
t_start=t_start,
t_stop=t_stop,
t_s=t_s,
fix_tau_k=True,
use_sparse_stoch=use_sparse_stoch
)
[docs]
def lin_approx_trans_matrix(T, t, Pi=None, t_s=10):
r"""Linear approximation of the continuous time transition matrix.
:math:`T(t) = e^{-tL}` based on an interpolation between :math:`I` and
:math:`T` and a second interpolation between :math:`T` and :math:`\Pi`,
the transition matrix at stationarity.
For each connected component of the graph, :math:`T(t)` is approximated as
.. math::
\tilde{T}(t) & = & (1-t) I + tT \text{ for } 0\leq t \leq 1 \\
\tilde{T}(t) & = & \frac{1}{1-t_s}[T(t-t_s) + \Pi(1-t)] \text{ for } 1 < t \leq t_s \\
\tilde{T}(t) & = & \Pi \text{ for } t > t_s
where :math:`t_s` is the mixing time of the random walk
(default is `t_s=10`).
Parameters
----------
T : scipy.sparse csr matrix
Transition matrix of the discrete time random walk
t : float
Time, greater or equal to zero.
Pi : scipy.sparse matrix
Transition matrix of the discrete time random walk at stationarity.
If None, it will be computed from T.
t_s : float
Stationarity time at which the interpolation reaches Pi.
Returns
-------
Tapprox : scipy.sparse.csr matrix
Linear approximation of expmL at time t.
"""
assert isspmatrix_csr(T)
num_nodes = T.shape[0]
I = eye(num_nodes,
dtype=np.float64, format="csr")
if t < 0:
raise ValueError("t must be >= 0")
elif t <= 1:
return (1-t)*I + t*T.tocsr()
else:
if Pi is None:
Pi = compute_stationary_transition(T)
if t < t_s:
return (1/(1-t_s))*(T*(t-t_s)+Pi*(1-t))
else:
return Pi
[docs]
def compute_stationary_transition(T):
"""Compute the transition matrix at stationarity for matrix `T`
Parameters
----------
T : scipy.sparse matrix or numpy.ndarray
Transition matrix of the discrete time random walk.
Can be a CSC or CSR matrix.
Returns
-------
Pi : scipy.sparse.csr matrix
Stationary transition matrix.
"""
num_nodes = T.shape[0]
# otherwise 0 values may count as an edge
T.eliminate_zeros()
T.sort_indices()
n_comp, comp_labels = connected_components(T, directed=False)
comp_sizes = np.bincount(comp_labels)
cmp_to_indices = {
cmp: (comp_labels == cmp).nonzero()[0]
for cmp in range(n_comp)
}
# constructors for sparse array
data = np.zeros((comp_sizes**2).sum(), dtype=np.float64)
indices = np.zeros((comp_sizes**2).sum(), dtype=np.int32)
indptr = np.zeros(num_nodes+1, dtype=np.int32)
# vector of degrees (number of nonzero elements of each row)
if isspmatrix(T):
degs = np.diff(T.indptr)
else:
degs = (T > 0).sum(1)
data_ind = 0
for row in range(num_nodes):
cmp = comp_labels[row]
cmp_degs = degs[cmp_to_indices[cmp]]
data[data_ind:data_ind+comp_sizes[cmp]] = cmp_degs/cmp_degs.sum()
indices[data_ind:data_ind+comp_sizes[cmp]] = cmp_to_indices[cmp]
indptr[row] = data_ind
data_ind += comp_sizes[cmp]
indptr[num_nodes] = data_ind
# Stationary transition matrix
return csr_matrix(
(data, indices, indptr),
shape=(num_nodes, num_nodes),
dtype=np.float64
)
[docs]
def compute_subspace_expm(A,
n_comp=None,
comp_labels=None,
thresh_ratio=None,
normalize_rows=True):
"""Compute the exponential matrix of `A`.
The computation is done by applying expm on each connected subgraphs
defined by A and recomposing it to return expm(A).
Parameters
----------
A : scipy.sparse.csc_matrix
thresh_ratio: float, optional.
Threshold ratio used to trim negligible values in the resulting matrix.
Values smaller than `max(expm(A))/thresh_ratio` are set to
zero. Default is None.
normalize_rows: bool, optional.
Whether rows of the resulting matrix are normalized to sum to 1.
Returns
-------
expm(A) : scipy.sparse.csr_matrix
matrix exponential of A
"""
num_nodes = A.shape[0]
# otherwise 0 values may count as an edge
A.eliminate_zeros()
A.sort_indices()
if (n_comp is None) or (comp_labels is None):
n_comp, comp_labels = connected_components(A, directed=False)
comp_sizes = np.bincount(comp_labels)
cmp_indices = [
(comp_labels == cmp).nonzero()[0] for cmp in range(n_comp)
]
logger.info(f"subspace_expm with {n_comp} components")
# constructors for sparse array
data = np.zeros((comp_sizes**2).sum(), dtype=np.float64)
indices = np.zeros((comp_sizes**2).sum(), dtype=np.int32)
indptr = np.zeros(num_nodes+1, dtype=np.int32)
# if nproc == 1:
# expm_func = lambda M: expm(M)
# else:
# expm_func = lambda M: compute_parallel_expm(M, nproc=nproc,
# thresh_ratio=None,
# normalize_rows=False)
subnets_expms = []
for i, cmp_ind in enumerate(cmp_indices):
logger.info(
f"Computing component {i} over {n_comp}, with size {cmp_ind.size}"
)
subnets_expms.append(expm(A[cmp_ind, :][:, cmp_ind]).toarray())
# reconstruct csr sparse matrix
logger.info("Reconstructing expm mat")
data_ind = 0
for row in range(num_nodes):
cmp = comp_labels[row]
cmp_expm = subnets_expms[cmp]
sub_expm_row, = np.where(cmp_indices[cmp] == row)
data[data_ind:data_ind+comp_sizes[cmp]] = cmp_expm[sub_expm_row, :]
indices[data_ind:data_ind+comp_sizes[cmp]] = cmp_indices[cmp]
indptr[row] = data_ind
data_ind += comp_sizes[cmp]
indptr[num_nodes] = data_ind
expmA = csr_matrix(
(data, indices, indptr),
shape=(num_nodes, num_nodes),
dtype=np.float64
)
if thresh_ratio is not None:
expmA.data[expmA.data < expmA.data.max() / thresh_ratio] = 0.0
expmA.eliminate_zeros()
if normalize_rows:
inplace_csr_row_normalize(expmA)
return expmA
[docs]
def csc_row_normalize(X):
"""Row normalize scipy sparse csc matrices.
returns a copy of X row-normalized and in CSC format.
"""
X = X.tocsr()
for i in range(X.shape[0]):
row_sum = X.data[X.indptr[i]:X.indptr[i+1]].sum()
if row_sum != 0:
X.data[X.indptr[i]:X.indptr[i+1]] /= row_sum
return X.tocsc()
[docs]
def find_spectral_gap(L):
"""L is assummed to be connected"""
Lcsr = L.tocsr()
I = eye(L.shape[0],
dtype=np.float64,
format="csr")
degs = np.diff((I-Lcsr).indptr)
D12 = diags(np.sqrt(degs),
format="csr")
Dm12 = diags(1/np.sqrt(degs),
format="csr")
Lsym = D12 @ Lcsr @ Dm12
# stationary solution
Pi = np.vstack([degs/degs.sum()]*L.shape[0])
gap = eigsh(Lsym.toarray()-Pi, 1, sigma=0, return_eigenvectors=False)
return gap
[docs]
def remove_nnz_rowcol(L):
"""CSC or CSR matrix with removed zero row and columns
This also returns an array of the indices of rows/columns with non-zero
values and the (linear) size of L.
Returns
-------
L_small, nonzero_indices, size
"""
# indicies with zero sum row AND col
nonzerosum_rowcols = ~np.logical_and(L.getnnz(1) == 0,
L.getnnz(0) == 0)
nonzero_indices, = (nonzerosum_rowcols).nonzero()
return (
L[nonzerosum_rowcols][:, nonzerosum_rowcols],
nonzero_indices,
L.shape[0]
)
[docs]
def numpy_rebuild_nnz_rowcol(T_data,
T_indices,
T_indptr,
zero_indices):
"""Returns a CSR matrix.
The CSR matrix (data, indices, rownnz, shape) is built from the CSR matrix
T_small but with added row-colums at zero_indicies (with 1 on the diagonal)
"""
n_rows = T_indptr.size-1 + zero_indices.size
data = np.zeros(T_data.size+zero_indices.size, dtype=np.float64)
indices = np.zeros(T_data.size+zero_indices.size, dtype=np.int32)
indptr = np.zeros(n_rows+1, dtype=np.int32)
new_col_inds = np.zeros(T_indptr.size-1, dtype=np.int32)
Ts_indices = np.zeros(T_indices.size, dtype=np.int32)
zero_set = set(zero_indices)
# map col indices to new positions
k = 0
for i in range(n_rows):
if i not in zero_set:
new_col_inds[k] = i
k += 1
for k, i in enumerate(T_indices):
Ts_indices[k] = new_col_inds[i]
row_id_small_t = -1
data_ind = 0
for row_id in range(n_rows):
row_id_small_t += 1
if row_id in zero_set:
# add a row with just 1 on the diagonal
data[data_ind] = 1.0
indices[data_ind] = row_id
indptr[row_id+1] = indptr[row_id]+1
row_id_small_t -= 1
data_ind += 1
else:
row_start = T_indptr[row_id_small_t]
row_end = T_indptr[row_id_small_t+1]
num_data_row = row_end - row_start
data[data_ind:data_ind+num_data_row] = T_data[row_start:row_end]
indices[
data_ind:data_ind+num_data_row] = Ts_indices[row_start:row_end]
indptr[row_id+1] = indptr[row_id]+num_data_row
data_ind += num_data_row
return (data, indices, indptr, n_rows)
[docs]
def sparse_lapl_expm(L,
fact,
dense_expm=True,
nproc=1,
thresh_ratio=None,
normalize_rows=True):
"""Computes the matrix exponential of a laplacian L.
The exponential, expm(-fact*L), is computed considering only the non-zeros
rows/cols of L
Parameters
----------
L : scipy sparse csc matrix
Laplacian matrix with large proportion of zero rows/cols.
fact : float
factor in front of the laplacian
dense_expm : boolean
Whether to compute the expm on the small Laplacian as a dense
or sparse array. Default is True.
nproc : int, optional
number of parallel processes for dense_expm=False. The default is 1.
thresh_ratio: float, optional.
Threshold ratio used to trim negligible values in the resulting matrix.
Values smaller than `max(expm(A))/thresh_ratio` are set to
zero. For dense_expm=False. Default is None.
normalize_rows: bool, optional.
Whether rows of the resulting matrix are normalized to sum to 1.
For dense_expm=False
Returns
-------
expm(-fact*L) : `SparseStochMat` object
Transition matrix
"""
if L.getnnz() == 0: # zero matrix
# return identity
return SparseStochMat.create_diag(L.shape[0])
L_small, nz_inds, size = remove_nnz_rowcol(L)
if nproc == 1:
expm_func = partial(compute_subspace_expm,
A=-fact*L_small,
thresh_ratio=thresh_ratio,
normalize_rows=normalize_rows)
else:
expm_func = partial(compute_subspace_expm_parallel,
A=-fact*L_small,
nproc=nproc,
thresh_ratio=thresh_ratio,
normalize_rows=normalize_rows)
if dense_expm:
T_small = csr_matrix(expm(-fact*L_small.toarray()))
else:
# for large networks, try subspace expm
L_small.eliminate_zeros()
if L_small.shape[0] >= 1000:
n_comp, comp_labels = connected_components(L_small, directed=False)
if n_comp > 1 :
T_small = expm_func(n_comp=n_comp,
comp_labels=comp_labels)
else:
T_small = expm(-fact*L_small).tocsr()
else:
T_small = expm(-fact*L_small).tocsr()
return SparseStochMat(size, T_small.data, T_small.indices,
T_small.indptr, nz_inds)
[docs]
def sparse_lin_approx(T, t, Pi=None, t_s=10, nz_rowcols=None):
"""Linear approximation of a continuous time transition matrix.
This is desgned for sparse transition matrices.
Performs computation using `lin_approx_trans_matrix`
on a smallest L matrices with no zeros
row/cols and returns a SparseStochMat
Parameters
----------
T : scipy sparse csr matrix
Original full size laplacian matrix.
t : float
Interpolation time.
Pi : scipcy sparse csr matrix, optional
Transition matrix at stationarity. Same shape that T.
The default is None, i.e. computed from T.
t_s : float, optional
Stationarity time at which the interpolation reaches Pi.
The default is 10.
nz_rowcols : ndarray of int32
indices of T of nonzero offdiagonal rows/cols to build a
SparseStochMat.
Returns
-------
Tapprox : SparseStochMat object
Linear approximation at time t.
"""
T_ss = SparseStochMat.from_full_csr_matrix(T, nz_rowcols=nz_rowcols)
if Pi is None:
Pi_small = compute_stationary_transition(T_ss.T_small)
elif isinstance(Pi, SparseStochMat):
Pi_small = Pi.T_small
elif isinstance(Pi, csr_matrix):
Pi_small = SparseStochMat.from_full_csr_matrix(
Pi, nz_rowcols=nz_rowcols
).T_small
else:
raise TypeError("Pi must be a csr or SparseStochMat.")
Tapprox_small = lin_approx_trans_matrix(T_ss.T_small,
t=t,
Pi=Pi_small,
t_s=t_s)
return SparseStochMat(T_ss.size, Tapprox_small.data, Tapprox_small.indices,
Tapprox_small.indptr, T_ss.nz_rowcols)
[docs]
def sparse_stationary_trans(T):
"""Parameters
----------
T : scipy sparse csr matrix
Discrete time transition matrix.
Returns
-------
Pi : scipy sparse csr matrix
Transition matrix at stationarity
"""
T_ss = SparseStochMat.from_full_csr_matrix(T.tocsr())
Pi_small = compute_stationary_transition(T_ss.T_small)
return SparseStochMat(T_ss.size, Pi_small.data, Pi_small.indices,
Pi_small.indptr, T_ss.nz_rowcols)
[docs]
def set_to_ones(Tcsr, tol=1e-8):
"""In-place replacement of ones in sparse matrix within the tolerence.
Replace values within a tolerance to one with actual ones.
"""
Tcsr.data[np.abs(Tcsr.data - 1) <= tol] = 1
[docs]
def set_to_zeroes(Tcsr, tol=1e-8, relative=True, use_absolute_value=False):
"""In-place replacement of zeroes in sparse matrix within a tolerance.
Replace values that are, within the tolerence, close to zero with actual
zeroes.
If tol is None, does nothing
"""
if tol is not None:
if isinstance(Tcsr, SparseStochMat):
Tcsr.set_to_zeroes(tol, relative=relative)
elif isinstance(Tcsr, (csr_matrix, csc_matrix)):
if Tcsr.data.size > 0:
if relative:
# tol = tol*np.abs(Tcsr.data).max()
# finding the max of the absolute value without making a
# copy of the whole array
tol = tol*np.abs([Tcsr.data.min(), Tcsr.data.max()]).max()
if use_absolute_value:
Tcsr.data[np.abs(Tcsr.data) <= tol] = 0
else:
Tcsr.data[Tcsr.data <= tol] = 0
Tcsr.eliminate_zeros()
else:
raise TypeError("Tcsr must be csc,csr or SparseStochMat")