# Use the sample.pyi implementation of SIS in order to obtain counts and error estimates.
from __future__ import annotations
import logging
import time
import numpy as np
import tqdm
from numpy.typing import ArrayLike
from matrix_count._input_output import (
_log_symmetric_matrices_check_arguments,
_log_symmetric_matrices_hardcoded,
_symmetric_simplify_input,
)
from matrix_count._util import _log_sum_exp, _log_weight
from matrix_count.sample import sample_symmetric_matrix
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
[docs]
def count_log_symmetric_matrices(
row_sums: list[int],
*,
binary_matrix: bool = False,
diagonal_sum: int | None = None,
alpha: float = 1.0,
max_samples: int = 1000,
error_target: float = 0.001,
seed: int | None = None,
timeout: float = 60.0, # Timeout in seconds
verbose: bool = False,
) -> tuple[float, float]:
"""
Dirichlet-multinomial moment-matching estimate of the logarithm
of the number of symmetric non-negative matrices with given row sums.
Not available for block_sums or index_partition arguments.
Parameters
----------
row_sums : ArrayLike
Row sums of the matrix. Length n array-like of non-negative integers.
binary_matrix : bool, optional
Whether the matrix is binary (0 or 1) instead of non-negative integer valued. Defaults to False.
diagonal_sum : int or None, optional
What the sum of the diagonal elements should be constrained to.
Either an integer greater than or equal to 0 or None, resulting in no constraint on the diagonal elements, defaults to None.
alpha : float, optional
Dirichlet-multinomial parameter greater than or equal to 0 to weigh the matrices in the sum.
A value of 1 gives the uniform count of matrices, defaults to 1.
max_samples : int, optional
Maximum number of samples to take. Defaults to 1000.
error_target : float, optional
Target absolute error in the logarithm of the count. Defaults to 0.01.
seed : int, optional
Seed for the random number generator. Defaults to None.
timeout : float, optional
Timeout in seconds. Defaults to 60.0.
verbose : bool, optional
Whether to print details of calculation. Defaults to False.
Returns
-------
tuple of (float, float)
log_count_est : The logarithm of the number of symmetric matrices under given conditions.
err : The estimated absolute error in the logarithm of the count.
"""
# Check input validity
_log_symmetric_matrices_check_arguments(
row_sums,
binary_matrix=binary_matrix,
diagonal_sum=diagonal_sum,
alpha=alpha,
verbose=verbose,
)
# Remove empty margins
row_sums, diagonal_sum, _ = _symmetric_simplify_input(
row_sums,
binary_matrix=binary_matrix,
diagonal_sum=diagonal_sum,
verbose=verbose,
)
# Check for hardcoded cases
hardcoded_result = _log_symmetric_matrices_hardcoded(
row_sums,
binary_matrix=binary_matrix,
diagonal_sum=diagonal_sum,
alpha=alpha,
verbose=verbose,
)
if hardcoded_result is not None:
return (hardcoded_result, 0) # No error in the hardcoded cases
# Set seed if provided
rng = np.random.default_rng(seed)
# Sample the matrices
min_num_samples = 10 # Minimum number of samples to take
entropies = []
log_count_est = 0 # Estimated log count
log_count_err_est = np.inf # Estimated error in the log count
start_time = time.time() # Start time for timeout
# Use tqdm progress bar if verbose, otherwise use a simple range
if verbose:
progress_bar = tqdm.tqdm(range(max_samples))
use_progress_bar = True
progress_iter = progress_bar
else:
progress_bar = None
use_progress_bar = False
progress_iter = range(max_samples) # type: ignore[assignment]
for sample_num in progress_iter:
# Check for timeout
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
if verbose:
logger.info("Timeout reached.")
break
# Seed to use for this sample
sample_seed = int(
rng.integers(0, 2**31 - 1)
) # If the integer is too large it screws up the pybind wrapper
# pylint seems to not see the binding on the following line
sample, entropy = sample_symmetric_matrix( # pylint: disable=unpacking-non-sequence
row_sums,
binary_matrix=binary_matrix,
diagonal_sum=diagonal_sum,
alpha=alpha,
seed=sample_seed,
verbose=verbose,
)
entropy += _log_weight(
sample, alpha
) # Really should be averaging w(A)/Q(A), entropy = -log Q(A)
entropies.append(entropy)
log_count_est = _log_sum_exp(entropies) - np.log(len(entropies))
log_E_entropy_2 = _log_sum_exp(2 * np.array(entropies)) - np.log(len(entropies))
log_E_entropy = _log_sum_exp(entropies) - np.log(len(entropies))
if (
log_E_entropy_2 - 2 * log_E_entropy > 0.0001
): # Estimate the error by the standard deviation of the counts
log_std = 0.5 * (
np.log(np.exp(0) - np.exp(2 * log_E_entropy - log_E_entropy_2))
+ log_E_entropy_2
)
log_count_err_est = np.exp(
log_std - 0.5 * np.log(len(entropies)) - log_E_entropy
)
if use_progress_bar and progress_bar is not None:
progress_bar.set_postfix_str(
f"Log count: {log_E_entropy:.3f} +/- {log_count_err_est:.3f}"
)
if (
log_count_err_est < error_target and sample_num >= min_num_samples
): # Terminate if the error is below the target and we have taken enough samples
break
else: # If all the sampled values are the same, report zero error (and a warning)
log_count_err_est = 0.0
if verbose:
logger.warning(
"All sampled entropies are the same, reporting zero error."
)
return (log_count_est, log_count_err_est)
[docs]
def log_estimate(
row_sums: list[int] | ArrayLike,
column_sums: list[int] | None = None,
*,
binary_matrix: bool = False,
diagonal_sum: int | None = None,
index_partition: list[int] | None = None,
block_sums: ArrayLike | None = None,
block_diagonal_sums: ArrayLike | None = None,
alpha: float = 1.0,
force_second_order: bool = False,
allow_pseudo: bool = True,
verbose: bool = False,
) -> float:
# Placeholder implementation
return 0.
[docs]
def log_count(
row_sums: list[int],
column_sums: list[int] | None = None,
*,
binary_matrix: bool = False,
diagonal_sum: int | None = None,
alpha: float = 1.0,
max_samples: int = 1000,
error_target: float = 0.001,
seed: int | None = None,
timeout: float = 60.0, # Timeout in seconds
verbose: bool = False,
) -> tuple[float, float]:
return (1, 0) # Placeholder for the result and error, to be implemented later
[docs]
def sample(
row_sums: list[int],
column_sums: list[int] | None = None,
*,
binary_matrix: bool = False,
diagonal_sum: int | None = None,
alpha: float = 1.0,
num_samples: int = 1,
importance_sample: bool = False,
seed: int | None = None,
verbose: bool = False,
) -> tuple[ArrayLike, float | tuple[float, float]]:
"""
Sample a symmetric matrix with given row sums and diagonal sum.
Not available for block_sums or index_partition arguments.
"""
return [0]