# coding: utf8
""" Implementation of exotic DPP objects:
- Uniform spanning trees :class:`UST`
- Descent procresses :class:`Descent`:
* :class:`CarriesProcess`
* :class:`DescentProcess`
* :class:`VirtualDescentProcess`
- :class:`PoissonizedPlancherel` measure
.. seealso:
`Documentation on ReadTheDocs <https://dppy.readthedocs.io/en/latest/exotic_dpps/index.html>`_
"""
from abc import ABCMeta, abstractmethod
import numpy as np
from scipy.linalg import qr
import matplotlib.pyplot as plt
from matplotlib import collections as mc # see plot_diagram
from dppy.exotic_dpps_core import ust_sampler_wilson, ust_sampler_aldous_broder
from dppy.exact_sampling import proj_dpp_sampler_eig, proj_dpp_sampler_kernel
# For DescentProcess
from re import findall # to convert class names to string in
# from dppy.exotic_dpps_core import wrapper_plot_descent
# For Poissonized Plancherel measure
from dppy.exotic_dpps_core import RSK, xy_young_ru, limit_shape
# For both Descent Processes and Poissonized Plancherel
from dppy.exotic_dpps_core import uniform_permutation
from dppy.utils import check_random_state
#####################
# Descent Processes #
#####################
class Descent(metaclass=ABCMeta):
def __init__(self):
self.name = ' '.join(findall('[A-Z][^A-Z]*', self.__class__.__name__))
self.list_of_samples = []
self.size = 100
@property
@abstractmethod
def _bernoulli_param(self):
"""Parameter of the corresponding process formed by i.i.d. Bernoulli variables.
This parameter corresponds to the probability that a descent occurs any index"""
return 0.5
@abstractmethod
def sample(self, random_state=None):
"""Sample from corresponding process"""
def flush_samples(self):
""" Empty the :py:attr:`list_of_samples` attribute.
"""
self.list_of_samples = []
def plot(self, vs_bernoullis=True, random_state=None):
"""Display the last realization of the process.
If ``vs_bernoullis=True`` compare it to a sequence of i.i.d. Bernoullis with parameter ``_bernoulli_param``
.. seealso::
- :py:meth:`sample`
"""
rng = check_random_state(random_state)
title = 'Realization of the {} process'.format(self.name)
fig, ax = plt.subplots(figsize=(19, 2))
sampl = self.list_of_samples[-1]
ax.scatter(sampl,
np.zeros_like(sampl) + (1.0 if vs_bernoullis else 0.0),
color='b', s=20, label=self.name)
if vs_bernoullis:
title += r' vs independent Bernoulli variables with parameter $p$={:.3f}'.format(self._bernoulli_param)
bern = np.where(rng.rand(self.size) < self._bernoulli_param)[0]
ax.scatter(bern, -np.ones_like(bern),
color='r', s=20, label='Bernoullis')
plt.title(title)
# Spine options
ax.spines['bottom'].set_position('center')
ax.spines['left'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# Ticks options
minor_ticks = np.arange(0, self.size + 1)
major_ticks = np.arange(0, self.size + 1, 10)
ax.set_xticks(major_ticks)
ax.set_xticks(minor_ticks, minor=True)
ax.set_xticklabels(major_ticks, fontsize=15)
ax.xaxis.set_ticks_position('bottom')
ax.tick_params(
axis='y', # changes apply to the y-axis
which='both', # both major and minor ticks are affected
left=False, # ticks along the left edge are off
right=False, # ticks along the right edge are off
labelleft=False) # labels along the left edge are off
ax.xaxis.grid(True)
ax.set_xlim([-1, self.size + 1])
ax.legend(bbox_to_anchor=(0, 0.85),
frameon=False,
prop={'size': 15})
[docs]class CarriesProcess(Descent):
""" DPP on :math:`\\{1, \\dots, N-1\\}` (with a non symmetric kernel) derived from the cumulative sum of :math:`N` i.i.d. digits in :math:`\\{0, \\dots, b-1\\}`.
:param base:
Base/radix
:type base:
int, default 10
.. seealso::
- :cite:`BoDiFu10`
- :ref:`carries_process`
"""
def __init__(self, base=10):
super().__init__()
self.base = base
def __str__(self):
str_info = ['Carries process in base {}'.format(self.base),
'Number of samples = {}'.format(len(self.list_of_samples))]
return '\n'.join(str_info)
@property
def _bernoulli_param(self):
return 0.5 * (1 - 1 / self.base)
[docs] def sample(self, size=100, random_state=None):
""" Compute the cumulative sum (in base :math:`b`) of a sequence of i.i.d. digits and record the position of carries.
:param size:
size of the sequence of i.i.d. digits in :math:`\\{0, \\dots, b-1\\}`
:type size:
int
"""
rng = check_random_state(random_state)
self.size = size
A = rng.randint(0, self.base, self.size)
B = np.mod(np.cumsum(A), self.base)
carries = 1 + np.where(B[:-1] > B[1:])[0]
self.list_of_samples.append(carries.tolist())
[docs]class DescentProcess(Descent):
""" DPP on :math:`\\{1, \\dots, N-1\\}` associated to the descent process on the symmetric group :math:`\\mathfrak{S}_N`.
.. seealso::
- :cite:`BoDiFu10`
- :ref:`descent_process`
"""
def __init__(self):
super().__init__()
def __str__(self):
str_info = ['Descent process',
'Number of samples = {}'.format(len(self.list_of_samples))]
return '\n'.join(str_info)
@property
def _bernoulli_param(self):
return 0.5
[docs] def sample(self, size=100, random_state=None):
""" Draw a permutation :math:`\\sigma \\in \\mathfrak{S}_N` uniformly at random and record the descents i.e. :math:`\\{ i ~;~ \\sigma_i > \\sigma_{i+1} \\}`.
:param size:
size of the permutation i.e. degree :math:`N` of :math:`\\mathfrak{S}_N`.
:type size:
int
"""
rng = check_random_state(random_state)
self.size = size
sigma = uniform_permutation(self.size, random_state=rng)
descent = 1 + np.where(sigma[:-1] > sigma[1:])[0]
self.list_of_samples.append(descent.tolist())
[docs]class VirtualDescentProcess(Descent):
""" This is a DPP on :math:`\\{1, \\dots, N-1\\}` with a non symmetric kernel appearing in (or as a limit of) the descent process on the symmetric group :math:`\\mathfrak{S}_N`.
.. seealso::
- :cite:`Kam18`
- :ref:`limiting_descent_process`
- :class:`DescentProcess`
"""
def __init__(self, x_0=0.5):
super().__init__()
if not (0 <= x_0 <= 1):
raise ValueError("x_0 must be in [0,1]")
self.x_0 = x_0
def __str__(self):
str_info = ['Limitting Descent process for vitural permutations',
'Number of samples = {}'.format(len(self.list_of_samples))]
return '\n'.join(str_info)
@property
def _bernoulli_param(self):
return 0.5 * (1 - self.x_0**2)
[docs] def sample(self, size=100, random_state=None):
""" Draw a permutation uniformly at random and record the descents i.e. indices where :math:`\\sigma(i+1) < \\sigma(i)` and something else...
:param size:
size of the permutation i.e. degree :math:`N` of :math:`\\mathfrak{S}_N`.
:type size:
int
.. seealso::
- :cite:`Kam18`, Sec ??
.. todo::
ask @kammmoun to complete the docsting and Section in see also
"""
rng = check_random_state(random_state)
self.size = size
sigma = uniform_permutation(self.size + 1, random_state=rng)
X = sigma[:-1] > sigma[1:] # Record the descents in permutation
Y = rng.binomial(n=2, p=self.x_0, size=self.size + 1) != 1
descent = [i for i in range(self.size)
if (~Y[i] and Y[i + 1]) or (~Y[i] and ~Y[i + 1] and X[i])]
# ~ symbol is equivalent to not on boolean numpy array
self.list_of_samples.append(descent)
##########################
# Poissonized Plancherel #
##########################
[docs]class PoissonizedPlancherel:
""" DPP on partitions associated to the Poissonized Plancherel measure
:param theta:
Poisson parameter i.e. expected length of permutation
:type theta:
int, default 10
.. seealso::
- :cite:`Bor09` Section 6
- :ref:`poissonized_plancherel_measure`
"""
def __init__(self, theta=10):
self.theta = theta # Poisson param = expected length of permutation
self.list_of_young_diag = []
self.list_of_samples = []
def __str__(self):
str_info = ['Poissonized Plancherel measure\
with parameter {}'.format(self.theta),
'Number of samples = {}'.format(len(self.list_of_samples))]
return '\n'.join(str_info)
# def info(self):
# """ Print infos about the :class:`UST` object
# """
# print(self.__str__())
[docs] def sample(self, random_state=None):
""" Sample from the Poissonized Plancherel measure.
:param random_state:
:type random_state:
None, np.random, int, np.random.RandomState
"""
rng = check_random_state(random_state)
N = rng.poisson(self.theta)
sigma = uniform_permutation(N, random_state=rng)
P, _ = RSK(sigma)
# young_diag = [len(row) for row in P]
young_diag = np.fromiter(map(len, P), dtype=int)
self.list_of_young_diag.append(young_diag)
# sampl = [len(row) - i + 0.5 for i, row in enumerate(P, start=1)]
sampl = young_diag - np.arange(0.5, young_diag.size)
self.list_of_samples.append(sampl.tolist())
[docs] def plot(self, title=''):
"""Display the process on the real line
:param title:
Plot title
:type title:
string
.. seealso::
- :py:meth:`sample`
"""
sampl = self.list_of_samples[-1]
# Display the reparametrized Plancherel sample
fig, ax = plt.subplots(figsize=(19, 2))
ax.scatter(sampl, np.zeros_like(sampl), color='blue', s=20)
# Spine options
ax.spines['bottom'].set_position('center')
ax.spines['left'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# Ticks options
x_max = np.max(np.abs(sampl)) + 0.5
minor_ticks = np.arange(-x_max, x_max + 1)
major_ticks = np.arange(-100, 100 + 1, 10)
ax.set_xticks(major_ticks)
ax.set_xticks(minor_ticks, minor=True)
ax.set_xticklabels(major_ticks, fontsize=15)
ax.xaxis.set_ticks_position('bottom')
ax.tick_params(
axis='y', # changes apply to the y-axis
which='both', # both major and minor ticks are affected
left=False, # ticks along the left edge are off
right=False, # ticks along the right edge are off
labelleft=False) # labels along the left edge are off
ax.xaxis.grid(True)
ax.set_xlim([-x_max - 2, x_max + 2])
# ax.legend(bbox_to_anchor=(0,0.85), frameon=False, prop={'size':20})
str_title = r'Realization of the DPP associated to the Poissonized Plancherel measure with parameter $\theta=${}'.format(self.theta)
plt.title(title if title else str_title)
[docs] def plot_diagram(self, normalization=False):
""" Display the Young diagram (russian convention), the associated sample and potentially rescale the two to visualize the limit-shape theorem :cite:`Ker96`.
The sample corresponds to the projection onto the real line of the descending surface edges.
:param normalization:
If ``normalization=True``, the Young diagram and the corresponding sample are scaled by a factor :math:`\\sqrt{\\theta}` and the limiting
:type normalization:
bool, default False
.. seealso::
- :py:meth:`sample`
- :py:meth:`plot`
- :cite:`Ker96`
"""
y_diag = self.list_of_young_diag[-1]
sampl = self.list_of_samples[-1].copy()
x_max = 1.1 * max(y_diag.size, y_diag[0])
xy_young = xy_young_ru(y_diag)
if normalization:
sampl /= np.sqrt(self.theta)
x_max /= np.sqrt(self.theta)
xy_young /= np.sqrt(self.theta)
fig, ax = plt.subplots(figsize=(12, 8))
# Display corresponding sample
ax.scatter(sampl, np.zeros_like(sampl),
s=3, label='sample')
# Display absolute value wedge
ax.plot([-x_max, 0.0, x_max], [x_max, 0.0, x_max],
c='k', lw=1)
# Display young diagram in russian notation
lc = mc.LineCollection(xy_young.reshape((-1, 2, 2)),
color='k', linewidths=2)
ax.add_collection(lc)
# Display limit shape
if normalization:
x_lim_sh = np.linspace(-x_max, x_max, 100)
ax.plot(x_lim_sh, limit_shape(x_lim_sh),
c='r', label='limit shape')
# Display stems linking sample on real line and descent in young diag
# xy_y_diag = np.column_stack([y_diag,
# np.arange(0.5, y_diag.size)]).dot(rot_45_and_scale.T)
# if normalization:
# xy_y_diag /= np.sqrt(theta)
# plt.scatter(xy_y_diag[:,0], np.zeros_like(y_diag), color='r')
# plt.stem(xy_y_diag[:,0], xy_y_diag[:,1], linefmt='C0--', basefmt=' ')
plt.legend(loc='best')
plt.axis('equal')
str_title = r'Young diagram associated to Poissonized Plancherel measure with parameter $\theta=${}'.format(self.theta)
plt.title(str_title)
##########################
# Uniform Spanning Trees #
##########################
[docs]class UST:
""" DPP on edges of a connected graph :math:`G` with correlation kernel the projection kernel onto the span of the rows of the incidence matrix :math:`\\text{Inc}` of :math:`G`.
This DPP corresponds to the uniform measure on spanning trees (UST) of :math:`G`.
:param graph:
Connected undirected graph
:type graph:
networkx graph
.. seealso::
- :ref:`UST`
- :ref:`Definition of DPP <finite_dpps_definition>`
"""
def __init__(self, graph):
# For Uniform Spanning Trees
try:
import networkx as nx
self.nx = nx
except ImportError:
raise ValueError('The networkx package is required to sample spanning trees (see setup.py).')
if nx.is_connected(graph):
self.graph = graph
else:
raise ValueError('graph not connected')
self.nodes = list(self.graph.nodes())
self.nb_nodes = self.graph.number_of_nodes() # len(self.graph)
self.edges = list(self.graph.edges())
self.nb_edges = self.graph.number_of_edges() # len(self.edges)
self.edge_labels = {edge: r'$e_{}$'.format(i)
for i, edge in enumerate(self.edges)}
self.neighbors = [list(graph.neighbors(v))
for v in range(self.nb_nodes)]
self.sampling_mode = 'Wilson' # Default (avoid eig_vecs computation)
self._sampling_modes = {'markov-chain': ['Wilson', 'Aldous-Broder'],
'spectral-method': ['GS'],
'projection-K-kernel': ['Schur', 'Chol']}
self.list_of_samples = []
self.kernel = None
self.kernel_eig_vecs = None
def __str__(self):
str_info = ['Uniform Spanning Tree measure on a graph with:',
'- {} nodes'.format(self.nb_nodes),
'- {} edges'.format(self.nb_edges),
'Sampling mode = {}'.format(self.sampling_mode),
'Number of samples = {}'.format(len(self.list_of_samples))]
return '\n'.join(str_info)
# def info(self):
# """ Print infos about the :class:`UST` object
# """
# print(self.__str__())
[docs] def flush_samples(self):
""" Empty the :py:attr:`list_of_samples` attribute.
"""
self.list_of_samples = []
[docs] def sample(self, mode='Wilson', root=None, random_state=None):
""" Sample a spanning of the underlying graph uniformly at random.
It generates a networkx graph object.
:param mode:
Markov-chain-based samplers:
- ``'Wilson'``, ``'Aldous-Broder'``
Chain-rule-based samplers:
- ``'GS'``, ``'GS_bis'``, ``'KuTa12'`` from eigenvectors
- ``'Schur'``, ``'Chol'``, from :math:`\\mathbf{K}` correlation kernel
:type mode:
string, default ``'Wilson'``
:param root:
Starting node of the random walk when using Markov-chain-based samplers
:type root:
int
:param random_state:
:type random_state:
None, np.random, int, np.random.RandomState
.. seealso::
- Wilson :cite:`PrWi98`
- Aldous-Broder :cite:`Ald90`
- :py:meth:`~dppy.FiniteDPP.sample`
"""
rng = check_random_state(random_state)
self.sampling_mode = mode
if self.sampling_mode in self._sampling_modes['markov-chain']:
if self.sampling_mode == 'Wilson':
sampl = ust_sampler_wilson(self.neighbors,
random_state=rng)
elif self.sampling_mode == 'Aldous-Broder':
sampl = ust_sampler_aldous_broder(self.neighbors,
random_state=rng)
elif self.sampling_mode in self._sampling_modes['spectral-method']:
self.compute_kernel_eig_vecs()
dpp_sample = proj_dpp_sampler_eig(self.kernel_eig_vecs,
mode=self.sampling_mode,
random_state=rng)
sampl = self.nx.Graph()
sampl.add_edges_from([self.edges[e] for e in dpp_sample])
elif self.sampling_mode in self._sampling_modes['projection-K-kernel']:
self.compute_kernel()
dpp_sample = proj_dpp_sampler_kernel(self.kernel,
mode=self.sampling_mode,
random_state=rng)
sampl = self.nx.Graph()
sampl.add_edges_from([self.edges[e] for e in dpp_sample])
else:
err_print = '\n'.join(
'Invalid sampling mode',
'Chose from: {}'.format(self._sampling_modes.values()),
'Given {}'.format(mode))
raise ValueError()
self.list_of_samples.append(sampl)
[docs] def compute_kernel(self):
""" Compute the orthogonal projection kernel :math:`\\mathbf{K} = \\text{Inc}^+ \\text{Inc}` i.e. onto the span of the rows of the vertex-edge incidence matrix :math:`\\text{Inc}` of size :math:`|V| \\times |E|`.
In fact, for a connected graph, :math:`\\text{Inc}` has rank :math:`|V|-1` and any row can be discarded to get an basis of row space. If we note :math:`A` the amputated version of :math:`\\text{Inc}`, then :math:`\\text{Inc}^+ = A^{\\top}[AA^{\\top}]^{-1}`.
In practice, we orthogonalize the rows of :math:`A` to get the eigenvectors :math:`U` of :math:`\\mathbf{K}=UU^{\\top}`.
.. seealso::
- :py:meth:`plot_kernel`
"""
if self.kernel is None:
self.compute_kernel_eig_vecs() # U = QR(Inc[:-1,:].T)
# K = UU.T
self.kernel = self.kernel_eig_vecs.dot(self.kernel_eig_vecs.T)
[docs] def compute_kernel_eig_vecs(self):
""" See explaination in :func:`compute_kernel <compute_kernel>`
"""
if self.kernel_eig_vecs is None:
inc_mat = self.nx.incidence_matrix(self.graph, oriented=True)
# Discard any row e.g. the last one
A = inc_mat[:-1, :].toarray()
# Orthonormalize rows of A
self.kernel_eig_vecs, _ = qr(A.T, mode='economic')
[docs] def plot(self, title=''):
""" Display the last realization (spanning tree) of the corresponding :class:`UST` object.
:param title:
Plot title
:type title:
string
.. seealso::
- :py:meth:`sample`
"""
graph_to_plot = self.list_of_samples[-1]
plt.figure(figsize=(4, 4))
pos = self.nx.circular_layout(self.graph)
self.nx.draw_networkx(graph_to_plot,
pos=pos,
node_color='orange',
with_labels=True,
width=3)
edge_labs = {e: self.edge_labels[e if e in self.edges else e[::-1]]
for e in graph_to_plot.edges()}
self.nx.draw_networkx_edge_labels(graph_to_plot,
pos=pos,
edge_labels=edge_labs,
font_size=20)
plt.axis('off')
str_title = 'A realization of UST with {} procedure'.format(self.sampling_mode)
plt.title(title if title else str_title)
[docs] def plot_graph(self, title=''):
"""Display the original graph defining the :class:`UST` object
:param title:
Plot title
:type title:
string
.. seealso::
- :func:`compute_kernel <compute_kernel>`
"""
# edge_lab = [r'$e_{}$'.format(i) for i in range(self.nb_edges)]
# edge_labels = dict(zip(self.edges, edge_lab))
# node_labels = dict(zip(self.nodes, self.nodes))
plt.figure(figsize=(4, 4))
pos = self.nx.circular_layout(self.graph)
self.nx.draw_networkx(self.graph,
pos=pos,
node_color='orange',
with_labels=True,
width=3)
# nx.draw_networkx_labels(self.graph,
# pos,
# node_labels)
self.nx.draw_networkx_edge_labels(self.graph,
pos=pos,
edge_labels=self.edge_labels,
font_size=20)
plt.axis('off')
str_title = 'Original graph'
plt.title(title if title else str_title)
[docs] def plot_kernel(self, title=''):
"""Display a heatmap of the underlying orthogonal projection kernel :math:`\\mathbf{K}` associated to the DPP underlying the :class:`UST` object
:param title:
Plot title
:type title:
string
.. seealso::
- :func:`compute_kernel <compute_kernel>`
"""
self.compute_kernel()
fig, ax = plt.subplots(1, 1, figsize=(4, 4))
heatmap = ax.pcolor(self.kernel, cmap='jet')
ax.set_aspect('equal')
ticks = np.arange(self.nb_edges)
ticks_label = [r'${}$'.format(tic) for tic in ticks]
ax.xaxis.tick_top()
ax.set_xticks(ticks + 0.5, minor=False)
ax.invert_yaxis()
ax.set_yticks(ticks + 0.5, minor=False)
ax.set_xticklabels(ticks_label, minor=False)
ax.set_yticklabels(ticks_label, minor=False)
str_title = 'Correlation K kernel: transfer current matrix'
plt.title(title if title else str_title, y=1.08)
# Adapt size of colbar to plot
# https://stackoverflow.com/questions/18195758/set-matplotlib-colorbar-size-to-match-graph
cax = fig.add_axes([ax.get_position().x1 + 0.02,
ax.get_position().y0,
0.05,
ax.get_position().height])
plt.colorbar(heatmap, cax=cax)