import numpy as np
import cupy as xp
import scipy.linalg
import pandas as pd
import matplotlib.pyplot as plt
import kernels
import itertools
class SVM():
"""support vector machine"""
def __init__(self, kernel, kernel_params, lambduh=1, max_iter=1000, classification_strategy='ovr', x=None, y=None, n_folds=3, lambda_vals=None, use_optimal_lambda=False, display_plots=True, logging=False):
"""initialize the classifier"""
self._kernel = kernel
self._kernel_params = kernel_params
self._lambduh = lambduh
self._max_iter = max_iter
self._classification_strategy = classification_strategy
self._y = y
self._coef_matrix = []
self._n_folds = n_folds
self._display_plots = display_plots
self._logging = logging
self._lambda_vals = lambda_vals
if self._lambda_vals is None:
self._lambda_vals = [10**i for i in range(-3, 4)]
self._use_optimal_lambda = use_optimal_lambda
def fit(self, x=None, y=None, prevent_relabel=False, use_optimal_lambda=False):
"""Trains the kernel support vector machine with the huberized hinge loss"""
if y is not None:
self._y = y
self._K = self._compute_gram_matrix()
self._n = len(self._x)
objective_val_size = int(self._max_iter//10) + (1 if self._max_iter % 10 == 0 else 0)
self._objective_val_per_iter = xp.zeros(objective_val_size)
if self._classification_strategy == 'ovr':
iterate_over = xp.asarray(np.unique(xp.asnumpy(self._y)))
elif self._classification_strategy == 'ovo' or self._classification_strategy == 'binary':
iterate_over = SVM._get_unique_pairs(self._y)
if self._use_optimal_lambda or use_optimal_lambda:
self._lambduh, misclassification_error = self.compute_optimal_lambda()
print('Misclassification error (train), {}, optimal lambda = {} : {}'.format(self._classification_strategy, self._lambduh, misclassification_error))
for i in range(len(iterate_over)):
if self._logging:
print('Training classifier {} of {}'.format(i + 1, len(iterate_over)))
if self._classification_strategy == 'ovr':
primary_class = iterate_over[i]
x_filtered, y_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, primary_class)
elif self._classification_strategy == 'ovo':
pair = iterate_over[i]
x_filtered, y_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair, prevent_relabel)
elif self._classification_strategy == 'binary':
pair = iterate_over[i]
self._n = len(self._x)
self._K = self._compute_gram_matrix()
if prevent_relabel:
self._primary_class = 1
self._secondary_class = -1
self._primary_class = pair[0]
self._secondary_class = pair[1]
self._y = xp.where(self._y == self._primary_class, 1, -1)
self._coef_matrix, self._objective_val_per_iter, self._misclassification_error_per_iter = self._fast_gradient_descent()
svm = SVM(self._kernel, self._kernel_params, self._lambduh, self._max_iter, 'binary', display_plots=True), y_filtered, prevent_relabel=True)
self._objective_val_per_iter += svm._objective_val_per_iter * (1/len(iterate_over))
if self._display_plots:
def cross_validation_error(self):
error_per_lambda = xp.zeros(len(self._lambda_vals))
for i in range(len(self._lambda_vals)):
lambduh = self._lambda_vals[i]
if self._logging:
print('lambduh = {} ({} of {})'.format(lambduh, i + 1, num_lambda))
error_per_fold = xp.zeros(self._n_folds)
for j in range(self._n_folds):
fold_size = int(self._n/self._n_folds)
indicies = xp.array(range(0, self._n))
fold_indicies = ((indicies >= fold_size*j) & (indicies <= fold_size*(j+1)))
x_train = self._x[fold_indicies == True]
y_train = self._y[fold_indicies == True]
x_test = self._x[fold_indicies == False]
y_test = self._y[fold_indicies == False]
y_train = y_train.reshape((len(y_train), 1))
y_test = y_test.reshape((len(y_test), 1))
y_train = xp.ravel(y_train)
y_test = xp.ravel(y_test)
svm = SVM(self._kernel, self._kernel_params, lambduh, self._max_iter, self._classification_strategy), y_train)
error_per_fold[j] = svm.compute_misclassification_error(x_test, y_test)
error_per_lambda[i] = xp.mean(error_per_fold)
return error_per_lambda.tolist()
def compute_optimal_lambda(self):
cross_validation_error = self.cross_validation_error()
if self._display_plots:
df = pd.DataFrame({'lambda':self._lambda_vals, 'Cross validation error':xp.asnumpy(cross_validation_error)})
df.plot('lambda', 'Cross validation error', logx=True)
return self._lambda_vals[np.nanargmin(cross_validation_error)], np.min(cross_validation_error)
def compute_misclassification_error(self, x, y):
y_pred = self.predict(x)
return xp.mean(y_pred != y)
def predict(self, x):
x = self._standardize(x)
if self._classification_strategy == 'ovr':
return self._predict_ovr(x)
elif self._classification_strategy == 'ovo':
return self._predict_ovo(x)
return self._predict_binary(x)
def objective_plot(self):
fig, ax = plt.subplots()
ax.plot(np.array(range(len(self._objective_val_per_iter)))*10, xp.asnumpy(self._objective_val_per_iter), label='Train', c='red')
plt.ylabel('Objective value')
plt.title('Objective value vs. iteration when lambda=' + str(self._lambduh))
ax.legend(loc='upper right')
def plot_misclassification_error(self):
if self._classification_strategy == 'binary':
fig, ax = plt.subplots()
ax.plot(np.array(range(len(self._misclassification_error_per_iter)))*10, xp.asnumpy(self._misclassification_error_per_iter), label='Train', c='red')
plt.ylabel('Misclassification error')
plt.title('Misclassification error vs iteration')
ax.legend(loc='upper right')
print('Plotting misclassification error only available for binary classification.')
def _set_x(self, x):
if x is not None:
self._n = len(x)
self._x_sd = xp.std(x, axis=0)
self._x_mean = xp.mean(x, axis=0)
self._x = self._standardize(x)
def _standardize(self, x):
sd = self._x_sd
mean = self._x_mean
mean[sd == 0] = 0
sd[sd == 0] = 1
return (x - mean) / sd
def filter_data_by_class_ovo(x, y, classes, prevent_relabel=False):
x = SVM._select_classes(x, y, classes)
y = SVM._select_classes(y, y, classes)
if prevent_relabel == False:
y = xp.where(y == classes[0], 1, -1)
return x, y
def subset_data(x, y, max_samples):
if max_samples is None or max_samples > len(x):
return x, y
idx = np.random.choice(np.arange(len(x)), max_samples, replace=False)
return x[idx], y[idx]
def subset_data_gpu(x, y, max_samples):
x, y = subset_data(x, y, max_samples)
return xp.asarray(x), xp.asarray(y)
def _get_unique_pairs(y):
return pd.Series(list(itertools.combinations(np.unique(xp.asnumpy(y)),2)))
def _select_classes(x, y, classes):
if len(classes) == 2:
return x[(y == classes[0]) | (y == classes[1])]
return x[xp.asarray(np.isin(xp.asnumpy(y), classes))]
def _select_classes_ovr(x, y, primary_class):
positive = x[y == primary_class]
negative = x[y != primary_class]
# Get random rows of the same length as the positive matrix
if len(positive) < len(negative):
negative = negative[xp.random.choice(negative.shape[0], len(positive), replace=False)]
return xp.concatenate((positive, negative), axis=0)
def _filter_data_by_class_ovr(x, y, primary_class):
x = SVM._select_classes_ovr(x, y, primary_class)
y = SVM._select_classes_ovr(y, y, primary_class)
y = xp.where(y == primary_class, 1, -1)
return x, y
def _compute_gradient(self, alpha):
"""Computes the gradient ∇F(β) of F"""
K_alpha =, alpha)
grad = -2 / self._n * xp.sum(self._y[:, xp.newaxis] * self._K * xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0)[:, xp.newaxis], axis=0) + 2 * self._lambduh * K_alpha
return grad
def _objective(self, alpha):
K_alpha =, alpha)
return 1 / self._n * xp.sum(xp.max(xp.stack((xp.zeros_like(self._y), 1 - self._y * K_alpha)), axis=0) ** 2) + self._lambduh *
def _backtracking_line_search(self, alpha, eta=1, alphaparam=0.5, betaparam=0.8, max_iter=100):
grad_alpha = self._compute_gradient(alpha)
norm_grad_alpha = xp.linalg.norm(grad_alpha)
found_eta = 0
iter = 0
while found_eta == 0 and iter < max_iter:
if self._objective(alpha - eta * grad_alpha) < self._objective(alpha) - alphaparam * eta * norm_grad_alpha ** 2:
found_eta = 1
elif iter == max_iter:
raise ('Max number of iterations of backtracking line search reached')
eta *= betaparam
iter += 1
return eta
def _compute_gram_matrix(self):
"""Computes, for any set of datapoints x1,...,xn, the kernel matrix K"""
kernel = self._kernel
if kernel == 'rbf':
kernel += '_sklearn'
gram = kernels.kernel_dict[kernel](self._x, self._x, self._kernel_params)
return gram
def _kernel_eval(self, x, x_train):
keval = kernels.kernel_dict[self._kernel](x_train, x, self._kernel_params)
return keval
def _fast_gradient_descent(self):
eta_init = self._optimal_eta_init()
alpha = xp.zeros(self._n)
theta = xp.zeros(self._n)
eta = eta_init
grad_theta = self._compute_gradient(theta)
objective_val_size = int(self._max_iter//10) + (1 if self._max_iter % 10 == 0 else 0)
objective_vals = xp.ones(objective_val_size)
misclassification_error_per_iter = xp.ones(objective_val_size)
iter = 0
while iter < self._max_iter:
eta = self._backtracking_line_search(theta, eta=eta)
alpha_new = theta - eta * grad_theta
theta = alpha_new + iter / (iter + 3) * (alpha_new - alpha)
grad_theta = self._compute_gradient(theta)
alpha = alpha_new
iter += 1
if self._display_plots and iter % 10 == 0:
objective_vals[int(iter/10)] = self._objective(alpha)
self._coef_matrix = alpha
misclassification_error_per_iter[int(iter/10)] = self.compute_misclassification_error(self._x, xp.where(self._y > 0, self._primary_class, self._secondary_class))
return alpha, objective_vals, misclassification_error_per_iter
def _predict_binary(self, x):
return self._prediction_binary(self._coef_matrix, x, self._x, self._primary_class, self._secondary_class)
def _predict_ovo(self, x):
def mode(a):
counts = xp.bincount(a.astype(xp.int64))
return xp.argmax(counts)
pairs = SVM._get_unique_pairs(self._y)
predictions = xp.zeros((x.shape[0], len(pairs)))
for i in range(len(pairs)):
pair = pairs[i]
alpha = self._coef_matrix[i]
x_train_filtered, y_train_filtered = SVM.filter_data_by_class_ovo(self._x, self._y, pair)
y_pred = self._prediction_binary(alpha, x, x_train_filtered, pair[0], pair[1])
predictions[:,i] = y_pred
return xp.stack([mode(p) for p in predictions])
def _predict_ovr(self, x):
y_unique = xp.asarray(np.unique(xp.asnumpy(self._y)))
prediction_probabilities = xp.zeros((x.shape[0], len(y_unique)))
for i in range(len(y_unique)):
alpha = self._coef_matrix[i]
x_train_filtered, y_train_filtered = SVM._filter_data_by_class_ovr(self._x, self._y, y_unique[i])
pred_prob = self._prediction_prob(alpha, x, x_train_filtered)
prediction_probabilities[:,i] = pred_prob
return xp.stack([y_unique[xp.argmax(p)] for p in prediction_probabilities])
def _prediction_prob(self, alpha, x, x_train):
return, x_train).T, alpha)
def _prediction_binary(self, alpha, x, x_train, class1=1, class2=-1):
pred_prob = self._prediction_prob(alpha, x, x_train)
return xp.where(pred_prob > 0, class1, class2)
def _optimal_eta_init(self):
return 1 / scipy.linalg.eigh(xp.asnumpy(2 / self._n *, self._K) + 2 * self._lambduh * self._K), eigvals=(self._n - 1, self._n - 1), eigvals_only=True)[0]