import itertools

import numpy as np
import cupy as xp
import matplotlib.pyplot as plt
import pandas as pd
import scipy.linalg
# display() is an IPython builtin inside notebooks; import it explicitly so
# compute_optimal_lambda also works as a plain script.
from IPython.display import display

import kernels


class SVM():
    """Support vector machine."""

    def __init__(self, kernel, kernel_params, lambduh=1, max_iter=1000, classification_strategy='ovr', x=None, y=None, n_folds=3, lambda_vals=None, use_optimal_lambda=False, display_plots=True, logging=False):
        """Initialize the classifier."""

        self._kernel = kernel
        self._kernel_params = kernel_params
        self._lambduh = lambduh
        self._max_iter = max_iter
        self._classification_strategy = classification_strategy
        self._y = y
        self._set_x(x)

        self._coef_matrix = []
        # One-vs-rest training subsets, saved during fit() so that prediction
        # reuses exactly the data each sub-classifier was trained on.
        self._x_filtered_per_class = []

        self._n_folds = n_folds
        self._display_plots = display_plots
        self._logging = logging
        self._lambda_vals = lambda_vals
        if self._lambda_vals is None:
            self._lambda_vals = [10**i for i in range(-3, 4)]
        self._use_optimal_lambda = use_optimal_lambda

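    # fit() dispatches on the classification strategy: 'binary' trains a single
    # classifier directly, while 'ovr' (one-vs-rest) and 'ovo' (one-vs-one)
    # train one binary sub-SVM per class or per pair of classes.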
    def fit(self, x=None, y=None, prevent_relabel=False, use_optimal_lambda=False):
        """Trains the kernel support vector machine with the squared hinge loss."""

        self._set_x(x)
        if y is not None:
            self._y = y
        self._K = self._compute_gram_matrix()

        self._n = len(self._x)
        # Progress is recorded once every 10 iterations, so reserve
        # max_iter // 10 + 1 slots (recorded indices run from 0 to max_iter // 10).
        objective_val_size = self._max_iter // 10 + 1
        self._objective_val_per_iter = xp.zeros(objective_val_size)

        if self._classification_strategy == 'ovr':
            iterate_over = xp.asarray(np.unique(xp.asnumpy(self._y)))
        elif self._classification_strategy == 'ovo' or self._classification_strategy == 'binary':
            iterate_over = SVM._get_unique_pairs(self._y)

        if self._use_optimal_lambda or use_optimal_lambda:
            self._lambduh, misclassification_error = self.compute_optimal_lambda()
            print('Misclassification error (train), {}, optimal lambda = {} : {}'.format(self._classification_strategy, self._lambduh, misclassification_error))

        for i in range(len(iterate_over)):
            if self._logging:
                print('Training classifier {} of {}'.format(i + 1, len(iterate_over)))

            if self._classification_strategy == 'ovr':
                primary_class = iterate_over[i]
                x_filtered, y_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, primary_class)
                # The negative class was randomly subsampled, so keep the exact
                # training subset for reuse at prediction time.
                self._x_filtered_per_class.append(x_filtered)
            elif self._classification_strategy == 'ovo':
                pair = iterate_over[i]
                x_filtered, y_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair, prevent_relabel)
            elif self._classification_strategy == 'binary':
                pair = iterate_over[i]
                self._n = len(self._x)
                self._K = self._compute_gram_matrix()
                if prevent_relabel:
                    self._primary_class = 1
                    self._secondary_class = -1
                else:
                    self._primary_class = pair[0]
                    self._secondary_class = pair[1]
                    self._y = xp.where(self._y == self._primary_class, 1, -1)
                self._coef_matrix, self._objective_val_per_iter, self._misclassification_error_per_iter = self._fast_gradient_descent()
                return

            # The sub-classifier needs display_plots=True so that it records the
            # objective values aggregated into the parent's plot below.
            svm = SVM(self._kernel, self._kernel_params, self._lambduh, self._max_iter, 'binary', display_plots=True)
            svm.fit(x_filtered, y_filtered, prevent_relabel=True)
            self._coef_matrix.append(svm._coef_matrix)
            self._objective_val_per_iter += svm._objective_val_per_iter * (1 / len(iterate_over))
        if self._display_plots:
            self.objective_plot()
        return

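    # k-fold cross validation: for each candidate lambda, hold out one fold at
    # a time, train on the remaining folds, and average the held-out
    # misclassification error across folds.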
    def cross_validation_error(self):
        error_per_lambda = xp.zeros(len(self._lambda_vals))

        for i in range(len(self._lambda_vals)):
            lambduh = self._lambda_vals[i]
            if self._logging:
                print('lambduh = {} ({} of {})'.format(lambduh, i + 1, len(self._lambda_vals)))
            error_per_fold = xp.zeros(self._n_folds)
            for j in range(self._n_folds):
                fold_size = self._n // self._n_folds
                indices = xp.arange(self._n)
                fold_mask = (indices >= fold_size * j) & (indices < fold_size * (j + 1))
                # Hold out fold j for validation and train on the remaining folds.
                x_test = self._x[fold_mask]
                y_test = xp.ravel(self._y[fold_mask])
                x_train = self._x[~fold_mask]
                y_train = xp.ravel(self._y[~fold_mask])

                # display_plots=False keeps each fold from drawing its own plot.
                svm = SVM(self._kernel, self._kernel_params, lambduh, self._max_iter, self._classification_strategy, display_plots=False)
                svm.fit(x_train, y_train)
                error_per_fold[j] = svm.compute_misclassification_error(x_test, y_test)

            error_per_lambda[i] = xp.mean(error_per_fold)

        return error_per_lambda.tolist()

    def compute_optimal_lambda(self):
        cross_validation_error = self.cross_validation_error()
        if self._display_plots:
            df = pd.DataFrame({'lambda': self._lambda_vals, 'Cross validation error': cross_validation_error})
            display(df)
            df.plot('lambda', 'Cross validation error', logx=True)
            plt.show()
        return self._lambda_vals[np.nanargmin(cross_validation_error)], np.nanmin(cross_validation_error)

    def compute_misclassification_error(self, x, y):
        y_pred = self.predict(x)
        return xp.mean(y_pred != y)

    def predict(self, x):
        x = self._standardize(x)
        if self._classification_strategy == 'ovr':
            return self._predict_ovr(x)
        elif self._classification_strategy == 'ovo':
            return self._predict_ovo(x)
        else:
            return self._predict_binary(x)

    def objective_plot(self):
        fig, ax = plt.subplots()
        ax.plot(np.arange(len(self._objective_val_per_iter)) * 10, xp.asnumpy(self._objective_val_per_iter), label='Train', c='red')
        plt.xlabel('Iteration')
        plt.ylabel('Objective value')
        plt.title('Objective value vs. iteration when lambda=' + str(self._lambduh))
        ax.legend(loc='upper right')
        plt.show()

    def plot_misclassification_error(self):
        if self._classification_strategy == 'binary':
            fig, ax = plt.subplots()
            ax.plot(np.arange(len(self._misclassification_error_per_iter)) * 10, xp.asnumpy(self._misclassification_error_per_iter), label='Train', c='red')
            plt.xlabel('Iteration')
            plt.ylabel('Misclassification error')
            plt.title('Misclassification error vs. iteration')
            ax.legend(loc='upper right')
            plt.show()
        else:
            print('Plotting misclassification error is only available for binary classification.')

    def _set_x(self, x):
        if x is not None:
            self._n = len(x)
            self._x_sd = xp.std(x, axis=0)
            self._x_mean = xp.mean(x, axis=0)
            self._x = self._standardize(x)

    def _standardize(self, x):
        # Leave constant features untouched instead of dividing by zero, and
        # avoid mutating the stored statistics in place.
        sd = xp.where(self._x_sd == 0, 1, self._x_sd)
        mean = xp.where(self._x_sd == 0, 0, self._x_mean)
        return (x - mean) / sd

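    # One-vs-one helper: keep only the two classes in `classes` and relabel
    # them to +1/-1 unless prevent_relabel is set.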
    @staticmethod
    def filter_data_by_class_ovo(x, y, classes, prevent_relabel=False):
        x = SVM._select_classes(x, y, classes)
        y = SVM._select_classes(y, y, classes)
        if not prevent_relabel:
            y = xp.where(y == classes[0], 1, -1)

        return x, y

    @staticmethod
    def subset_data(x, y, max_samples):
        if max_samples is None or max_samples > len(x):
            return x, y
        else:
            idx = np.random.choice(np.arange(len(x)), max_samples, replace=False)
            return x[idx], y[idx]

    @staticmethod
    def subset_data_gpu(x, y, max_samples):
        x, y = SVM.subset_data(x, y, max_samples)
        return xp.asarray(x), xp.asarray(y)

    @staticmethod
    def _get_unique_pairs(y):
        return pd.Series(list(itertools.combinations(np.unique(xp.asnumpy(y)), 2)))

    @staticmethod
    def _select_classes(x, y, classes):
        if len(classes) == 2:
            return x[(y == classes[0]) | (y == classes[1])]
        else:
            return x[xp.asarray(np.isin(xp.asnumpy(y), classes))]

    @staticmethod
    def _select_classes_ovr(x, y, primary_class):
        positive = x[y == primary_class]
        negative = x[y != primary_class]
        # Randomly subsample negatives to match the number of positives.
        if len(positive) < len(negative):
            negative = negative[xp.random.choice(negative.shape[0], len(positive), replace=False)]
        return xp.concatenate((positive, negative), axis=0)

    @staticmethod
    def _filter_data_by_class_ovr(x, y, primary_class):
        x = SVM._select_classes_ovr(x, y, primary_class)
        y = SVM._select_classes_ovr(y, y, primary_class)
        y = xp.where(y == primary_class, 1, -1)

        return x, y

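    # Gradient of the regularized squared hinge objective
    #   F(alpha) = (1/n) * sum_i max(0, 1 - y_i * (K alpha)_i)**2 + lambda * alpha^T K alpha,
    # namely
    #   grad F(alpha) = -(2/n) * sum_i y_i * K_i * max(0, 1 - y_i * (K alpha)_i) + 2 * lambda * K alpha,
    # where K_i is the i-th row (= column) of the symmetric Gram matrix K.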
    def _compute_gradient(self, alpha):
        """Computes the gradient ∇F(α) of F"""
        K_alpha = xp.dot(self._K, alpha)
        hinge = xp.maximum(xp.zeros_like(self._y), 1 - self._y * K_alpha)
        grad = -2 / self._n * xp.sum(self._y[:, xp.newaxis] * self._K * hinge[:, xp.newaxis], axis=0) + 2 * self._lambduh * K_alpha
        return grad

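    # Value of the same objective F(alpha); see _compute_gradient above.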
    def _objective(self, alpha):
        K_alpha = xp.dot(self._K, alpha)
        return 1 / self._n * xp.sum(xp.maximum(xp.zeros_like(self._y), 1 - self._y * K_alpha) ** 2) + self._lambduh * alpha.dot(K_alpha)

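    # Backtracking line search: shrink the step size eta by betaparam until the
    # sufficient-decrease (Armijo) condition
    #   F(alpha - eta * grad) < F(alpha) - alphaparam * eta * ||grad||^2
    # holds, or give up after max_iter reductions.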
    def _backtracking_line_search(self, alpha, eta=1, alphaparam=0.5, betaparam=0.8, max_iter=100):
        grad_alpha = self._compute_gradient(alpha)
        norm_grad_alpha = xp.linalg.norm(grad_alpha)
        found_eta = 0
        iter = 0
        while found_eta == 0 and iter < max_iter:
            if self._objective(alpha - eta * grad_alpha) < self._objective(alpha) - alphaparam * eta * norm_grad_alpha ** 2:
                found_eta = 1
            else:
                eta *= betaparam
                iter += 1
        # If the search exhausts max_iter reductions (e.g. near a stationary
        # point where the gradient vanishes), fall back to the last eta.
        return eta

    def _compute_gram_matrix(self):
        """Computes, for any set of datapoints x1,...,xn, the kernel matrix K"""
        kernel = self._kernel
        if kernel == 'rbf':
            kernel += '_sklearn'
        gram = kernels.kernel_dict[kernel](self._x, self._x, self._kernel_params)
        return gram

    def _kernel_eval(self, x, x_train):
        keval = kernels.kernel_dict[self._kernel](x_train, x, self._kernel_params)
        return keval

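    # Accelerated (Nesterov-style) gradient descent: gradient steps are taken
    # from the extrapolated point theta, which carries momentum
    # iter / (iter + 3) * (alpha_new - alpha), with the step size refreshed by
    # backtracking line search at every iteration.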
    def _fast_gradient_descent(self):
        eta_init = self._optimal_eta_init()

        alpha = xp.zeros(self._n)
        theta = xp.zeros(self._n)
        eta = eta_init
        grad_theta = self._compute_gradient(theta)
        # Progress is recorded every 10 iterations (indices 0..max_iter // 10).
        objective_val_size = self._max_iter // 10 + 1
        objective_vals = xp.ones(objective_val_size)
        misclassification_error_per_iter = xp.ones(objective_val_size)
        iter = 0
        while iter < self._max_iter:
            eta = self._backtracking_line_search(theta, eta=eta)
            alpha_new = theta - eta * grad_theta
            theta = alpha_new + iter / (iter + 3) * (alpha_new - alpha)
            grad_theta = self._compute_gradient(theta)
            alpha = alpha_new
            iter += 1
            if self._display_plots and iter % 10 == 0:
                objective_vals[iter // 10] = self._objective(alpha)
                self._coef_matrix = alpha
                # self._x is already standardized and predict() would
                # standardize it a second time, so compute the training error
                # directly from the decision values instead.
                y_true = xp.where(self._y > 0, self._primary_class, self._secondary_class)
                y_pred = self._prediction_binary(alpha, self._x, self._x, self._primary_class, self._secondary_class)
                misclassification_error_per_iter[iter // 10] = xp.mean(y_pred != y_true)

        return alpha, objective_vals, misclassification_error_per_iter

    def _predict_binary(self, x):
        return self._prediction_binary(self._coef_matrix, x, self._x, self._primary_class, self._secondary_class)

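    # One-vs-one prediction: every pairwise classifier votes for one of its two
    # classes, and each sample is assigned the class with the most votes.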
    def _predict_ovo(self, x):
        def mode(a):
            # xp.bincount would reject negative class labels, so count the
            # votes with unique() instead.
            values, counts = xp.unique(a, return_counts=True)
            return values[xp.argmax(counts)]

        pairs = SVM._get_unique_pairs(self._y)
        predictions = xp.zeros((x.shape[0], len(pairs)))
        for i in range(len(pairs)):
            pair = pairs[i]
            alpha = self._coef_matrix[i]
            x_train_filtered, y_train_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair)
            y_pred = self._prediction_binary(alpha, x, x_train_filtered, pair[0], pair[1])
            predictions[:, i] = y_pred
        return xp.stack([mode(p) for p in predictions])

    def _predict_ovr(self, x):
        y_unique = xp.asarray(np.unique(xp.asnumpy(self._y)))
        prediction_probabilities = xp.zeros((x.shape[0], len(y_unique)))
        for i in range(len(y_unique)):
            alpha = self._coef_matrix[i]
            # Reuse the training subset stored during fit(); refiltering here
            # would draw a different random subsample of the negative class
            # than the one alpha was trained against.
            x_train_filtered = self._x_filtered_per_class[i]
            pred_prob = self._prediction_prob(alpha, x, x_train_filtered)
            prediction_probabilities[:, i] = pred_prob
        return xp.stack([y_unique[xp.argmax(p)] for p in prediction_probabilities])

    def _prediction_prob(self, alpha, x, x_train):
        return xp.dot(self._kernel_eval(x, x_train).T, alpha)

    def _prediction_binary(self, alpha, x, x_train, class1=1, class2=-1):
        pred_prob = self._prediction_prob(alpha, x, x_train)
        return xp.where(pred_prob > 0, class1, class2)

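    # Initial step size 1/L, where L is the largest eigenvalue of the Hessian
    # (2/n) K K + 2 lambda K of F, i.e. the Lipschitz constant of the gradient.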
    def _optimal_eta_init(self):
        hessian = xp.asnumpy(2 / self._n * xp.dot(self._K, self._K) + 2 * self._lambduh * self._K)
        # scipy.linalg.eigh's eigvals= keyword is deprecated; subset_by_index
        # selects the same (largest) eigenvalue.
        largest_eig = scipy.linalg.eigh(hessian, subset_by_index=[self._n - 1, self._n - 1], eigvals_only=True)[0]
        return 1 / largest_eig
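

# A minimal usage sketch, not part of the original module: it assumes the local
# `kernels` module exposes an 'rbf' entry in kernel_dict (the class maps 'rbf'
# to 'rbf_sklearn' for the Gram matrix) and that kernel_params is whatever that
# implementation expects; a plain bandwidth value is assumed here. Requires a
# CUDA device, since cupy backs all array math.
if __name__ == '__main__':
    n_per_class = 50
    x_pos = xp.random.randn(n_per_class, 2) + xp.asarray([2.0, 2.0])
    x_neg = xp.random.randn(n_per_class, 2) - xp.asarray([2.0, 2.0])
    x = xp.concatenate((x_pos, x_neg), axis=0)
    y = xp.concatenate((xp.ones(n_per_class), -xp.ones(n_per_class)))

    svm = SVM(kernel='rbf', kernel_params=1.0, lambduh=1, max_iter=100,
              classification_strategy='binary', display_plots=False)
    svm.fit(x, y)
    print('Training error:', svm.compute_misclassification_error(x, y))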