Source code for mrftools.MarkovNet

"""Markov network class for storing potential functions and structure."""
import numpy as np
from scipy.sparse import coo_matrix


[docs]class MarkovNet(object): """Object containing the definition of a pairwise Markov net.""" def __init__(self): """Initialize a Markov net.""" self.edge_potentials = dict() self.unary_potentials = dict() self.neighbors = dict() self.variables = set() self.num_states = dict() self.matrix_mode = False self.tree_probabilities = dict() # initialize values only used in matrix mode to None self.max_states = None self.message_to_map = None self.message_to = None self.message_from = None self.var_index = None self.var_list = None self.unary_mat = None self.edge_pot_tensor = None self.num_edges = None self.message_index = None self.degrees = None
[docs] def set_unary_factor(self, variable, potential): """ Set the potential function for the unary factor. Implicitly declare variable. Must be called before setting edge factors. :param variable: name of the variable (can be any hashable object) :param potential: length-k vector of log potential values for the respective k states :return: None """ self.unary_potentials[variable] = potential if variable not in self.variables: self.declare_variable(variable, np.size(potential))
[docs] def declare_variable(self, variable, num_states): """ Indicate the existence of a variable :param variable: name of the variable (can be any hashable object) :param num_states: integer number of states the variable can take :return: None """ if variable not in self.variables: self.variables.add(variable) self.neighbors[variable] = set() self.num_states[variable] = num_states else: print("Warning: declaring a variable %s that was previously declared." % repr(variable))
[docs] def set_edge_factor(self, edge, potential): """ Set a factor by inputting the involved variables then the potential function. The potential function should be a np matrix. :param edge: 2-tuple of the variables in the edge. Can be in any order. :param potential: k1 by k2 matrix of potential values for the joint state of the two variables :return: None """ assert np.shape(potential) == (len(self.unary_potentials[edge[0]]), len(self.unary_potentials[edge[1]])), \ "potential size %d, %d incompatible with unary sizes %d, %d" % \ (np.shape(potential)[0], np.shape(potential)[1], len(self.unary_potentials[edge[0]]), len(self.unary_potentials[edge[1]])) if edge[0] < edge[1]: self.edge_potentials[edge] = potential else: self.edge_potentials[(edge[1], edge[0])] = potential.T self.neighbors[edge[0]].add(edge[1]) self.neighbors[edge[1]].add(edge[0])
[docs] def get_potential(self, key): """ Return the potential between pair[0] and pair[1]. If (pair[1], pair[0]) is in our dictionary instead, return the transposed potential. :param key: name of the key whose potential to get. Can either be a variable name or a pair of variables (edge) :return potential table for the key """ if key in self.edge_potentials: return self.edge_potentials[key] else: return self.edge_potentials[(key[1], key[0])].T
[docs] def get_neighbors(self, variable): """ Return the neighbors of variable. :param variable: name of variable :return: set of neighboring variables connected in MRF """ return self.neighbors[variable]
[docs] def evaluate_state(self, states): """ Evaluate the energy of a state. states should be a dictionary of variable: state (int) pairs. :param states: dictionary of variable states with a key-value pair for each variable :return: MRF energy value for the state as a float """ energy = 0.0 for var in self.variables: energy += self.unary_potentials[var][states[var]] for neighbor in self.neighbors[var]: if var < neighbor: energy += self.get_potential((var, neighbor))[states[var], states[neighbor]] return energy
[docs] def set_unary_mat(self, unary_mat): """ Set the matrix representation of the unary potentials :param unary_mat: (num vars) by (max states) of unary potentials :return: None """ assert np.array_equal(self.unary_mat.shape, unary_mat.shape) self.unary_mat[:, :] = unary_mat
[docs] def set_edge_tensor(self, edge_tensor): """ Set the tensor representation of the edge potentials :param edge_tensor: (max states) by (max states) by (num edges) tensor of the edge potentials :return: None """ if np.array_equal(self.edge_pot_tensor.shape, edge_tensor.shape): self.edge_pot_tensor[:, :, :] = edge_tensor else: mirrored_edge_tensor = np.concatenate((edge_tensor, edge_tensor.transpose((1, 0, 2))), 2) assert np.array_equal(self.edge_pot_tensor.shape, mirrored_edge_tensor.shape) self.edge_pot_tensor[:, :, :] = mirrored_edge_tensor
[docs] def create_matrices(self): """ Create matrix representations of the MRF structure and potentials to allow inference to be done via matrix operations :return: None """ self.matrix_mode = True self.max_states = max([len(x) for x in self.unary_potentials.values()]) self.unary_mat = -np.inf * np.ones((self.max_states, len(self.variables))) self.degrees = np.zeros(len(self.variables)) # var_index allows looking up the numerical index of a variable by its hashable name self.var_index = dict() self.var_list = [] message_num = 0 for var in self.variables: potential = self.unary_potentials[var] self.unary_mat[0:len(potential), message_num] = potential self.var_index[var] = message_num self.var_list.append(var) self.degrees[message_num] = len(self.neighbors[var]) message_num += 1 # set up pairwise tensor self.num_edges = 0 for var in self.variables: for neighbor in self.neighbors[var]: if var < neighbor: self.num_edges += 1 self.edge_pot_tensor = -np.inf * np.ones((self.max_states, self.max_states, 2 * self.num_edges)) self.message_index = {} # set up sparse matrix representation of adjacency from_rows = [] from_cols = [] to_rows = [] to_cols = [] message_num = 0 for var in self.variables: for neighbor in self.neighbors[var]: if var < neighbor: # for each unique edge potential = self.get_potential((var, neighbor)) dims = potential.shape # store copies of the potential for each direction messages can travel on the edge # forward self.edge_pot_tensor[0:dims[1], 0:dims[0], message_num] = potential.T # and backward self.edge_pot_tensor[0:dims[0], 0:dims[1], message_num + self.num_edges] = potential # get numerical index of var and neighbor var_i = self.var_index[var] neighbor_i = self.var_index[neighbor] # store that the forward slice represents a message from var from_rows.append(message_num) from_cols.append(var_i) # store that the backward slice represents a message from neighbor from_rows.append(message_num + self.num_edges) from_cols.append(neighbor_i) # store that the forward slice represents a message to neighbor to_rows.append(message_num) to_cols.append(neighbor_i) # store that the backward slice represents a message to var to_rows.append(message_num + self.num_edges) to_cols.append(var_i) self.message_index[(var, neighbor)] = message_num message_num += 1 # generate a sparse matrix representation of the message indices to variables that receive messages self.message_to_map = coo_matrix((np.ones(len(to_rows)), (to_rows, to_cols)), (2 * self.num_edges, len(self.variables))) # store an array that lists which variable each message is sent to self.message_to = np.zeros(2 * self.num_edges, dtype=np.intp) self.message_to[to_rows] = to_cols # store an array that lists which variable each message is received from self.message_from = np.zeros(2 * self.num_edges, dtype=np.intp) self.message_from[from_rows] = from_cols