Source code for pydae.core.diagnostics.dae_check

# pydae/core/diagnostics/dae_check.py
"""
Numerical diagnostic tool for DAE solver failures.

Works with both dense and sparse Jacobians. When the solver is compiled with
a sparse backend (KLU, PARDISO, Accelerate), the Jacobian flat array contains
only the NNZ packed values — this module reconstructs the full dense matrix
from the sparsity pattern before running the analysis.

Improvements over the original version
=======================================
* Sparse support via (Ap, Ai) reconstruction.
* Zero-row / zero-column detection (equations or variables disconnected).
* Diagonal dominance check (useful for implicit integrators).
* Near-zero pivot detection along the diagonal.
* Newton convergence estimate from residual + condition number.
* Colour-coded terminal output (OK / WARNING / CRITICAL).
* Heatmap saved to file instead of blocking on ``plt.show()``.
* Summary verdict at the end with actionable next steps.
"""

import sys
import numpy as np
import logging

# Best-effort UTF-8 for Windows consoles (cp1252 can't encode arrows/sigma).
try:
    sys.stdout.reconfigure(encoding='utf-8', errors='replace')
except Exception:
    pass

try:
    import matplotlib.pyplot as plt
    HAS_MPL = True
except ImportError:
    HAS_MPL = False


# ---------------------------------------------------------------------------
# ANSI colour helpers (degrade gracefully on Windows without colorama)
# ---------------------------------------------------------------------------
try:
    _BOLD  = '\033[1m'
    _RED   = '\033[91m'
    _YEL   = '\033[93m'
    _GRN   = '\033[92m'
    _RST   = '\033[0m'
except Exception:
    _BOLD = _RED = _YEL = _GRN = _RST = ''

def _ok(msg):   return f"{_GRN}OK{_RST}      {msg}"
def _warn(msg): return f"{_YEL}WARNING{_RST}  {msg}"
def _crit(msg): return f"{_RED}CRITICAL{_RST} {msg}"


# ---------------------------------------------------------------------------
# Sparse → Dense reconstruction
# ---------------------------------------------------------------------------
def _sparse_to_dense(jac_values, Ap, Ai, N, backend):
    """
    Reconstruct a dense N×N matrix from packed sparse values + structure.

    Parameters
    ----------
    jac_values : ndarray (NNZ,)
        Packed nonzero values as written by jac_*_eval.
    Ap : array-like
        Column pointers (CSC, 0-based) for KLU / Accelerate, or
        row pointers (CSR, 1-based) for PARDISO.
    Ai : array-like
        Row indices (CSC) or column indices (CSR).
    N : int
        Matrix dimension.
    backend : str
        One of 'klu', 'accelerate', 'pardiso'.
    """
    J = np.zeros((N, N), dtype=np.float64)

    if backend in ('klu', 'accelerate'):
        # CSC: Ap[j] .. Ap[j+1]-1 are the row indices for column j
        for col in range(N):
            for idx in range(Ap[col], Ap[col + 1]):
                row = Ai[idx]
                J[row, col] = jac_values[idx]
    elif backend == 'pardiso':
        # CSR with 1-based indexing: Ap[i]-1 .. Ap[i+1]-2 for row i
        for row in range(N):
            for idx in range(Ap[row] - 1, Ap[row + 1] - 1):
                col = Ai[idx] - 1
                J[row, col] = jac_values[idx]
    else:
        raise ValueError(f"Unknown sparse backend: {backend}")

    return J


# ---------------------------------------------------------------------------
# Main diagnostic entry point
# ---------------------------------------------------------------------------
[docs] def diagnose_dae_model(jac_flat, fg, Nx, Ny, x_names=None, y_names=None, sparse_backend=None, Ap=None, Ai=None, save_figure='jacobian_diagnostic.png'): """ Perform a numerical health check on the DAE Jacobian and residuals. Parameters ---------- jac_flat : ndarray Flat Jacobian array from the C solver. Dense: length N², Sparse: length NNZ. fg : ndarray Residual vector [f; g], length N. Nx, Ny : int Number of differential / algebraic variables. x_names, y_names : list of str, optional Human-readable variable names. sparse_backend : str or None ``None`` for dense, or ``'klu'`` / ``'pardiso'`` / ``'accelerate'``. Ap, Ai : array-like or None Sparsity structure arrays (required when ``sparse_backend`` is set). save_figure : str or None Path to save the heatmap image. ``None`` to skip. """ N = Nx + Ny # ------------------------------------------------------------------ # 0. Reconstruct dense Jacobian # ------------------------------------------------------------------ if sparse_backend is not None: if Ap is None or Ai is None: raise ValueError("Ap and Ai must be provided for sparse diagnostics") J = _sparse_to_dense(jac_flat, Ap, Ai, N, sparse_backend) else: J = jac_flat.reshape((N, N)) # Variable names if x_names is None: x_names = [f"x[{i}]" for i in range(Nx)] if y_names is None: y_names = [f"y[{i}]" for i in range(Ny)] var_names = list(x_names) + list(y_names) issues = [] # Collect verdicts for the summary print(f"\n{'='*60}") print(f"{'DAE SOLVER DIAGNOSTIC REPORT':^60}") print(f"{'='*60}") print(f" System size: {Nx} differential + {Ny} algebraic = {N} total") if sparse_backend: print(f" Sparse backend: {sparse_backend} (NNZ = {len(jac_flat)})") else: print(f" Dense Jacobian ({N}×{N})") # ------------------------------------------------------------------ # 1. Residual analysis # ------------------------------------------------------------------ print(f"\n{_BOLD}[1] RESIDUAL MAGNITUDES{_RST}") has_nan = False has_large = False for i in range(N): val = fg[i] if np.isnan(val): status = f"{_RED}NaN{_RST}" has_nan = True elif abs(val) > 1e3: status = f"{_RED}VERY LARGE{_RST}" has_large = True elif abs(val) > 1.0: status = f"{_YEL}LARGE{_RST}" has_large = True else: status = f"{_GRN}OK{_RST}" eq_type = "f" if i < Nx else "g" eq_idx = i if i < Nx else i - Nx print(f" {eq_type}[{eq_idx}] ({var_names[i]:>12}): {val:12.4e} [{status}]") if has_nan: issues.append(_crit("NaN in residuals — likely division by zero at initial guess.")) elif has_large: issues.append(_warn("Large residuals — initial guess may be far from solution.")) # ------------------------------------------------------------------ # 2. Zero row / zero column detection # ------------------------------------------------------------------ print(f"\n{_BOLD}[2] STRUCTURAL CHECKS{_RST}") row_norms = np.linalg.norm(J, axis=1) col_norms = np.linalg.norm(J, axis=0) zero_rows = np.where(row_norms < 1e-15)[0] zero_cols = np.where(col_norms < 1e-15)[0] if len(zero_rows) > 0: for r in zero_rows: print(f" {_RED}Zero row {r}{_RST} → equation for {var_names[r]} has no dependence on any variable") issues.append(_crit(f"Zero rows: {[var_names[r] for r in zero_rows]} — disconnected equations.")) else: print(f" All rows nonzero {_GRN}OK{_RST}") if len(zero_cols) > 0: for c in zero_cols: print(f" {_RED}Zero col {c}{_RST} → variable {var_names[c]} does not appear in any equation") issues.append(_crit(f"Zero columns: {[var_names[c] for c in zero_cols]} — disconnected variables.")) else: print(f" All columns nonzero {_GRN}OK{_RST}") # ------------------------------------------------------------------ # 3. Row scaling (equation balancing) # ------------------------------------------------------------------ print(f"\n{_BOLD}[3] ROW SCALING{_RST}") nonzero_rows = row_norms[row_norms > 0] if len(nonzero_rows) > 0: max_r = np.max(nonzero_rows) min_r = np.min(nonzero_rows) ratio = max_r / (min_r + 1e-30) print(f" Max row norm: {max_r:.2e}") print(f" Min row norm: {min_r:.2e}") print(f" Ratio: {ratio:.2e}") if ratio > 1e9: issues.append(_crit(f"Row scaling ratio {ratio:.0e} — equations have wildly different magnitudes.")) elif ratio > 1e6: issues.append(_warn(f"Row scaling ratio {ratio:.0e} — consider scaling equations.")) else: print(f" {_GRN}Scaling looks reasonable.{_RST}") # ------------------------------------------------------------------ # 4. Diagonal analysis (near-zero pivots) # ------------------------------------------------------------------ print(f"\n{_BOLD}[4] DIAGONAL ANALYSIS{_RST}") diag = np.abs(np.diag(J)) small_pivots = np.where(diag < 1e-12)[0] if len(small_pivots) > 0: for idx in small_pivots: print(f" {_YEL}Near-zero diagonal [{idx}]{_RST}: {var_names[idx]} (|J[{idx},{idx}]| = {diag[idx]:.2e})") issues.append(_warn(f"Near-zero diagonals at: {[var_names[i] for i in small_pivots]}")) else: print(f" All diagonal entries nonzero {_GRN}OK{_RST}") # Diagonal dominance (informational) dd_count = 0 for i in range(N): off_diag_sum = np.sum(np.abs(J[i, :])) - np.abs(J[i, i]) if np.abs(J[i, i]) >= off_diag_sum: dd_count += 1 print(f" Diagonally dominant rows: {dd_count}/{N}") # ------------------------------------------------------------------ # 5. Condition number # ------------------------------------------------------------------ print(f"\n{_BOLD}[5] CONDITION NUMBER{_RST}") try: c_num = np.linalg.cond(J) print(f" cond(J) = {c_num:.2e}") if c_num > 1e14: issues.append(_crit(f"Condition number {c_num:.0e} — effectively singular.")) elif c_num > 1e8: issues.append(_warn(f"Condition number {c_num:.0e} — ill-conditioned.")) else: print(f" {_GRN}Well-conditioned.{_RST}") except Exception: c_num = np.inf issues.append(_crit("Condition number computation failed — matrix may be singular.")) # ------------------------------------------------------------------ # 6. SVD — identify the broken variable combination # ------------------------------------------------------------------ print(f"\n{_BOLD}[6] SINGULAR VALUE DECOMPOSITION{_RST}") try: U, S, Vh = np.linalg.svd(J) print(f" Largest σ: {S[0]:.2e}") print(f" Smallest σ: {S[-1]:.2e}") # Report all near-zero singular values, not just the smallest near_zero = np.where(S < 1e-10)[0] if len(near_zero) > 0: print(f" {_RED}{len(near_zero)} near-zero singular value(s) detected{_RST}") for sv_idx in near_zero[:5]: # Show up to 5 problem_vec = Vh[sv_idx, :] top3 = np.argsort(np.abs(problem_vec))[::-1][:3] contributors = ", ".join( f"{var_names[k]} ({problem_vec[k]:+.4f})" for k in top3 ) print(f" σ[{sv_idx}] = {S[sv_idx]:.2e}{contributors}") issues.append(_crit(f"SVD found {len(near_zero)} near-zero singular value(s).")) else: print(f" {_GRN}No near-zero singular values.{_RST}") # Still report the smallest for informational purposes min_idx = np.argmin(S) problem_vec = Vh[min_idx, :] top3 = np.argsort(np.abs(problem_vec))[::-1][:3] contributors = ", ".join( f"{var_names[k]} ({problem_vec[k]:+.4f})" for k in top3 ) print(f" Weakest direction: σ[{min_idx}] = {S[min_idx]:.2e}{contributors}") except Exception as e: issues.append(_crit(f"SVD computation failed: {e}")) # ------------------------------------------------------------------ # 7. Newton convergence estimate # ------------------------------------------------------------------ print(f"\n{_BOLD}[7] NEWTON CONVERGENCE ESTIMATE{_RST}") res_norm = np.linalg.norm(fg) print(f" ||residual|| = {res_norm:.2e}") if c_num < np.inf and res_norm > 0: # Estimated correction step size: ||Δxy|| ≈ cond(J) × ||fg|| / ||J|| J_norm = np.linalg.norm(J) est_step = res_norm / (J_norm + 1e-30) print(f" Estimated ||Δxy|| ≈ {est_step:.2e}") if est_step > 1e6: issues.append(_warn(f"Newton step estimate {est_step:.0e} — far from convergence.")) # ------------------------------------------------------------------ # 8. Sparsity statistics (sparse mode only) # ------------------------------------------------------------------ if sparse_backend is not None: nnz = len(jac_flat) density = nnz / (N * N) * 100 print(f"\n{_BOLD}[8] SPARSITY STATISTICS{_RST}") print(f" NNZ = {nnz} / {N*N} ({density:.1f}% fill)") if density > 50: print(f" {_YEL}High fill — dense solver may be faster for this system size.{_RST}") # ------------------------------------------------------------------ # 9. Heatmap # ------------------------------------------------------------------ if HAS_MPL: fig, axes = plt.subplots(1, 2, figsize=(16, 7), gridspec_kw={'width_ratios': [3, 1]}) # Jacobian heatmap ax1 = axes[0] log_J = np.log10(np.abs(J) + 1e-15) im = ax1.imshow(log_J, cmap='viridis', interpolation='nearest', aspect='auto') fig.colorbar(im, ax=ax1, label='|J| (log₁₀)', shrink=0.8) ax1.set_xticks(range(N)); ax1.set_xticklabels(var_names, rotation=90, fontsize=8) ax1.set_yticks(range(N)); ax1.set_yticklabels(var_names, fontsize=8) ax1.set_title('Jacobian Structure & Magnitude') # Residual bar chart ax2 = axes[1] colors = [] for i in range(N): if np.isnan(fg[i]): colors.append('red') elif abs(fg[i]) > 1.0: colors.append('orange') else: colors.append('green') fg_plot = np.where(np.isnan(fg), 0, fg) ax2.barh(range(N), np.abs(fg_plot) + 1e-15, color=colors, log=True) ax2.set_yticks(range(N)); ax2.set_yticklabels(var_names, fontsize=8) ax2.set_xlabel('|residual| (log scale)') ax2.set_title('Residual Magnitudes') ax2.invert_yaxis() fig.tight_layout() if save_figure: fig.savefig(save_figure, dpi=150, bbox_inches='tight') print(f"\n Diagnostic figure saved to: {save_figure}") plt.show() # ------------------------------------------------------------------ # SUMMARY # ------------------------------------------------------------------ print(f"\n{'='*60}") print(f"{'SUMMARY':^60}") print(f"{'='*60}") if len(issues) == 0: print(f" {_GRN}No issues detected. System looks healthy.{_RST}") else: for i, issue in enumerate(issues, 1): print(f" {i}. {issue}") print(f"\n{'SUGGESTED ACTIONS':^60}") print(f"{'-'*60}") if has_nan: print(" → Check initial guesses: ensure no variable in a denominator is 0.") print(" → Inspect the generated C code for division by zero patterns.") if len(zero_rows) > 0 or len(zero_cols) > 0: print(" → Check for redundant conservation laws or disconnected components.") print(" → Verify that every algebraic variable appears in at least one equation.") if c_num > 1e8 and not has_nan: print(" → Scale equations to similar magnitudes (e.g., per-unit system).") print(" → Try a better initial guess closer to the expected steady state.") if len(issues) == 0: print(" → If convergence still fails, try increasing max_it or relaxing itol.") print(f"{'='*60}\n") return J # Return the dense Jacobian for further inspection if needed
# --------------------------------------------------------------------------- # Standalone test # --------------------------------------------------------------------------- if __name__ == '__main__': Nx, Ny = 4, 2 N = Nx + Ny jac_ini = np.array([ -1.91938577e-01, 3.34996016e-05, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, -9.47507845e-03, 1.04200002e+01, 0.00000000e+00, 0.00000000e+00, -1.00000000e-06, 0.00000000e+00, -0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, -0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00, 0.00000000e+00, 0.00000000e+00, -0.00000000e+00, 0.00000000e+00, 0.00000000e+00, -3.33333333e-05, -3.47333328e-01, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, -3.33333333e-05, 0.00000000e+00, 1.74532927e-04, 3.33333333e-02 ]) fg = np.array([ -0.00000000e+00, -0.00000000e+00, 8.90765703e-10, 5.10371148e-06, -7.34345917e-12, -0.00000000e+00 ]) # Test dense mode print("=== DENSE MODE TEST ===") diagnose_dae_model(jac_ini, fg, Nx, Ny, save_figure='diag_dense.png') # Test sparse mode (simulate CSC from the same matrix) print("\n=== SPARSE (KLU) MODE TEST ===") J_full = jac_ini.reshape((N, N)) from scipy.sparse import csc_matrix J_csc = csc_matrix(J_full) diagnose_dae_model( J_csc.data, fg, Nx, Ny, sparse_backend='klu', Ap=J_csc.indptr.tolist(), Ai=J_csc.indices.tolist(), save_figure='diag_sparse.png' )