import numpy as np
import cupy as xp
import scipy.linalg
import pandas as pd
import matplotlib.pyplot as plt
import kernels
import itertools


class SVM():
    """Kernel support vector machine classifier."""

    def __init__(self, kernel, kernel_params, lambduh=1, max_iter=1000, classification_strategy='ovr',
                 x=None, y=None, n_folds=3, lambda_vals=None, use_optimal_lambda=False,
                 display_plots=True, logging=False):
        """Initialize the classifier.

        kernel, kernel_params    -- kernel name and parameters, looked up in kernels.kernel_dict.
        lambduh                  -- regularization strength.
        max_iter                 -- maximum number of gradient-descent iterations.
        classification_strategy  -- 'ovr' (one-vs-rest), 'ovo' (one-vs-one), or 'binary'.
        x, y                     -- optional training data; may also be passed to fit().
        n_folds, lambda_vals     -- cross-validation settings for lambda selection.
        use_optimal_lambda       -- select lambda by cross-validation before fitting.
        display_plots, logging   -- diagnostic output switches.
        """
        self._kernel = kernel
        self._kernel_params = kernel_params
        self._lambduh = lambduh
        self._max_iter = max_iter
        self._classification_strategy = classification_strategy
        self._y = y
        self._set_x(x)

        self._coef_matrix = []

        self._n_folds = n_folds
        self._display_plots = display_plots
        self._logging = logging
        self._lambda_vals = lambda_vals
        if self._lambda_vals is None:
            self._lambda_vals = [10**i for i in range(-3, 4)]
        self._use_optimal_lambda = use_optimal_lambda

    def fit(self, x=None, y=None, prevent_relabel=False, use_optimal_lambda=False):
        """Train the kernel SVM by minimizing the regularized squared hinge loss (see _objective)."""
        self._set_x(x)
        if y is not None:
            self._y = y
        self._K = self._compute_gram_matrix()

        self._n = len(self._x)
        objective_val_size = int(self._max_iter // 10) + (1 if self._max_iter % 10 == 0 else 0)
        self._objective_val_per_iter = xp.zeros(objective_val_size)

        # One binary classifier is trained per class (ovr) or per class pair (ovo);
        # 'binary' trains a single classifier directly on the full data.
        if self._classification_strategy == 'ovr':
            iterate_over = xp.asarray(np.unique(xp.asnumpy(self._y)))
        elif self._classification_strategy == 'ovo' or self._classification_strategy == 'binary':
            iterate_over = SVM._get_unique_pairs(self._y)

        if self._use_optimal_lambda or use_optimal_lambda:
            self._lambduh, misclassification_error = self.compute_optimal_lambda()
            print('Misclassification error (train), {}, optimal lambda = {} : {}'.format(
                self._classification_strategy, self._lambduh, misclassification_error))

        for i in range(len(iterate_over)):
            if self._logging:
                print('Training classifier {} of {}'.format(i + 1, len(iterate_over)))

            if self._classification_strategy == 'ovr':
                primary_class = iterate_over[i]
                x_filtered, y_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, primary_class)
            elif self._classification_strategy == 'ovo':
                pair = iterate_over[i]
                x_filtered, y_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair, prevent_relabel)
            elif self._classification_strategy == 'binary':
                pair = iterate_over[i]
                self._n = len(self._x)
                self._K = self._compute_gram_matrix()
                if prevent_relabel:
                    self._primary_class = 1
                    self._secondary_class = -1
                else:
                    self._primary_class = pair[0]
                    self._secondary_class = pair[1]
                self._y = xp.where(self._y == self._primary_class, 1, -1)
                self._coef_matrix, self._objective_val_per_iter, self._misclassification_error_per_iter = self._fast_gradient_descent()
                return

            # For 'ovr' and 'ovo', delegate each sub-problem to a binary SVM.
            svm = SVM(self._kernel, self._kernel_params, self._lambduh, self._max_iter, 'binary', display_plots=True)
            svm.fit(x_filtered, y_filtered, prevent_relabel=True)
            self._coef_matrix.append(svm._coef_matrix)
            self._objective_val_per_iter += svm._objective_val_per_iter * (1 / len(iterate_over))

        if self._display_plots:
            self.objective_plot()
        return

    def cross_validation_error(self):
        """Cross-validate over the candidate lambdas and return one mean error per lambda."""
        error_per_lambda = xp.zeros(len(self._lambda_vals))

        for i in range(len(self._lambda_vals)):
            lambduh = self._lambda_vals[i]
            if self._logging:
                print('lambduh = {} ({} of {})'.format(lambduh, i + 1, len(self._lambda_vals)))
            error_per_fold = xp.zeros(self._n_folds)
            for j in range(self._n_folds):
                fold_size = int(self._n / self._n_folds)
                indices = xp.array(range(0, self._n))
                # Half-open mask for the j-th contiguous fold, so folds do not overlap.
                fold_indices = (indices >= fold_size * j) & (indices < fold_size * (j + 1))
                # Train on the j-th fold and evaluate on the remaining samples.
                x_train = self._x[fold_indices]
                y_train = xp.ravel(self._y[fold_indices])
                x_test = self._x[~fold_indices]
                y_test = xp.ravel(self._y[~fold_indices])

                svm = SVM(self._kernel, self._kernel_params, lambduh, self._max_iter, self._classification_strategy)
                svm.fit(x_train, y_train)
                error_per_fold[j] = svm.compute_misclassification_error(x_test, y_test)

            error_per_lambda[i] = xp.mean(error_per_fold)

        return error_per_lambda.tolist()

    def compute_optimal_lambda(self):
        """Return the lambda with the lowest cross-validation error, and that error."""
        cross_validation_error = self.cross_validation_error()
        if self._display_plots:
            df = pd.DataFrame({'lambda': self._lambda_vals,
                               'Cross validation error': cross_validation_error})
            display(df)
            df.plot('lambda', 'Cross validation error', logx=True)
            plt.show()
        return self._lambda_vals[np.nanargmin(cross_validation_error)], np.nanmin(cross_validation_error)

    def compute_misclassification_error(self, x, y):
        y_pred = self.predict(x)
        return xp.mean(y_pred != y)

    def predict(self, x):
        x = self._standardize(x)
        if self._classification_strategy == 'ovr':
            return self._predict_ovr(x)
        elif self._classification_strategy == 'ovo':
            return self._predict_ovo(x)
        else:
            return self._predict_binary(x)

    def objective_plot(self):
        fig, ax = plt.subplots()
        ax.plot(np.array(range(len(self._objective_val_per_iter))) * 10,
                xp.asnumpy(self._objective_val_per_iter), label='Train', c='red')
        plt.xlabel('Iteration')
        plt.ylabel('Objective value')
        plt.title('Objective value vs. iteration when lambda=' + str(self._lambduh))
        ax.legend(loc='upper right')
        plt.show()

    def plot_misclassification_error(self):
        if self._classification_strategy == 'binary':
            fig, ax = plt.subplots()
            ax.plot(np.array(range(len(self._misclassification_error_per_iter))) * 10,
                    xp.asnumpy(self._misclassification_error_per_iter), label='Train', c='red')
            plt.xlabel('Iteration')
            plt.ylabel('Misclassification error')
            plt.title('Misclassification error vs iteration')
            ax.legend(loc='upper right')
            plt.show()
        else:
            print('Plotting misclassification error only available for binary classification.')

    def _set_x(self, x):
        if x is not None:
            self._n = len(x)
            self._x_sd = xp.std(x, axis=0)
            self._x_mean = xp.mean(x, axis=0)
            self._x = self._standardize(x)

    def _standardize(self, x):
        sd = self._x_sd
        mean = self._x_mean
        # Constant features have zero standard deviation; leave them untouched
        # instead of dividing by zero.
        mean[sd == 0] = 0
        sd[sd == 0] = 1
        return (x - mean) / sd

    @staticmethod
    def filter_data_by_class_ovo(x, y, classes, prevent_relabel=False):
        # Keep only the two classes in `classes` and (unless prevented) relabel them to +1/-1.
        x = SVM._select_classes(x, y, classes)
        y = SVM._select_classes(y, y, classes)
        if not prevent_relabel:
            y = xp.where(y == classes[0], 1, -1)

        return x, y

    @staticmethod
    def subset_data(x, y, max_samples):
        if max_samples is None or max_samples > len(x):
            return x, y
        else:
            idx = np.random.choice(np.arange(len(x)), max_samples, replace=False)
            return x[idx], y[idx]

    @staticmethod
    def subset_data_gpu(x, y, max_samples):
        x, y = SVM.subset_data(x, y, max_samples)
        return xp.asarray(x), xp.asarray(y)

    @staticmethod
    def _get_unique_pairs(y):
        return pd.Series(list(itertools.combinations(np.unique(xp.asnumpy(y)), 2)))

    @staticmethod
    def _select_classes(x, y, classes):
        if len(classes) == 2:
            return x[(y == classes[0]) | (y == classes[1])]
        else:
            return x[xp.asarray(np.isin(xp.asnumpy(y), classes))]

    @staticmethod
    def _select_classes_ovr(x, y, primary_class):
        positive = x[y == primary_class]
        negative = x[y != primary_class]
        # Balance the classes: keep a random subset of negatives the same length as the positives.
        if len(positive) < len(negative):
            negative = negative[xp.random.choice(negative.shape[0], len(positive), replace=False)]
        return xp.concatenate((positive, negative), axis=0)

    @staticmethod
    def _filter_data_by_class_ovr(x, y, primary_class):
        x = SVM._select_classes_ovr(x, y, primary_class)
        y = SVM._select_classes_ovr(y, y, primary_class)
        y = xp.where(y == primary_class, 1, -1)

        return x, y

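    # The two methods below implement the regularized squared hinge loss that this
    # class minimizes; spelled out here as a reading aid (derived from the code itself):
    #     F(alpha)      = (1/n) * sum_i max(0, 1 - y_i * (K alpha)_i)**2 + lambda * alpha^T K alpha
    #     grad F(alpha) = -(2/n) * sum_i y_i * K_i * max(0, 1 - y_i * (K alpha)_i) + 2 * lambda * K alpha
    # where K is the Gram matrix of the training data and K_i its i-th row.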
    def _compute_gradient(self, alpha):
        """Compute the gradient ∇F(alpha) of the objective F."""
        K_alpha = xp.dot(self._K, alpha)
        grad = -2 / self._n * xp.sum(
            self._y[:, xp.newaxis] * self._K
            * xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0)[:, xp.newaxis],
            axis=0) + 2 * self._lambduh * K_alpha
        return grad

    def _objective(self, alpha):
        """Evaluate the objective F at alpha."""
        K_alpha = xp.dot(self._K, alpha)
        return 1 / self._n * xp.sum(
            xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0) ** 2
        ) + self._lambduh * alpha.dot(K_alpha)

    def _backtracking_line_search(self, alpha, eta=1, alphaparam=0.5, betaparam=0.8, max_iter=100):
        """Backtracking line search: shrink eta until the sufficient-decrease condition holds."""
        grad_alpha = self._compute_gradient(alpha)
        norm_grad_alpha = xp.linalg.norm(grad_alpha)
        found_eta = False
        iter = 0
        while not found_eta and iter < max_iter:
            if self._objective(alpha - eta * grad_alpha) < self._objective(alpha) - alphaparam * eta * norm_grad_alpha ** 2:
                found_eta = True
            else:
                eta *= betaparam
            iter += 1
        if not found_eta:
            raise RuntimeError('Max number of iterations of backtracking line search reached')
        return eta

    def _compute_gram_matrix(self):
        """Compute the kernel (Gram) matrix K for the training points x1, ..., xn."""
        kernel = self._kernel
        if kernel == 'rbf':
            # Training uses the 'rbf_sklearn' entry of kernel_dict.
            kernel += '_sklearn'
        gram = kernels.kernel_dict[kernel](self._x, self._x, self._kernel_params)
        return gram

    def _kernel_eval(self, x, x_train):
        # Kernel values between the training points x_train and new points x.
        keval = kernels.kernel_dict[self._kernel](x_train, x, self._kernel_params)
        return keval

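    # The solver below appears to be a Nesterov-style accelerated gradient method on the
    # kernel coefficients alpha; written out as a reading aid, not an exact specification:
    #     alpha_{t+1} = theta_t - eta_t * grad F(theta_t)
    #     theta_{t+1} = alpha_{t+1} + t / (t + 3) * (alpha_{t+1} - alpha_t)
    # with eta_t chosen by backtracking line search, starting from the inverse of the
    # largest eigenvalue of (2/n) * K K + 2 * lambda * K (see _optimal_eta_init).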
    def _fast_gradient_descent(self):
        eta_init = self._optimal_eta_init()

        alpha = xp.zeros(self._n)
        theta = xp.zeros(self._n)
        eta = eta_init
        grad_theta = self._compute_gradient(theta)
        objective_val_size = int(self._max_iter // 10) + (1 if self._max_iter % 10 == 0 else 0)
        objective_vals = xp.ones(objective_val_size)
        misclassification_error_per_iter = xp.ones(objective_val_size)
        iter = 0
        while iter < self._max_iter:
            eta = self._backtracking_line_search(theta, eta=eta)
            alpha_new = theta - eta * grad_theta
            theta = alpha_new + iter / (iter + 3) * (alpha_new - alpha)
            grad_theta = self._compute_gradient(theta)
            alpha = alpha_new
            iter += 1
            # Record diagnostics every 10 iterations when plotting is enabled.
            if self._display_plots and iter % 10 == 0:
                objective_vals[int(iter / 10)] = self._objective(alpha)
                self._coef_matrix = alpha
                misclassification_error_per_iter[int(iter / 10)] = self.compute_misclassification_error(
                    self._x, xp.where(self._y > 0, self._primary_class, self._secondary_class))

        return alpha, objective_vals, misclassification_error_per_iter

    def _predict_binary(self, x):
        return self._prediction_binary(self._coef_matrix, x, self._x, self._primary_class, self._secondary_class)

    def _predict_ovo(self, x):
        def mode(a):
            counts = xp.bincount(a.astype(xp.int64))
            return xp.argmax(counts)

        pairs = SVM._get_unique_pairs(self._y)
        predictions = xp.zeros((x.shape[0], len(pairs)))
        for i in range(len(pairs)):
            pair = pairs[i]
            alpha = self._coef_matrix[i]
            x_train_filtered, y_train_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair)
            y_pred = self._prediction_binary(alpha, x, x_train_filtered, pair[0], pair[1])
            predictions[:, i] = y_pred
        return xp.stack([mode(p) for p in predictions])

    def _predict_ovr(self, x):
        y_unique = xp.asarray(np.unique(xp.asnumpy(self._y)))
        prediction_probabilities = xp.zeros((x.shape[0], len(y_unique)))
        for i in range(len(y_unique)):
            alpha = self._coef_matrix[i]
            x_train_filtered, y_train_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, y_unique[i])
            pred_prob = self._prediction_prob(alpha, x, x_train_filtered)
            prediction_probabilities[:, i] = pred_prob
        return xp.stack([y_unique[xp.argmax(p)] for p in prediction_probabilities])

    def _prediction_prob(self, alpha, x, x_train):
        return xp.dot(self._kernel_eval(x, x_train).T, alpha)

    def _prediction_binary(self, alpha, x, x_train, class1=1, class2=-1):
        pred_prob = self._prediction_prob(alpha, x, x_train)
        return xp.where(pred_prob > 0, class1, class2)

    def _optimal_eta_init(self):
        # Initial step size: 1 over the largest eigenvalue of (2/n) * K K + 2 * lambda * K.
        # Note: the `eigvals` range argument of scipy.linalg.eigh is deprecated in newer
        # SciPy releases in favor of `subset_by_index`.
        return 1 / scipy.linalg.eigh(
            xp.asnumpy(2 / self._n * xp.dot(self._K, self._K) + 2 * self._lambduh * self._K),
            eigvals=(self._n - 1, self._n - 1), eigvals_only=True)[0]
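

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). A minimal, hedged example of
# how this class appears to be driven. It assumes the external `kernels` module
# exposes entries of the form kernel_dict[name](x1, x2, params) -> Gram matrix
# (as used in _compute_gram_matrix above) and that an 'rbf' kernel taking a
# scalar parameter is available; the data, kernel name, and parameter values
# below are illustrative placeholders only.
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x_demo = xp.asarray(rng.normal(size=(60, 5)))      # 60 samples, 5 features
    y_demo = xp.asarray(rng.integers(0, 3, size=60))   # 3 classes -> 'ovr' trains 3 binary SVMs

    clf = SVM(kernel='rbf', kernel_params=1.0, lambduh=0.1, max_iter=200,
              classification_strategy='ovr', display_plots=False)
    clf.fit(x_demo, y_demo)
    print('Train misclassification error:', clf.compute_misclassification_error(x_demo, y_demo))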