"""#
# flow stability
#
# Copyright (C) 2021 Alexandre Bovet <alexandre.bovet@maths.ox.ac.uk>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import os
import time
from multiprocessing import Pool, RawArray
import numpy as np
from scipy.sparse import csc_matrix, csr_matrix, isspmatrix_csc, vstack
from scipy.sparse.csgraph import connected_components
from scipy.sparse.linalg import expm, expm_multiply
from stochmat import inplace_csr_row_normalize
# A global dictionary storing the variables passed from the initializer.
var_dict = {}
def _init_worker(data, indices, indptr, N):
# reconstruct A from shared arrays
var_dict["A"] = csc_matrix((np.frombuffer(data, dtype=np.float64),
np.frombuffer(indices, dtype=np.int32),
np.frombuffer(indptr, dtype=np.int32)),
shape=(N,N))
var_dict["N"] = N
def _worker(args):
i, thresh_ratio = args
delta_i = np.zeros(var_dict["N"],
dtype=np.float64)
delta_i[i] = 1.0
Tcol_i = expm_multiply(var_dict["A"],
delta_i)
if thresh_ratio is not None:
Tcol_i[Tcol_i<Tcol_i.max()/thresh_ratio]=0
return csc_matrix(Tcol_i)
[docs]
def compute_parallel_expm(A, nproc=1, thresh_ratio=None,
normalize_rows=True,
verbose=True):
"""Computes the exponential matrix of A by computing each column separately
exploiting the fact that the column i of expm(A) is expm_multiply(A,delta_i)
where delta i is the vector with zeros everywhere except on i.
This only works if A is equal to (minus) a Laplacian matrix
Parameters
----------
A : scipy csc sparse matrix
Square csc sparse matrix representing (+ or -) a Laplacian.
If A is not csc, it will be converted to csc format.
nproc : int, optional
number of parallel processes. The default is 1.
thresh_ratio: float, optional.
Threshold ratio used to trim negligible values in the resulting matrix.
For each columns `c`, values smaller than `max(c)/thresh_ratio` are set to
zero. Default is None.
normalize_rows: bool, optional.
Whether rows of the resulting matrix are normalized to sum to 1.
Returns
-------
expm(A).
scipy csr sparse matrix
"""
if not isspmatrix_csc(A):
A = csc_matrix(A)
N = A.shape[0]
# create arrays to share A between processes
indices = RawArray("i",A.indices)
indptr = RawArray("i",A.indptr)
data = RawArray("d", A.data)
if verbose:
print("PID ", os.getpid(), " : ",f"compute_parallel_expm starting a pool of {nproc} workers")
t0 = time.time()
with Pool(nproc, initializer=_init_worker,
initargs=(data, indices, indptr, N)) as p:
res = p.map(_worker, [(i,thresh_ratio) for i in range(N)])
# delete shared arrays
global var_dict
var_dict = {}
# seems faster than _stack_sparse_cols
T = vstack(res).T.tocsr()
if normalize_rows:
inplace_csr_row_normalize(T)
if verbose:
print("PID ", os.getpid(), " : ", f"compute_parallel_expm took {time.time()-t0:.3f}s, computed expm has {T.getnnz()} non-zeros.")
return T
def _stack_sparse_cols(col_list):
# create csc sparse matric from sparse column list
data_size = sum(C.data.size for C in col_list)
N = max(col_list[0].shape)
data = np.zeros(data_size, dtype=np.float64)
indices = np.zeros(data_size, dtype=np.int32)
indptr = np.zeros(N+1, dtype=np.int32)
ptr = 0
for col, C in enumerate(col_list):
nnz_col = C.data.size
data[ptr:ptr+nnz_col] = C.T.tocsc().data
indices[ptr:ptr+nnz_col] = C.T.tocsc().indices
ptr = ptr + nnz_col
indptr[col+1] = ptr
return csc_matrix((data, indices, indptr), shape=(N,N))
def _expm_worker(cmp_ind):
return expm(var_dict["A"][cmp_ind,:][:,cmp_ind]).toarray()
[docs]
def compute_subspace_expm_parallel(A, n_comp=None, comp_labels=None, verbose=False,
nproc=1,
thresh_ratio=None,
normalize_rows=True):
"""Compute the exponential matrix of `A` by applying expm on each connected
subgraphs defined by A and recomposing it to return expm(A).
Small subgraphs are computed in parallel, each using scipy expm,
and large subgraphs are computed with compute_parallel_expm.
Parameters
----------
A : scipy.sparse.csc_matrix
nproc : int, optional
number of parallel processes. The default is 1.
thresh_ratio: float, optional.
Threshold ratio used to trim negligible values in the resulting matrix.
Values smaller than `max(expm(A))/thresh_ratio` are set to
zero. Default is None.
normalize_rows: bool, optional.
Whether rows of the resulting matrix are normalized to sum to 1.
Returns
-------
expm(A) : scipy.sparse.csr_matrix
matrix exponential of A
"""
num_nodes = A.shape[0]
# otherwise 0 values may count as an edge
A.eliminate_zeros()
A.sort_indices()
if (n_comp is None) or (comp_labels is None):
n_comp, comp_labels = connected_components(A,directed=False)
comp_sizes = np.bincount(comp_labels)
cmp_indices = [(comp_labels == cmp).nonzero()[0] for \
cmp in range(n_comp)]
if verbose:
print("PID ", os.getpid(), f" : subspace_expm with {n_comp} components")
large_comps, = np.nonzero(comp_sizes >= 1000)
small_comps, = np.nonzero(comp_sizes < 1000)
subnets_expms = dict()
# first compute large comps with exmp_multiply
for cmp in large_comps:
cmp_ind = cmp_indices[cmp]
if verbose:
print("PID ", os.getpid(), f" : computing component {cmp} over {n_comp}, with size {cmp_ind.size} using expm_multiply")
subnets_expms[cmp] = compute_parallel_expm(A[cmp_ind,:][:,cmp_ind],
nproc=nproc,
thresh_ratio=None,
normalize_rows=False,
verbose=verbose,
).toarray()
Aindices = RawArray("i",A.indices)
Aindptr = RawArray("i",A.indptr)
Adata = RawArray("d", A.data)
# now compute small comps in parallel
if verbose:
print("PID ", os.getpid(), f" : computing {small_comps.size} small components with {nproc} workers")
t0 = time.time()
with Pool(nproc, initializer=_init_worker,
initargs=(Adata, Aindices, Aindptr, num_nodes)) as p:
res = p.map(_expm_worker, [cmp_indices[c] for c in small_comps])
# delete shared arrays
global var_dict
var_dict = {}
if verbose:
print("PID ", os.getpid(), f" : small components computation took {time.time()-t0:.3f}s")
t0 = time.time()
# organize results
for c, sub_expm in zip(small_comps, res):
subnets_expms[c] = sub_expm
# constructors for sparse array
data = np.zeros((comp_sizes**2).sum(), dtype=np.float64)
indices = np.zeros((comp_sizes**2).sum(), dtype=np.int32)
indptr = np.zeros(num_nodes+1, dtype=np.int32)
# reconstruct csr sparse matrix
if verbose:
print("PID ", os.getpid(), " : reconstructing expm mat")
data_ind = 0
for row in range(num_nodes):
cmp = comp_labels[row]
cmp_expm = subnets_expms[cmp]
sub_expm_row, = np.where(cmp_indices[cmp] == row)
data[data_ind:data_ind+comp_sizes[cmp]] = cmp_expm[sub_expm_row,:]
indices[data_ind:data_ind+comp_sizes[cmp]] = cmp_indices[cmp]
indptr[row] = data_ind
data_ind += comp_sizes[cmp]
indptr[num_nodes] = data_ind
expmA = csr_matrix((data, indices, indptr), shape=(num_nodes,num_nodes),
dtype=np.float64)
if thresh_ratio is not None:
expmA.data[expmA.data<expmA.data.max()/thresh_ratio] = 0.0
expmA.eliminate_zeros()
if normalize_rows:
inplace_csr_row_normalize(expmA)
return expmA