Program to assist librarians with weeding by making predictions based on past decision data and integrating librarian-approved predictions into the data set

import itertools

import numpy as np
import cupy as xp
import scipy.linalg
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import display  # used by compute_optimal_lambda

import kernels
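
# Kernel SVM trained by minimizing the L2-penalized squared hinge loss
#     F(alpha) = (1/n) * sum_i max(0, 1 - y_i * (K @ alpha)_i)**2
#                + lambda * alpha^T K alpha
# with accelerated (Nesterov) gradient descent and backtracking line search.
# All large arrays live on the GPU via CuPy (imported as xp).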
class SVM:
    """support vector machine"""

    def __init__(self, kernel, kernel_params, lambduh=1, max_iter=1000,
                 classification_strategy='ovr', x=None, y=None, n_folds=3,
                 lambda_vals=None, use_optimal_lambda=False, display_plots=True,
                 logging=False):
        """initialize the classifier"""
        self._kernel = kernel
        self._kernel_params = kernel_params
        self._lambduh = lambduh
        self._max_iter = max_iter
        self._classification_strategy = classification_strategy
        self._y = y
        self._set_x(x)
        self._coef_matrix = []
        self._n_folds = n_folds
        self._display_plots = display_plots
        self._logging = logging
        self._lambda_vals = lambda_vals
        if self._lambda_vals is None:
            self._lambda_vals = [10**i for i in range(-3, 4)]
        self._use_optimal_lambda = use_optimal_lambda

    def fit(self, x=None, y=None, prevent_relabel=False, use_optimal_lambda=False):
        """Trains the kernel support vector machine by minimizing the
        penalized squared hinge loss (see _objective)."""
        self._set_x(x)
        if y is not None:
            self._y = y
        self._K = self._compute_gram_matrix()
        self._n = len(self._x)
        # One slot per 10 iterations, plus one for the initial value.
        objective_val_size = self._max_iter // 10 + 1
        self._objective_val_per_iter = xp.zeros(objective_val_size)
        if self._classification_strategy == 'ovr':
            iterate_over = xp.asarray(np.unique(xp.asnumpy(self._y)))
            # Cache each class's filtered training set so that prediction can
            # reuse the same random negative subsample the classifier saw.
            self._x_train_per_class = []
        elif self._classification_strategy in ('ovo', 'binary'):
            iterate_over = SVM._get_unique_pairs(self._y)
        if self._use_optimal_lambda or use_optimal_lambda:
            self._lambduh, misclassification_error = self.compute_optimal_lambda()
            print('Misclassification error (train), {}, optimal lambda = {} : {}'.format(
                self._classification_strategy, self._lambduh, misclassification_error))
        for i in range(len(iterate_over)):
            if self._logging:
                print('Training classifier {} of {}'.format(i + 1, len(iterate_over)))
            if self._classification_strategy == 'ovr':
                primary_class = iterate_over[i]
                x_filtered, y_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, primary_class)
                self._x_train_per_class.append(x_filtered)
            elif self._classification_strategy == 'ovo':
                pair = iterate_over[i]
                x_filtered, y_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair, prevent_relabel)
            elif self._classification_strategy == 'binary':
                pair = iterate_over[i]
                if prevent_relabel:
                    self._primary_class = 1
                    self._secondary_class = -1
                else:
                    self._primary_class = pair[0]
                    self._secondary_class = pair[1]
                self._y = xp.where(self._y == self._primary_class, 1, -1)
                self._coef_matrix, self._objective_val_per_iter, self._misclassification_error_per_iter = self._fast_gradient_descent()
                return
            # One-vs-rest and one-vs-one delegate each subproblem to a binary SVM.
            svm = SVM(self._kernel, self._kernel_params, self._lambduh, self._max_iter,
                      'binary', display_plots=True)
            svm.fit(x_filtered, y_filtered, prevent_relabel=True)
            self._coef_matrix.append(svm._coef_matrix)
            self._objective_val_per_iter += svm._objective_val_per_iter * (1 / len(iterate_over))
        if self._display_plots:
            self.objective_plot()

    def cross_validation_error(self):
        error_per_lambda = xp.zeros(len(self._lambda_vals))
        for i in range(len(self._lambda_vals)):
            lambduh = self._lambda_vals[i]
            if self._logging:
                print('lambduh = {} ({} of {})'.format(lambduh, i + 1, len(self._lambda_vals)))
            error_per_fold = xp.zeros(self._n_folds)
            fold_size = self._n // self._n_folds
            for j in range(self._n_folds):
                indices = xp.arange(self._n)
                fold_mask = (indices >= fold_size * j) & (indices < fold_size * (j + 1))
                # Each model is trained on a single fold and validated on the
                # remaining data (the reverse of the conventional k-fold split).
                x_train = self._x[fold_mask]
                y_train = xp.ravel(self._y[fold_mask])
                x_test = self._x[~fold_mask]
                y_test = xp.ravel(self._y[~fold_mask])
                svm = SVM(self._kernel, self._kernel_params, lambduh, self._max_iter,
                          self._classification_strategy)
                svm.fit(x_train, y_train)
                error_per_fold[j] = svm.compute_misclassification_error(x_test, y_test)
            error_per_lambda[i] = xp.mean(error_per_fold)
        return error_per_lambda.tolist()

    def compute_optimal_lambda(self):
        cross_validation_error = self.cross_validation_error()
        if self._display_plots:
            df = pd.DataFrame({'lambda': self._lambda_vals,
                               'Cross validation error': cross_validation_error})
            display(df)
            df.plot('lambda', 'Cross validation error', logx=True)
            plt.show()
        # Ignore NaN entries when picking the best lambda.
        return self._lambda_vals[np.nanargmin(cross_validation_error)], np.nanmin(cross_validation_error)

    def compute_misclassification_error(self, x, y):
        y_pred = self.predict(x)
        return xp.mean(y_pred != y)

    def predict(self, x):
        x = self._standardize(x)
        if self._classification_strategy == 'ovr':
            return self._predict_ovr(x)
        elif self._classification_strategy == 'ovo':
            return self._predict_ovo(x)
        else:
            return self._predict_binary(x)

    def objective_plot(self):
        fig, ax = plt.subplots()
        ax.plot(np.arange(len(self._objective_val_per_iter)) * 10,
                xp.asnumpy(self._objective_val_per_iter), label='Train', c='red')
        plt.xlabel('Iteration')
        plt.ylabel('Objective value')
        plt.title('Objective value vs. iteration when lambda=' + str(self._lambduh))
        ax.legend(loc='upper right')
        plt.show()

    def plot_misclassification_error(self):
        if self._classification_strategy == 'binary':
            fig, ax = plt.subplots()
            ax.plot(np.arange(len(self._misclassification_error_per_iter)) * 10,
                    xp.asnumpy(self._misclassification_error_per_iter), label='Train', c='red')
            plt.xlabel('Iteration')
            plt.ylabel('Misclassification error')
            plt.title('Misclassification error vs. iteration')
            ax.legend(loc='upper right')
            plt.show()
        else:
            print('Plotting the misclassification error is only available for binary classification.')

    def _set_x(self, x):
        if x is not None:
            self._n = len(x)
            self._x_sd = xp.std(x, axis=0)
            self._x_mean = xp.mean(x, axis=0)
            self._x = self._standardize(x)

    def _standardize(self, x):
        sd = self._x_sd
        mean = self._x_mean
        # Zero-variance features pass through unchanged (mean 0, sd 1); this
        # mutates the stored statistics in place, so predict-time
        # standardization uses the same adjusted values.
        mean[sd == 0] = 0
        sd[sd == 0] = 1
        return (x - mean) / sd

    @staticmethod
    def filter_data_by_class_ovo(x, y, classes, prevent_relabel=False):
        x = SVM._select_classes(x, y, classes)
        y = SVM._select_classes(y, y, classes)
        if not prevent_relabel:
            y = xp.where(y == classes[0], 1, -1)
        return x, y

    @staticmethod
    def subset_data(x, y, max_samples):
        if max_samples is None or max_samples > len(x):
            return x, y
        else:
            idx = np.random.choice(np.arange(len(x)), max_samples, replace=False)
            return x[idx], y[idx]

    @staticmethod
    def subset_data_gpu(x, y, max_samples):
        x, y = SVM.subset_data(x, y, max_samples)
        return xp.asarray(x), xp.asarray(y)

    @staticmethod
    def _get_unique_pairs(y):
        return pd.Series(list(itertools.combinations(np.unique(xp.asnumpy(y)), 2)))

    @staticmethod
    def _select_classes(x, y, classes):
        if len(classes) == 2:
            return x[(y == classes[0]) | (y == classes[1])]
        else:
            return x[xp.asarray(np.isin(xp.asnumpy(y), classes))]

    @staticmethod
    def _select_classes_ovr(x, y, primary_class):
        positive = x[y == primary_class]
        negative = x[y != primary_class]
        # Get random rows of the same length as the positive matrix so the
        # binary subproblem is balanced.
        if len(positive) < len(negative):
            negative = negative[xp.random.choice(negative.shape[0], len(positive), replace=False)]
        return xp.concatenate((positive, negative), axis=0)

    @staticmethod
    def _filter_data_by_class_ovr(x, y, primary_class):
        # x and y are subsampled independently, but the results still line up:
        # both are ordered positives-then-negatives, and every negative label
        # is mapped to -1 regardless of which rows were drawn.
        x = SVM._select_classes_ovr(x, y, primary_class)
        y = SVM._select_classes_ovr(y, y, primary_class)
        y = xp.where(y == primary_class, 1, -1)
        return x, y

    def _compute_gradient(self, alpha):
        """Computes the gradient ∇F(α) of the objective F."""
        K_alpha = xp.dot(self._K, alpha)
        hinge = xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0)
        grad = (-2 / self._n * xp.sum(self._y[:, xp.newaxis] * self._K * hinge[:, xp.newaxis], axis=0)
                + 2 * self._lambduh * K_alpha)
        return grad

    def _objective(self, alpha):
        """Penalized squared hinge loss:
        F(α) = (1/n) Σᵢ max(0, 1 - yᵢ(Kα)ᵢ)² + λ αᵀKα
        """
        K_alpha = xp.dot(self._K, alpha)
        hinge = xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0)
        return 1 / self._n * xp.sum(hinge ** 2) + self._lambduh * alpha.dot(K_alpha)

    def _backtracking_line_search(self, alpha, eta=1, alphaparam=0.5, betaparam=0.8, max_iter=100):
        grad_alpha = self._compute_gradient(alpha)
        norm_grad_alpha = xp.linalg.norm(grad_alpha)
        iter = 0
        while iter < max_iter:
            # Armijo sufficient-decrease condition.
            if self._objective(alpha - eta * grad_alpha) < self._objective(alpha) - alphaparam * eta * norm_grad_alpha ** 2:
                return eta
            eta *= betaparam
            iter += 1
        raise RuntimeError('Max number of iterations of backtracking line search reached')

    def _compute_gram_matrix(self):
        """Computes the kernel (Gram) matrix K for the training points x1,...,xn."""
        kernel = self._kernel
        if kernel == 'rbf':
            kernel += '_sklearn'
        gram = kernels.kernel_dict[kernel](self._x, self._x, self._kernel_params)
        return gram

    def _kernel_eval(self, x, x_train):
        # Apply the same 'rbf' -> 'rbf_sklearn' mapping as the Gram matrix so
        # that training and prediction use the same kernel implementation.
        kernel = self._kernel
        if kernel == 'rbf':
            kernel += '_sklearn'
        keval = kernels.kernel_dict[kernel](x_train, x, self._kernel_params)
        return keval

    def _fast_gradient_descent(self):
        """Accelerated (Nesterov) gradient descent with backtracking line search."""
        eta_init = self._optimal_eta_init()
        alpha = xp.zeros(self._n)
        theta = xp.zeros(self._n)
        eta = eta_init
        grad_theta = self._compute_gradient(theta)
        # One slot per 10 iterations, plus one for the initial value.
        objective_val_size = self._max_iter // 10 + 1
        objective_vals = xp.ones(objective_val_size)
        misclassification_error_per_iter = xp.ones(objective_val_size)
        iter = 0
        while iter < self._max_iter:
            eta = self._backtracking_line_search(theta, eta=eta)
            alpha_new = theta - eta * grad_theta
            theta = alpha_new + iter / (iter + 3) * (alpha_new - alpha)
            grad_theta = self._compute_gradient(theta)
            alpha = alpha_new
            iter += 1
            if self._display_plots and iter % 10 == 0:
                objective_vals[iter // 10] = self._objective(alpha)
                # Training error straight from the cached Gram matrix; the
                # training data is already standardized, so routing through
                # predict() would standardize it a second time.
                y_pred = xp.where(xp.dot(self._K, alpha) > 0, self._primary_class, self._secondary_class)
                y_true = xp.where(self._y > 0, self._primary_class, self._secondary_class)
                misclassification_error_per_iter[iter // 10] = xp.mean(y_pred != y_true)
        return alpha, objective_vals, misclassification_error_per_iter

    def _predict_binary(self, x):
        return self._prediction_binary(self._coef_matrix, x, self._x,
                                       self._primary_class, self._secondary_class)

    def _predict_ovo(self, x):
        def mode(a):
            counts = xp.bincount(a.astype(xp.int64))
            return xp.argmax(counts)
        pairs = SVM._get_unique_pairs(self._y)
        predictions = xp.zeros((x.shape[0], len(pairs)))
        for i in range(len(pairs)):
            pair = pairs[i]
            alpha = self._coef_matrix[i]
            # Re-filtering is deterministic for one-vs-one, so this matches
            # the training subset used to fit alpha.
            x_train_filtered, y_train_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair)
            y_pred = self._prediction_binary(alpha, x, x_train_filtered, pair[0], pair[1])
            predictions[:, i] = y_pred
        # Majority vote across all pairwise classifiers.
        return xp.stack([mode(p) for p in predictions])

    def _predict_ovr(self, x):
        y_unique = xp.asarray(np.unique(xp.asnumpy(self._y)))
        prediction_probabilities = xp.zeros((x.shape[0], len(y_unique)))
        for i in range(len(y_unique)):
            alpha = self._coef_matrix[i]
            # Use the training subset cached during fit; re-filtering here
            # would draw a different random negative subsample than the one
            # alpha was trained on.
            x_train_filtered = self._x_train_per_class[i]
            pred_prob = self._prediction_prob(alpha, x, x_train_filtered)
            prediction_probabilities[:, i] = pred_prob
        # Pick the class whose classifier is most confident.
        return xp.stack([y_unique[xp.argmax(p)] for p in prediction_probabilities])

    def _prediction_prob(self, alpha, x, x_train):
        return xp.dot(self._kernel_eval(x, x_train).T, alpha)

    def _prediction_binary(self, alpha, x, x_train, class1=1, class2=-1):
        pred_prob = self._prediction_prob(alpha, x, x_train)
        return xp.where(pred_prob > 0, class1, class2)

    def _optimal_eta_init(self):
        # Initial step size 1/L, where L is the largest eigenvalue of the
        # objective's Hessian, 2/n * K @ K + 2 * lambda * K.
        hessian = xp.asnumpy(2 / self._n * xp.dot(self._K, self._K) + 2 * self._lambduh * self._K)
        # subset_by_index replaces the eigvals= keyword removed from newer SciPy.
        largest_eig = scipy.linalg.eigh(hessian, subset_by_index=(self._n - 1, self._n - 1),
                                        eigvals_only=True)[0]
        return 1 / largest_eig
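
# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It assumes the
# companion kernels.py maps 'rbf_sklearn' to an RBF kernel with signature
# (x1, x2, kernel_params); adjust the kernel name and parameter to match your
# kernels module. Requires a CUDA device, since the class stores data in CuPy
# arrays.
if __name__ == '__main__':
    rng = np.random.RandomState(0)
    # Two Gaussian blobs labeled +1 and -1.
    x = xp.asarray(np.vstack((rng.randn(20, 2) + 2, rng.randn(20, 2) - 2)))
    y = xp.asarray(np.repeat([1, -1], 20))
    clf = SVM('rbf', kernel_params=0.5, lambduh=1, max_iter=100,
              classification_strategy='binary', display_plots=False)
    clf.fit(x, y, prevent_relabel=True)
    print('train error:', clf.compute_misclassification_error(x, y))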