Source code for viabel.optimization

from abc import ABC, abstractmethod

import autograd.numpy as np
import tqdm

from viabel._mc_diagnostics import MCSE, R_hat_convergence_check
from viabel._utils import Timer, StanModel_cache
from viabel.approximations import MFGaussian
from collections import defaultdict

__all__ = [
    'Optimizer',
    'StochasticGradientOptimizer',
    'RMSProp',
    'Adam',
    'Adagrad',
    'WindowedAdagrad',
    'AveragedRMSProp',
    'AveragedAdam',
    'FASO',
    'RAABBVI'
]


[docs]class Optimizer(ABC): """An abstract class for optimization """
[docs] @abstractmethod def optimize(self, n_iters, objective, init_param, **kwargs): """ Parameters ---------- n_iters : `int` Number of iterations of the optimization objective : `function` Function for constructing the objective and gradient function init_param : `numpy.ndarray`, shape(var_param_dim,) Initial values of the variational parameters **kwargs Keyword arguments to pass (example: smoothed_prop) Returns ---------- results : `dict` Must contain at least `opt_param`, the estimate for the optimal variational parameter. """
[docs]class StochasticGradientOptimizer(Optimizer): """Stochastic gradient descent. """
[docs] def __init__(self, learning_rate, *, weight_decay=0, iterate_avg_prop=0.2, diagnostics=False): """ Parameters ----------- learning_rate : `float` Tuning parameter that determines the step size weight_decay: `float` L2 regularization weight iterate_avg_prop : `float` Proportion of iterates to use for computing iterate average. `None` means no iterate averaging. The default is 0.2. diagnostics : `bool`, optional Record diagnostic information if `True`. The default is `False`. """ self._learning_rate = learning_rate self._weight_decay = weight_decay if iterate_avg_prop is not None and (iterate_avg_prop > 1.0 or iterate_avg_prop <= 0.0): raise ValueError('"iterate_avg_prop" must be None or between 0 and 1') self._iterate_avg_prop = iterate_avg_prop self._diagnostics = diagnostics self.reset_state()
[docs] def reset_state(self): """Reset internal state of the optimizer""" pass
[docs] def optimize(self, n_iters, objective, init_param, init_hamflow_model_param=None, init_hamflow_rho_param=None): variational_param = init_param.copy() iap = self._iterate_avg_prop results = defaultdict(list) # value_history = [] # variational_param_history = [] # descent_dir_history = [] with tqdm.trange(n_iters) as progress: try: for k in progress: # take step in descent direction object_val, object_grad = objective(variational_param) descent_dir = self.descent_direction(object_grad) variational_param = objective.update(variational_param, self._learning_rate * descent_dir) if variational_param.ndim == 2: variational_param *= (1 - self._weight_decay) # record state information results['value_history'].append(object_val) if self._diagnostics or iap is not None: results['variational_param_history'].append(variational_param.copy()) if (iap is not None and len(results['variational_param_history']) > iap * k): results['variational_param_history'].pop(0) if self._diagnostics: results['descent_dir_history'].append(descent_dir) if k % 10 == 0: avg_loss = np.mean(results['value_history'][max(0, k - 1000):k + 1]) progress.set_description( 'average loss = {:,.5g}'.format(avg_loss)) except (KeyboardInterrupt, StopIteration): # pragma: no cover # do not print log on the same line progress.close() finally: progress.close() if iap is not None: window = max(1, int(k * iap)) results['opt_param'] = np.mean(results['variational_param_history'][-window:], axis=0) else: results['opt_param'] = variational_param.copy() # if descent_dir_history is not None: # results['descent_dir_history'] = descent_dir_history results_dict = {d: np.array(h) for d, h in results.items()} return results_dict
[docs] def descent_direction(self, grad): """Compute descent direction for optimization. Default implementation returns ``grad``. Parameters ----------- grad : `numpy.ndarray`, shape(var_param_dim,) (stochastic) gradient of the objective function Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """ return grad
[docs]class RMSProp(StochasticGradientOptimizer): """RMSProp optimization method (Hinton and Tieleman, 2012) Tracks the exponential moving average of squared gradient: .. math:: \\nu^{(k+1)} = \\beta \\nu^{(k)} + (1-\\beta) \\hat{g}^{(k)} \\cdot \\hat{g}^{(k)} and uses :math:`\\nu^{(k)}` to rescale the current stochastic gradient: .. math:: \\hat{g}^{(k+1)}/\\sqrt{\\nu^{(k)}}. Parameters ----------- beta : `float` optional Squared gradient moving average hyper parameter. The default is 0.9 jitter: `float` optional Small value used for numerical stability. The default is 1e-8 Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, weight_decay=0, iterate_avg_prop=0.2, beta=0.9, jitter=1e-8, diagnostics=False): self._beta = beta self._jitter = jitter super().__init__(learning_rate, weight_decay=weight_decay, iterate_avg_prop=iterate_avg_prop, diagnostics=diagnostics)
[docs] def reset_state(self): """ resetting :math:`\\nu`, the exponential moving average of squared gradient """ self._avg_grad_sq = None
[docs] def descent_direction(self, grad): if self._avg_grad_sq is None: avg_grad_sq = grad**2 else: avg_grad_sq = self._avg_grad_sq avg_grad_sq *= self._beta avg_grad_sq += (1. - self._beta) * grad**2 descent_dir = grad / np.sqrt(self._jitter + avg_grad_sq) self._avg_grad_sq = avg_grad_sq return descent_dir
[docs]class AveragedRMSProp(StochasticGradientOptimizer): """Averaged RMSProp optimization method (Mukkamala and Hein, 2017, §4) Uses averaged squared gradient by setting :math:`\\beta_k = 1-1/k` such that .. math:: \\nu^{(k+1)} = \\beta_k \\nu^{(k)} + (1-\\beta_k) \\hat{g}^{(k)} \\cdot \\hat{g}^{(k)}. Then, .. math:: \\nu^{(k+1)} = (k+1)^{-1} \\sum^k_{k^\\prime =0}\\hat{g}^{(k)} \\cdot \\hat{g}^{(k)}, where :math:`\\nu^{(k)}` converges to a constant almost surely under certain conditions. Parameters ----------- jitter: `float` optional Small value used for numerical stability. The default is 1e-8 component_wise: `boolean` optional Indication of component wise discent direction computation Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, jitter=1e-8, diagnostics=False, component_wise=True): self._jitter = jitter self._component_wise = component_wise super().__init__(learning_rate, diagnostics=diagnostics)
[docs] def reset_state(self): """ resetting :math:`\\nu` and k, the exponential moving average of squared gradient and iteration respectively """ self._avg_grad_sq = None self._t = None
[docs] def descent_direction(self, grad): if self._avg_grad_sq is None: avg_grad_sq = grad**2 t = 1 else: avg_grad_sq = self._avg_grad_sq t = self._t + 1 beta = 1 - 1/t avg_grad_sq *= beta avg_grad_sq += (1.-beta)*grad**2 if self._component_wise: descent_dir = grad / np.sqrt(self._jitter+avg_grad_sq) else: descent_dir = grad / np.sqrt(self._jitter+np.sum(avg_grad_sq)) self._avg_grad_sq = avg_grad_sq self._t = t return descent_dir
[docs]class Adam(StochasticGradientOptimizer): """Adam optimization method (Kingma and Ba, 2015) Tracks exponential moving average of the gradient as well as the squared gradient: .. math:: m^{(k+1)} &= \\beta_1 m^{(k)} + (1-\\beta_1) \\hat{g}^{(k)}\\\\ \\nu^{(k+1)} &= \\beta_2 \\nu^{(k)} + (1-\\beta_2) \\hat{g}^{(k)} \\cdot \\hat{g}^{(k)} and uses :math:`m^{(k)}` and :math:`\\nu^{(k)}` to rescale the current stochastic gradient: .. math:: m^{(k)}/\\sqrt{\\nu^{(k)}}. Parameters ---------- beta1 : `float` optional Gradient moving average hyper parameter. The default is 0.9 beta2 : `float` optional Squared gradient moving average hyper parameter. The default is 0.999 jitter: `float` optional Small value used for numerical stability. The default is 1e-8 component_wise: `boolean` optional Indicator for component-wise normalization of discent direction Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, beta1=0.9, beta2=0.999, jitter=1e-8, iterate_avg_prop=0.2, diagnostics=False): self._beta1 = beta1 self._beta2 = beta2 self._jitter = jitter super().__init__(learning_rate, iterate_avg_prop=iterate_avg_prop, diagnostics=diagnostics)
[docs] def reset_state(self): """ resetting m and :math:`\\nu`, the exponential moving average of gradient and squared gradient respectively """ self._momentum = None self._avg_grad_sq = None
[docs] def descent_direction(self, grad): if self._avg_grad_sq is None: avg_grad_sq = grad**2 else: avg_grad_sq = self._avg_grad_sq if self._momentum is None: momentum = grad else: momentum = self._momentum momentum *= self._beta1 momentum += (1. - self._beta1) * grad avg_grad_sq *= self._beta2 avg_grad_sq += (1. - self._beta2) * grad**2 descent_dir = momentum / np.sqrt(self._jitter + avg_grad_sq) self._momentum = momentum self._avg_grad_sq = avg_grad_sq return descent_dir
[docs]class AveragedAdam(StochasticGradientOptimizer): """Averaged Adam optimization method (Mukkamala and Hein, 2017, §4) Uses averaged squared gradient by setting :math:`\\beta_k = 1-1/k` such that .. math:: \\nu^{(k+1)} = \\beta_k \\nu^{(k)} + (1-\\beta_k) \\hat{g}^{(k)} \\cdot \\hat{g}^{(k)}. Then, .. math:: \\nu^{(k+1)} = (k+1)^{-1} \\sum^k_{k^\\prime =0}\\hat{g}^{(k)} \\cdot \\hat{g}^{(k)}, where :math:`\\nu^{(k)}` converges to a constant almost surely under certain conditions. Parameters ---------- beta1 : `float` optional Gradient moving average hyper parameter. The default is 0.9 jitter: `float` optional Small value used for numerical stability. The default is 1e-8 component_wise: `boolean` optional Indicator for component-wise normalization of discent direction Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, beta1=0.9, jitter=1e-8, diagnostics=False, component_wise=True): self._beta1 = beta1 self._jitter = jitter self._component_wise = component_wise super().__init__(learning_rate, diagnostics=diagnostics)
[docs] def reset_state(self): """ resetting m, :math:`\\nu` and, k, the exponential moving average of gradient, squared gradient, and iteration respectively """ self._momentum = None self._avg_grad_sq = None self._t = None
[docs] def descent_direction(self, grad): if self._avg_grad_sq is None: avg_grad_sq = grad**2 t = 1 else: avg_grad_sq = self._avg_grad_sq t = self._t + 1 if self._momentum is None: momentum = grad else: momentum = self._momentum momentum *= self._beta1 momentum += (1. - self._beta1) * grad beta2 = 1 - 1/t avg_grad_sq *= beta2 avg_grad_sq += (1. - beta2) * grad**2 if self._component_wise: descent_dir = momentum / np.sqrt(self._jitter+avg_grad_sq) else: descent_dir = momentum / np.sqrt(self._jitter+np.sum(avg_grad_sq)) self._momentum = momentum self._avg_grad_sq = avg_grad_sq self._t = t return descent_dir
[docs]class Adagrad(StochasticGradientOptimizer): """Adagrad optimization method (Duchi et al., 2011) Uses accumilated squared gradients to rescale the current stochastic gradient: .. math:: \\frac{\\hat{g}^{(k+1)}}{\\sqrt{\\sum^k_{k^\\prime} \\hat{g}^{(k^\\prime)} \\cdot \\hat{g}^{(k^\\prime)}}} Parameters ----------- jitter: `float` optional Small value used for numerical stability. The default is 1e-8 Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, weight_decay=0, jitter=1e-8, iterate_avg_prop=0.2, diagnostics=False): self._jitter = jitter super().__init__(learning_rate, weight_decay=weight_decay, iterate_avg_prop=iterate_avg_prop, diagnostics=diagnostics)
[docs] def reset_state(self): """ restting accumilated squared gradient """ self._sum_grad_sq = 0
[docs] def descent_direction(self, grad): self._sum_grad_sq += grad**2 descent_dir = grad / np.sqrt(self._jitter + self._sum_grad_sq) return descent_dir
[docs]class WindowedAdagrad(StochasticGradientOptimizer): """Windowed Adagrad optimization method (Default optimizer in Pymc3) Uses a running window (w) to get the mean squared gradient to rescale the current stochastic gradient: .. math:: \\frac{\\hat{g}^{(k+1)}}{\\sqrt{\\sum^k_{k^\\prime = k-w} \\hat{g}^{(k^\\prime)} \\cdot \\hat{g}^{(k^\\prime)}}} Parameters ----------- window size : `int` optional Window size used to store the square of the gradients. The default is 10 jitter: `float` optional Small value used for numerical stability. The default is 1e-8 Returns ---------- descent_dir : `numpy.ndarray`, shape(var_param_dim,) Descent direction of the optimization algorithm """
[docs] def __init__(self, learning_rate, *, weight_decay=0, window_size=10, jitter=1e-8, diagnostics=False): self._window_size = window_size self._jitter = jitter super().__init__(learning_rate, weight_decay=weight_decay, diagnostics=diagnostics)
[docs] def reset_state(self): """ resetting the running squared gradients """ self._history = []
[docs] def descent_direction(self, grad): self._history.append(grad**2) if len(self._history) > self._window_size: self._history.pop(0) mean_grad_squared = np.mean(self._history, axis=0) descent_dir = grad / np.sqrt(self._jitter + mean_grad_squared) return descent_dir
[docs]class FASO(Optimizer): """Fixed-learning rate stochastic optimization meta-algorithm (FASO) This algorithm runs stochastic optimization with a fixed-learning rate using a user specified optimization method. It determines the convergence at the fixed-learning rate using the potential scale reduction factor \(\hat{R}\) and combined with a Monte Carlo standard error cutoff. See more details: https://arxiv.org/pdf/2203.15945.pdf Parameters ---------- sgo : `StochasticGradientOptimizer` object optimization method to use mcse_threshold : `float` optional Monte Carlo standard error threshold for convergence. The default is 0.1. W_min : `int`, optional Minimum window size for checking convergence. The default is 200. ESS_min : `int`, optional Minimum ESS for computing iterate average. Default is `W_min / 8`. k_check : `int`, optional Frequency with which to check convergence. The default is `W_min`. """
[docs] def __init__(self, sgo, *, mcse_threshold=0.1, W_min=200, ESS_min=None, k_check=None): if not isinstance(sgo, StochasticGradientOptimizer): raise ValueError('sgo must be a subclass of StochasticGradientOptimizer') self._sgo = sgo self._mcse_threshold = mcse_threshold self._W_min = W_min self._ESS_min = W_min // 8 if ESS_min is None else ESS_min self._k_check = W_min if k_check is None else k_check if mcse_threshold <= 0: raise ValueError('"mcse_threshold" must be greater than zero') if W_min <= 0: raise ValueError('"W_min" must be greater than zero') if self._k_check <= 0: raise ValueError('"k_check" must be greater than zero') if self._ESS_min <= 0: raise ValueError('"ESS_min" must be greater than zero')
[docs] def optimize(self, n_iters, objective, init_param): diagnostics = self._sgo._diagnostics k_conv = None # Iteration number when reached convergence k_stopped = None # Iteration number when MCSE/ESS conditions met k_Rhat = None # Iteration number when R hat convergence criterion met learning_rate = self._sgo._learning_rate variational_param = init_param.copy() history = defaultdict(list) iterate_average = variational_param.copy() if diagnostics: history['iterate_average_k_history'].append(0) history['iterate_average_history'].append(iterate_average) total_opt_time = 0 # total time spent on optimization with tqdm.trange(n_iters) as progress: try: for k in progress: # take step in descent direction with Timer() as opt_timer: object_val, object_grad = objective(variational_param) history['value_history'].append(object_val) history['grad_history'].append(object_grad) descent_dir = self._sgo.descent_direction(object_grad) variational_param = objective.update(variational_param, learning_rate * descent_dir) history['variational_param_history'].append(variational_param.copy()) if diagnostics: history['descent_dir_history'].append(descent_dir) total_opt_time += opt_timer.interval # If convergence has not been reached then check for # convergence using R hat if k_conv is None and k % self._k_check == 0: W_upper = int(0.95 * k) if W_upper > self._W_min: windows = np.linspace(self._W_min, W_upper, num=5, dtype=int) R_hat_success, best_W = R_hat_convergence_check( history['variational_param_history'], windows) iterate_average = np.mean(history['variational_param_history'][-best_W:], axis=0) if diagnostics: history['iterate_average_k_history'].append(k) history['iterate_average_history'].append(iterate_average) if R_hat_success: k_Rhat = k k_conv = k - best_W W_check = best_W # immediately check MCSE # Once convergence has been reached compute the MCSE if k_conv is not None and k - k_conv == W_check: W = W_check converged_iterates = np.array(history['variational_param_history'][-W:]) iterate_average = np.mean(converged_iterates, axis=0) if diagnostics and k not in history['iterate_average_k_history']: history['iterate_average_k_history'].append(k) history['iterate_average_history'].append(iterate_average) # compute MCSE with Timer() as mcse_timer: if isinstance(objective.approx, MFGaussian): dim = int(init_param.size/2) # For MF Gaussian, use MCSE(mu/sigma,log_sigma) iterate_diff = (converged_iterates[W - 2, :] - converged_iterates[W - 1, :]) iterate_diff_zero = iterate_diff == 0 # ignore constant variational parameters if np.any(iterate_diff_zero): indices = np.argwhere(iterate_diff_zero) converged_iterates = np.delete(converged_iterates, indices, 1) converged_log_sdevs = converged_iterates[:, -dim:] mean_log_stdev = np.mean(converged_log_sdevs, axis=0) ess, mcse = MCSE(converged_iterates) mcse_mean = mcse[:dim] / np.exp(mean_log_stdev) mcse_stdev = mcse[-dim:] mcse = np.concatenate((mcse_mean, mcse_stdev)) else: ess, mcse = MCSE(converged_iterates) if diagnostics: history['ess_and_mcse_k_history'].append(k) history['ess_history'].append(ess) history['mcse_history'].append(mcse) if (np.max(mcse) < self._mcse_threshold and np.min(ess) > self._ESS_min): k_stopped = k break else: relative_mcse_time = mcse_timer.interval / W relative_opt_time = total_opt_time / k relative_time_ratio = relative_opt_time / relative_mcse_time recheck_scale = max(1.05, 1 + 1 / np.sqrt(1 + relative_time_ratio)) W_check = int(recheck_scale * W_check + 1) if k % self._k_check == 0: avg_loss = np.mean(history['value_history'][max(0, k - 1000):k + 1]) R_conv = 'converged' if k_conv is not None else 'not converged' progress.set_description( 'average loss = {:,.5g} | R hat {}|'.format(avg_loss, R_conv)) except (KeyboardInterrupt, StopIteration): # pragma: no cover # do not print log on the same line progress.close() finally: progress.close() if k_stopped is None: if k_conv is None: print('WARNING: stationarity not reached after maximum number of iterations') print('WARNING: try incresing the learning rate or the maximum number of ' 'iterations') else: print('WARNING: stationarity reached but MCSE too large and/or ESS too small') print('WARNING: maximum MCSE = {:.3g}'.format(np.max(mcse))) print('WARNING: minimum ESS = {:.1f}'.format(np.min(ess))) # print(ess) else: print('Convergence reached at iteration', k_stopped) results = {d: np.array(h) for d, h in history.items()} results['k_conv'] = k_conv results['k_Rhat'] = k_Rhat results['k_stopped'] = k_stopped results['opt_param'] = iterate_average return results
[docs]class RAABBVI(FASO): """A robust, automated, and accurate BBVI optimizer (RAABBVI) This algorithm combines the FASO algorithm with a termination rule to determine the appropriate point where the algorithm could terminate. The termination rule is based on the trade-off between improved accuracy of the variational approximation if the current learning rate is reduced by an adaptation factor :math:`\\rho \in (0,1)` and the time required to reach that improved accuracy. If the improved accuracy level is large compared to the runtime then this algorithm adaptively decrease the learning rate and if not algorithm will be terminated. See more details: https://arxiv.org/pdf/2203.15945.pdf Parameters ---------- sgo : `StochasticGradientOptimizer` object Optimizer to use for computing descent direction. rho : `float`, optional Learning rate reducing factor. The default is 0.5 iters0 : `int`, optional Small iteration number. The default is 1000 for decreased learning rate. accuracy_threshold : `float`, optional Absolute SKL accuracy threshold inefficiency_threshold : `float`, optional Threshold for the inefficiency index. init_rmpsprop : `Boolean`, optional Indicate whether to run using RMSProp optimization method for the initial learning rate **kwargs: Keyword arguments for `FASO` """
[docs] def __init__(self, sgo, *, rho=0.5, iters0=1000, accuracy_threshold=0.1, inefficiency_threshold=1.0, init_rmsprop=False, **kwargs): super().__init__(sgo, **kwargs) self._iters0 = iters0 self._rho = rho self._accuracy_threshold = accuracy_threshold self._inefficiency_threshold = inefficiency_threshold self._init_rmsprop = init_rmsprop if rho < 0 or rho > 1: raise ValueError('"rho" must be between zero and one')
[docs] def weighted_linear_regression(self, model, y, x, s=9, a=0.25, n_chains=4): """ weighted regression with likelihood term having the weight Parameters ---------- model : `pystan model` Pystan model to conduct the sampling y : `numpy_ndarray` Response variable x : `numpy_ndarray` Predictor variable s : `int`, optional Observation weight parameter. The default is 9. a : `int`, optional Power of the weight parameter. The default is 1/4. n_chains: `int`, optional Number of sampling chains. The default is 4. Returns ------- kappa : `float` Power c : `float` Constant fit : `Pystan object` Pystan fit object if results = True """ #defining initialization function def initfun(log_c, sigma, kappa=None, chain_id=1): if kappa is None: return dict(log_c=log_c, sigma=sigma) else: return dict(kappa=kappa, log_c=log_c, sigma=sigma) N = len(y) w = np.array(1/(1 + np.arange(N)[::-1]**2/s)**a) #weights data = dict(N=N, y=y, x=x, rho=self._rho, w=w) #data if isinstance(self._sgo, AveragedRMSProp) or isinstance(self._sgo, AveragedAdam): init = [initfun(100, 5, chain_id=i) for i in range(n_chains) ] #initial values else: init = [initfun(100, 5, 0.8, chain_id=i) for i in range(n_chains) ] #initial values fit = model.sampling(data=data, init=init, iter=1000, chains=n_chains, control=dict(adapt_delta=0.98)) #sampling from the model if isinstance(self._sgo, AveragedRMSProp) or isinstance(self._sgo, AveragedAdam): kappa = 1 else: kappa = np.mean(fit['kappa']) log_c = np.mean(fit['log_c']) c = np.exp(log_c) return fit, kappa, c
[docs] def wls(self, x, y, s=9, a=0.25): """ weighted least squares Parameters ---------- x : `numpy_ndarray` Predictor variable y : `numpy_ndarray` Response variable s : `int`, optional Observation weight parameter. The default is 9. a : `int`, optional Power of the weight parameter. The default is 1/4. Returns ------- b0 : `float` Intercept b1 : `float` Slope """ n = y.size x = np.column_stack((np.ones(n),x)) w = np.diag(1/(1 + np.arange(n)[::-1]**2/s**2)**a) #weights y = np.reshape(y,(n,1)) beta = np.linalg.inv(x.T @ w @ x) @ (x.T @ w @ y) return beta[0], beta[1]
[docs] def convg_iteration_trend_detection(self, slope): """ Detecting the relationship trend between learning rate and number of iterations to reach convergence Parameters ---------- slope : `float` slope of the fitted regression model Returns ------- bool Indicating having negative relationship or not """ if slope < 0: return True else: return False
[docs] def optimize(self, K_max, objective, init_param): """ Parameters ---------- K_max : `int` Number of iterations of the optimization objective: `function` Function for constructing the objective and gradient function init_param : `numpy.ndarray`, shape(var_param_dim,) Initial values of the variational parameters """ if not objective.approx.supports_kl: print('WARNING: approximation family does not support KL. Using FASO.', flush=True) return super().optimize(K_max, objective, init_param) k_new = -1 #Number of iterations at the given learning rate k = 0 #Number of times learning rate decreases k_total = 0 #Total number of iterations k_add = 0 #Iteration number when convergence reached for a fixed step size k_stopped_final = None #Iteration number when stopping rule criteria met sgo = self._sgo diagnostics = self._sgo._diagnostics if isinstance(self._sgo, AveragedRMSProp) or isinstance(self._sgo, AveragedAdam): reg_model = StanModel_cache(model_name='weighted_lin_regression_sgd') else: reg_model = StanModel_cache(model_name='weighted_lin_regression') iterate_average_curr = init_param.copy() history = defaultdict(list) history['iterate_average_curr_hist'].append(iterate_average_curr) history['k_mcse'].append(0) stopped = False try: while not stopped: K_max -= (k_new + 1) iterate_average_prev = iterate_average_curr if k == 0 and self._init_rmsprop: #Using RMSProp optimization initially if specified rmsprop = RMSProp(learning_rate=sgo._learning_rate, diagnostics=diagnostics) faso = FASO(sgo = rmsprop) opt = faso.optimize(K_max, objective, iterate_average_curr) else: opt = super().optimize(K_max, objective, iterate_average_curr) if opt['k_stopped'] is not None and k != 0: #removing the number of convergence iterations of initial learning rate convg_iters = opt['k_stopped'] history['conv_iters_hist'].append(convg_iters) # CI.append(convg_iters) iterate_average_curr = opt['opt_param'] history['iterate_average_curr_hist'].append(iterate_average_curr) k_new = opt['k_stopped'] # checking whether R hat convergence criteria reached and the FASO stopping criteria reached to add new number of iterations history['k_Rhat'].append(opt['k_Rhat'] + k_add if opt['k_Rhat'] is not None and k_new is not None else opt['k_Rhat']) #checking whether R hat convergence criteria and the FASO stopping criteria reached to add new number of iterations history['k_conv'].append(opt['k_conv'] + k_add if opt['k_conv'] is not None and k_new is not None else opt['k_conv']) history['k_mcse'].append(k_new + k_add if k_new is not None else k_new) history['variational_param_history'].extend(opt['variational_param_history']) history['value_history'].extend(opt['value_history']) history['grad_history'].extend(opt['grad_history']) #if user require diagnostics if diagnostics: history['descent_dir_history'].extend(opt['descent_dir_history']) #checking convergence detection to store computed MCSE and ESS if opt['k_conv'] is not None: history['ess_history'].extend(opt['ess_history']) history['mcse_history'].extend(opt['mcse_history']) if len(history['mcse_history']) > 0: history['final_mcse_history'].append(history['mcse_history'][-1]) else: history['final_mcse_history'].append(history['mcse_history']) #saving the iteration histories at the initial learning rate if k == 0: history['iterate_average_k_history'].extend(opt['iterate_average_k_history']) history['iterate_average_history'].extend(opt['iterate_average_history']) else: #adding previous number of iterations to the current iterations history['iterate_average_k_history'].extend(opt['iterate_average_k_history'][1:] + k_add) history['iterate_average_history'].extend(opt['iterate_average_history'][1:,:]) k_add = history['iterate_average_k_history'][-1] if k_new is None: # maximum number of iterations reached break else: #compute the stopping criteria k_total += k_new sgo._learning_rate *= self._rho self._mcse_threshold *= self._rho if isinstance(self._sgo, AveragedRMSProp) or isinstance(self._sgo, AveragedAdam): self._sgo.reset_state() if len(history['learning_rate_hist']) > 0: SKL = (objective.approx.kl(iterate_average_prev, iterate_average_curr) + objective.approx.kl(iterate_average_curr, iterate_average_prev)) history['SKL_history'].append(SKL) # Conduct weighted linear regression to estimate parameters # of SKL hat if len(history['SKL_history']) > 0: y_wlr = np.log(history['SKL_history']) x_wlr = np.log(history['learning_rate_hist']) fit, kappa, c = self.weighted_linear_regression(reg_model, y_wlr, x_wlr) if diagnostics: history['c_sample_hist'].append(np.exp(fit['log_c'])) if isinstance(self._sgo, AveragedRMSProp) or \ isinstance(self._sgo, AveragedAdam): history['kappa_sample_hist'] = None else: history['kappa_sample_hist'].append(fit['kappa']) history['kappa_hist'].append(kappa) history['c_hist'].append(c) #computing the termination rule criteria if len(history['learning_rate_hist']) > 1: relative_skl = (self._rho)**kappa + \ (self._accuracy_threshold/(np.sqrt(c) * history['learning_rate_hist'][-1]**kappa)) curr_iters = history['conv_iters_hist'][-1] _, slope = self.wls(np.log(history['learning_rate_hist']), np.log(history['conv_iters_hist'])) trend_check = self.convg_iteration_trend_detection(slope) if trend_check: #if negative relationship use all observations y_wls = history['conv_iters_hist'] x_wls = history['learning_rate_hist'] else: #remove the initial observation y_wls = history['conv_iters_hist'][1:] x_wls = history['learning_rate_hist'][1:] b0, b1 = self.wls(np.log(x_wls), np.log(y_wls)) pred_iters = int(np.exp(b0) * \ (self._rho * history['learning_rate_hist'][-1])**b1) history['predicted_iters_hist'].append(pred_iters) relative_iters = pred_iters/(curr_iters + self._iters0) history['stopping_crt'].append(relative_skl * relative_iters) #checking whether termination rule condition satisfied if relative_skl * relative_iters > self._inefficiency_threshold: stopped = True k_stopped_final = k_total history['k_stopped_final_hist'].append(k_total) break history['learning_rate_hist'].append(sgo._learning_rate) # LR.append(sgo._learning_rate) k += 1 except (KeyboardInterrupt, StopIteration) as e: # pragma: no cover pass if stopped: print('Termination rule reached at iteration', k_total) print('Inefficiency Index:', relative_skl * relative_iters) else: print('WARNING: maximum number of iterations reached before ' 'stopping rule was triggered') results = {d: np.array(h) for d, h in history.items() if d!='k_Rhat' and d!='k_mcse' and d!='k_conv' } results['opt_param'] = iterate_average_curr results['k_stopped_final'] = k_stopped_final results['k_Rhat'] = history['k_Rhat']; results['k_mcse'] = history['k_mcse'] results['k_conv'] = history['k_conv'] return results