Source code for mrftools.opt

"""Optimization utility module containing various optimizers and utility objects for callback functions"""
import time

import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize


def sgd(func, grad, x, args={}, callback=None):
    """
    Gradient descent with a step size that decays as 0.5 / t.

    :param func: objective function; accepted so the signature matches the other optimizers,
                 but it is never called here
    :param grad: callable invoked as grad(x, args) that returns the gradient at x
    :param x: vector initial value of value being optimized over
    :param args: dict of optimizer options ('tolerance', 'max_iter'); the same dict is forwarded to grad
    :param callback: optional function called with the new iterate after every step
    :return: the last iterate
    """
    # A falsy args (None or empty) is swapped for a fresh dict before use.
    options = args if args else {}
    tolerance = options.get('tolerance', 1e-8)
    max_iter = options.get('max_iter', 10000)

    step = 1
    change = np.inf
    # Stop once the L1 size of the last move is within tolerance, or when the
    # step counter reaches max_iter (so at most max_iter - 1 steps are taken).
    while change > tolerance and step < max_iter:
        previous = x
        x = previous - 0.5 * grad(previous, options) / step
        change = np.sum(np.abs(x - previous))
        step += 1
        if callback:
            callback(x)
    return x
def ada_grad(func, grad, x, args={}, callback=None):
    """
    Adagrad: gradient descent where each coordinate's step is scaled by the
    square root of that coordinate's accumulated squared gradients.

    :param func: callable invoked as func(x, args) right before grad on every iteration; its return
                 value is discarded (presumably it lets func refresh state that grad reuses)
    :param grad: callable invoked as grad(x, args) that returns the gradient at x
    :param x: vector initial value of value being optimized over
    :param args: dict of optimizer options ('x_tol', 'g_tol', 'eta', 'offset', 'max_iter');
                 the same dict is forwarded to func and grad
    :param callback: optional function called with the iterate at the start of every iteration
                     and once more with the final iterate
    :return: the last iterate
    """
    # A falsy args (None or empty) is swapped for a fresh dict before use.
    options = args if args else {}
    x_tol = options.get('x_tol', 1e-6)
    g_tol = options.get('g_tol', 0.01)
    eta = options.get('eta', 0.1)
    offset = options.get('offset', 1.0)
    max_iter = options.get('max_iter', 10000)

    iteration = 1
    grad_norm = np.inf
    x_change = np.inf
    sq_grad_total = 0

    # Run until the gradient norm or the step norm drops to its tolerance, or
    # the iteration counter reaches max_iter.
    while grad_norm > g_tol and x_change > x_tol and iteration < max_iter:
        if callback:
            callback(x)

        func(x, options)
        g = grad(x, options)

        sq_grad_total += g * g
        # offset keeps the denominator away from zero.
        step = eta * g / (np.sqrt(sq_grad_total) + offset)
        x = x - step

        grad_norm = np.sqrt(g.dot(g))
        x_change = np.sqrt(step.dot(step))
        iteration += 1

    if callback:
        callback(x)
    return x
def rms_prop(func, grad, x, args={}, callback=None):
    """
    RMSProp: gradient descent where each coordinate's step is scaled by the
    square root of an exponential moving average of its squared gradients.

    :param func: callable invoked as func(x, args) right before grad on every iteration; its return
                 value is discarded (presumably it lets func refresh state that grad reuses)
    :param grad: callable invoked as grad(x, args) that returns the gradient at x
    :param x: vector initial value of value being optimized over
    :param args: dict of optimizer options ('x_tol', 'g_tol', 'eta', 'gamma', 'eps', 'max_iter');
                 the same dict is forwarded to func and grad
    :param callback: optional function called with the iterate at the start of every iteration
                     and once more with the final iterate
    :return: the last iterate
    """
    # A falsy args (None or empty) is swapped for a fresh dict before use.
    options = args if args else {}
    x_tol = options.get('x_tol', 0.02)
    g_tol = options.get('g_tol', 1e-6)
    eta = options.get('eta', 0.1)
    gamma = options.get('gamma', 0.1)
    eps = options.get('eps', 1e-8)
    max_iter = options.get('max_iter', 10000)

    iteration = 1
    grad_norm = np.inf
    x_change = np.inf
    mean_sq_grad = np.zeros(len(x))

    # Run until the gradient norm or the step norm drops to its tolerance, or
    # the iteration counter reaches max_iter.
    while grad_norm > g_tol and x_change > x_tol and iteration < max_iter:
        if callback:
            callback(x)

        func(x, options)
        g = grad(x, options)

        # gamma weights the running average, (1 - gamma) the newest squared gradient.
        mean_sq_grad = mean_sq_grad * gamma + g ** 2 * (1 - gamma)
        step = eta * g / (np.sqrt(mean_sq_grad) + eps)
        x = x - step

        grad_norm = np.sqrt(g.dot(g))
        x_change = np.sqrt(step.dot(step))
        iteration += 1

    if callback:
        callback(x)
    return x
def adam(func, grad, x, args={}, callback=None):
    """
    Adam adaptive gradient optimizer

    :param func: callable invoked as func(x, args) right before grad on every iteration; its return
                 value is discarded (presumably it lets func refresh state that grad reuses)
    :param grad: callable invoked as grad(x, args) that returns the gradient at x
    :param x: vector initial value of value being optimized over
    :param args: dict of optimizer options ('x_tol', 'g_tol', 'eps', 'b1', 'b2', 'step_size',
                 'max_iter'); the same dict is forwarded to func and grad
    :param callback: optional function called with the iterate at the start of every iteration
                     and once more with the final iterate
    :return: optimized solution
    """
    t = 1
    if not args:
        args = {}
    x_tol = args.get('x_tol', 1e-3)
    g_tol = args.get('g_tol', 1e-3)
    eps = args.get('eps', 1e-8)
    b1 = args.get('b1', 0.9)
    b2 = args.get('b2', 0.999)
    step_size = args.get('step_size', 0.01)
    max_iter = args.get('max_iter', 10000)

    grad_norm = np.inf
    x_change = np.inf

    m = np.zeros(len(x))  # running average of the gradient
    v = np.zeros(len(x))  # running average of the squared gradient

    # Run until the gradient norm or the step norm drops to its tolerance, or
    # the iteration counter reaches max_iter.
    while grad_norm > g_tol and x_change > x_tol and t < max_iter:
        if callback:
            callback(x)

        func(x, args)
        g = grad(x, args)

        m = (1 - b1) * g + b1 * m
        v = (1 - b2) * (g ** 2) + b2 * v

        # Bias correction for the zero-initialised averages. t is 1 on the
        # first update and is incremented after each update, so t itself (not
        # t + 1) is the number of gradients folded into m and v so far.
        m_hat = m / (1 - b1 ** t)
        v_hat = v / (1 - b2 ** t)

        change = step_size * m_hat / (np.sqrt(v_hat) + eps)
        x = x - change

        grad_norm = np.sqrt(g.dot(g))
        x_change = np.sqrt(change.dot(change))
        t += 1

    if callback:
        callback(x)
    return x
def lbfgs(func, grad, x, args={}, callback=None):
    """
    Adapter for scipy's standard minimize function, run with the L-BFGS-B method.

    The method is requested explicitly: when no method is given and the problem has no bounds or
    constraints, scipy's minimize selects BFGS rather than L-BFGS-B.

    :param func: function to be minimized
    :param grad: gradient function that returns the gradient of the function to be minimized
    :param x: vector initial value of value being optimized over
    :param args: forwarded unchanged to scipy's minimize as its ``args`` parameter, i.e. the extra
                 argument(s) scipy hands to func and grad
    :param callback: optional function forwarded to scipy's minimize, which calls it with the
                     current iterate after each iteration
    :return: optimized solution (the ``x`` field of scipy's result)
    """
    # callback=None is scipy's own default, so a single call covers both cases.
    res = minimize(fun=func, x0=x, args=args, jac=grad, method='L-BFGS-B', callback=callback)
    return res.x
class WeightRecord(object):
    """
    Class used to store solutions during optimization. Used to generate a callback function that will store the
    solution passed in. Useful for diagnostics, but in production, usually suboptimal solutions don't need to be saved.

    After n callbacks, weight_record has shape (n, x.size) and time_record has shape (n, 1); both are empty
    1-D arrays before the first callback.
    """

    def __init__(self):
        self.weight_record = np.array([])  # one flattened solution per row
        self.time_record = np.array([])    # one time.time() stamp per row

    def callback(self, x):
        """
        Save x into the WeightRecord with a timestamp

        :param x: vector to be saved into the weight record; it is copied and flattened to a single row
        :return:
        """
        # Flatten on every call, not only the first, so a multi-dimensional x
        # always lines up with the rows already stored.
        row = np.copy(x).reshape((1, -1))
        # Stored as a (1, 1) block so time_record has the same column shape
        # after the first callback as after every later one.
        stamp = np.array([[time.time()]])

        if self.weight_record.size == 0:
            self.weight_record = row
            self.time_record = stamp
        else:
            self.weight_record = np.vstack((self.weight_record, row))
            self.time_record = np.vstack((self.time_record, stamp))
class ObjectivePlotter(object):
    """
    Callback object that periodically plots the objective value and the current solution
    """

    def __init__(self, func, grad=None):
        """
        Set up the plotter state and, when a gradient is given, print the diagnostics header

        :param func: function being optimized; called as func(x)
        :param grad: optional gradient of the function; called as grad(x)
        """
        self.func = func
        self.grad = grad

        self.objectives = []  # objective value at each refresh
        self.iters = []       # callback count at each refresh
        self.t = 0            # number of callback invocations so far
        self.last_x = 0       # iterate from the previous callback

        self.interval = 2.0   # minimum seconds between refreshes
        self.timer = time.time()

        if self.grad:
            print("Iter\tf(x)\t\t\tnorm(g)\t\t\tdx")

    def callback(self, x):
        """
        Record the iterate; if more than ``interval`` seconds have passed since the last refresh, also
        evaluate the objective, redraw the plots and, when a gradient is available, print a diagnostics row.

        :param x: current iterate
        :return:
        """
        if time.time() - self.timer > self.interval:
            self.objectives.append(self.func(x))
            self.iters.append(self.t)

            self._draw(x)
            if self.grad:
                self._report(x)

            plt.pause(1.0 / 120.0)
            self.timer = time.time()

        self.last_x = x
        self.t += 1

    def _draw(self, x):
        """Redraw the three panels: full objective history, last 50 refreshes, current solution."""
        plt.clf()

        history_panels = (
            (131, self.iters, self.objectives, self.objectives[-1]),
            (132, self.iters[-50:], self.objectives[-50:], "Zoom"),
        )
        for position, xs, ys, heading in history_panels:
            plt.subplot(position)
            plt.plot(xs, ys)
            plt.ylabel('Objective')
            plt.xlabel('Iteration')
            plt.title(heading)

        plt.subplot(133)
        plt.plot(x)
        plt.title('Current solution')

    def _report(self, x):
        """Print iteration, objective, gradient norm and distance moved since the previous callback."""
        g = self.grad(x)
        moved = x - self.last_x
        print("%d\t%e\t%e\t%e" % (
            self.iters[-1], self.objectives[-1], np.sqrt(g.dot(g)), np.sqrt(moved.dot(moved))))