import numpy as np
from sklearn import preprocessing
from numpy import random
from scipy.special import softmax, expit
from copy import deepcopy
from sklearn.metrics import log_loss, accuracy_score, mean_squared_error
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, RidgeClassifier
from sklearn.preprocessing import normalize
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils import column_or_1d
from sklearn.utils.estimator_checks import check_estimator
import joblib
MAX_RESPONSE = 4
MACHINE_EPSILON = np.finfo(np.float64).eps
class NNLinear:
def __init__(self, W, bias):
self.W = W
self.bias = bias
def decision_function(self, X):
return X @ self.W + self.bias.reshape((1, -1))
def predict(self, X):
return X @ self.W + self.bias.reshape((1, -1))
class myEncoder:
def __init__(self):
self.labelEncoder = preprocessing.LabelBinarizer(pos_label=1, neg_label=0)
def fit(self, y):
self.labelEncoder.fit(y)
self.n_classes_ = len(self.labelEncoder.classes_)
def transform(self, y):
if self.n_classes_ == 2:
y_ = self.labelEncoder.transform(y)
return np.c_[1 - y_, y_]
else:
return self.labelEncoder.transform(y)
class node_generator:
def __init__(self, active_function='relu', n_nodes_H=10):
self.n_nodes_H = n_nodes_H
self.active_function = active_function
@staticmethod
def _sigmoid(data):
return 1.0 / (1 + np.exp(-data))
@staticmethod
def _linear(data):
return data
@staticmethod
def _tanh(data):
return (np.exp(data) - np.exp(-data)) / (np.exp(data) + np.exp(-data))
@staticmethod
def _relu(data):
return np.maximum(data, 0)
@staticmethod
def _generator(fea_dim, node_size):
W = 2 * random.random(size=(fea_dim, node_size)) - 1
b = 2 * random.random() - 1
return W, b
def _generate_h(self, X, sample_weight=None):
self.nonlinear_ = {
'linear': self._linear,
'sigmoid': self._sigmoid,
'tanh': self._tanh,
'relu': self._relu
}[self.active_function]
fea_dim = X.shape[1]
W_, b_ = self._generator(fea_dim, self.n_nodes_H)
data_ = np.dot(X, W_) + b_
scaler_ = StandardScaler()
scaler_.fit(data_, sample_weight=sample_weight)
try:
self.W_.append(W_)
self.b_.append(b_)
self.scaler_.append(scaler_)
except:
self.W_ = []
self.b_ = []
self.scaler_ = []
self.W_.append(W_)
self.b_.append(b_)
self.scaler_.append(scaler_)
return self.nonlinear_(scaler_.transform(data_))
def _transform_iter(self, X, i=None):
return self.nonlinear_(self.scaler_[i].transform(X.dot(self.W_[i]) + self.b_[i]))
class BF(BaseEstimator, node_generator):
def __init__(self, active_function='relu', n_nodes_H=10):
node_generator.__init__(self, active_function, n_nodes_H)
def _output2prob(self, output):
if self.n_classes_ == 1:
return expit(output)
else:
return softmax(output, axis=1)
def save_model(self, file):
"""
Parameters
----------
file: str
Controls the filename.
"""
check_is_fitted(self, ['estimators_'])
joblib.dump(self, filename=file)
@staticmethod
def _MixUp(X, y):
l = np.random.beta(0.75, 0.75)
l = max(l, 1 - l)
random_index = np.random.choice(range(len(X)), len(X), replace=False)
X_mix = l * X + (1 - l) * X[random_index]
z_mix = l * y + (1 - l) * y[random_index]
return X_mix, z_mix
@staticmethod
def _down_sample(index, weight, bs):
if weight.sum() == 0:
return random.choice(index, bs, replace=True)
else:
return random.choice(index, bs, replace=True, p=weight / weight.sum())
[docs]class BFClassifier(ClassifierMixin, BF):
"""
TrBF classifier. Construct a TrBF model to fine-tune the initial model.
Parameters
----------
max_iterations: int, default=10
Controls the number of boosting iterations.
active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
Controls the active function of enhancement nodes.
n_nodes_H: int, default=100
Controls the number of enhancement nodes.
reg_alpha: float, default=0.001
Regularization strength; must be a positive float. Regularization improves the conditioning of the problem and reduces the variance of the estimates. Larger values specify stronger regularization.
verbose: bool, default=False
Controls wether to show the boosting process.
boosting_model: str, default='ridge'
Controls the base learner used in boosting.
batch_size: int, default=256
Controls the batch size.
learning_rate: float, default=0.05
Controls the learning rate.
initLearner: obj, default=None
Controls the initial model.
random_state: int, default=0
Controls the randomness of the estimator.
"""
def __init__(self, max_iterations=10, active_function='relu', n_nodes_H=100, reg_alpha=0.001, verbose=False, boosting_model='ridge',
batch_size=256, learning_rate=0.05, initLearner=None, random_state=0):
BF.__init__(self, active_function, n_nodes_H)
self.reg_alpha = reg_alpha
self.initLearner = initLearner
self.batch_size = batch_size
self.max_iterations = max_iterations
self.learning_rate = learning_rate
self.boosting_model = boosting_model
self.verbose = verbose
self.random_state = random_state
def _predict_score(self, node_model, X):
new_scores = [e.predict(X) for e in node_model]
new_scores = np.asarray(new_scores).T
new_scores = np.clip(new_scores, a_min=-MAX_RESPONSE, a_max=MAX_RESPONSE)
if self.n_classes_ > 1:
new_scores -= new_scores.mean(keepdims=True, axis=1)
new_scores *= (self.n_classes_ - 1) / self.n_classes_
new_scores = normalize(new_scores)
return new_scores * self.learning_rate
@staticmethod
def _weight_and_response(y, prob):
z = np.where(y, 1. / (prob + MACHINE_EPSILON), -1. / (1. - prob + MACHINE_EPSILON))
z = np.clip(z, a_min=-MAX_RESPONSE, a_max=MAX_RESPONSE)
sample_weight = (y - prob) / z
sample_weight = np.maximum(sample_weight, 2. * MACHINE_EPSILON)
return sample_weight, z
def _get_init_output(self, X):
if self.initLearner is not None:
initOutput = self.initLearner.decision_function(X)
if len(initOutput.shape) == 1:
initOutput = np.c_[-initOutput, initOutput]
initOutput -= initOutput.mean(keepdims=True, axis=1)
initOutput = normalize(initOutput)
else:
initOutput = np.zeros((len(X), self.n_classes_))
return initOutput
def _get_balance_index(self, y, weight=None, bs=None):
if weight is None:
weight = np.ones(len(y))
# balance index
balance_index = []
index_positive = np.where(y == 1)[0]
index_negative = np.where(y == 0)[0]
if len(index_positive) > 0:
balance_index.extend(self._down_sample(index_positive, weight[index_positive], bs))
if len(index_negative) > 0:
balance_index.extend(self._down_sample(index_negative, weight[index_negative], bs))
return balance_index
def _decision_function(self, X, iter=None):
if iter is None:
iter = self.max_iterations
output = self._get_init_output(X)
for i in range(iter):
output += self._predict_score(self.estimators_[i], self._transform_iter(X, i))
return output
[docs] def predict(self, X, iter=None):
"""
Predict class labels for samples in X.
Parameters
----------
X : array_like or sparse matrix, shape (n_samples, n_features)
Samples.
iter: int
Total number of iterations used in the prediction.
Returns
-------
C : array, shape [n_samples]
Predicted class label per sample.
"""
check_is_fitted(self, ['estimators_'])
X = check_array(X)
y_pred = self._decision_function(X, iter)
scores = y_pred.reshape(len(X), -1)
scores = scores.ravel() if scores.shape[1] == 1 else scores
if len(scores.shape) == 1:
indices = (scores > 0).astype(int)
else:
indices = scores.argmax(axis=1)
return self.classes_[indices]
[docs] def predict_proba(self, X, iter=None):
"""
Predict class probabilities for samples in X.
Parameters
----------
X : array_like or sparse matrix, shape (n_samples, n_features)
Samples.
iter: int
Total number of iterations used in the prediction.
Returns
-------
p : array, shape [n_samples, n_classes]
Predicted class probability per sample.
"""
check_is_fitted(self, ['estimators_'])
X = check_array(X)
y_pred = self._decision_function(X, iter)
scores = y_pred.reshape(len(X), -1)
scores = scores.ravel() if scores.shape[1] == 1 else scores
if self.n_classes_ == 1:
prob = expit(scores)
else:
prob = softmax(scores, axis=1)
return prob
@property
def classes_(self):
"""
Classes labels
"""
return self.labelEncoder_.labelEncoder.classes_
[docs] def fit(self, X=None, y=None, eval_data=None):
"""
Build a TrBF model.
Parameters
----------
X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
Training data.
y : ndarray of shape (n_samples,)
Target values.
eval_data : tuple (X_test, y_test)
tuple to use for watching the boosting process.
Returns
-------
self : object
Instance of the estimator.
"""
np.random.seed(self.random_state)
self.labelEncoder_ = myEncoder()
self.labelEncoder_.fit(y)
Y = self.labelEncoder_.transform(y)
if not self.labelEncoder_.labelEncoder.y_type_.startswith('multilabel'):
_ = column_or_1d(y, warn=True)
else:
raise ValueError(
"%s doesn't support multi-label classification" % (
self.__class__.__name__))
X, Y = check_X_y(X, Y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
self.n_classes_ = len(self.classes_)
model = Ridge(alpha=self.reg_alpha)
output_total = self._get_init_output(X)
prob = self._output2prob(output_total)
if self.verbose:
if Y is not None:
print('Init Loss: {}, ACC: {}'.format(log_loss(Y, prob), accuracy_score(Y.argmax(axis=1), prob.argmax(axis=1))))
H = self._generate_h(X)
if eval_data is not None:
eval_X, eval_y = eval_data
eval_output = self._get_init_output(eval_X)
if self.verbose:
print('Init Test ACC: {}'.format(accuracy_score(eval_y, eval_output.argmax(axis=1))))
else:
eval_X, eval_y, eval_output = None, None, None
self.estimators_ = []
for i in range(self.max_iterations):
new_estimators_ = []
for j in range(self.n_classes_):
model_copy = deepcopy(model)
w, z = self._weight_and_response(Y[:, j], prob[:, j])
batch_index = self._get_balance_index(Y[:, j], weight=w, bs=int(self.batch_size / 2.0))
X_batch, z_batch = H[batch_index], z[batch_index]
model_copy.fit(X_batch, z_batch)
new_estimators_.append(model_copy)
self.estimators_.append(new_estimators_)
new_scores = self._predict_score(new_estimators_, H)
output_total += new_scores
prob = self._output2prob(output_total)
if self.verbose:
print('Iteration: {} Loss: {}, ACC: {}'.format(i + 1, log_loss(Y, prob),
accuracy_score(Y.argmax(axis=1), prob.argmax(axis=1))))
if eval_data is not None:
eval_H = self._transform_iter(eval_X, i)
eval_output += self._predict_score(new_estimators_, eval_H)
if self.verbose:
print('Iteration: {} Test ACC: {}'.format(i + 1, accuracy_score(eval_y, eval_output.argmax(axis=1))))
if i < self.max_iterations - 1:
H = self._generate_h(X)
return self
[docs]class BFRegressor(RegressorMixin, BF):
"""
TrBF regressor. Construct a TrBF model to fine-tune the initial model.
Parameters
----------
max_iterations: int, default=10
Controls the number of boosting iterations.
active_function: {str, ('relu', 'tanh', 'sigmoid' or 'linear')}, default='relu'
Controls the active function of enhancement nodes.
n_nodes_H: int, default=100
Controls the number of enhancement nodes.
reg_alpha: float, default=0.001
Regularization strength; must be a positive float. Regularization improves the conditioning of the problem and reduces the variance of the estimates. Larger values specify stronger regularization.
verbose: bool, default=False
Controls wether to show the boosting process.
boosting_model: str, default='ridge'
Controls the base learner used in boosting.
batch_size: int, default=256
Controls the batch size.
learning_rate: float, default=0.05
Controls the learning rate.
initLearner: obj, default=None
Controls the initial model.
random_state: int, default=0
Controls the randomness of the estimator.
"""
def __init__(self, max_iterations=10, active_function='relu', n_nodes_H=100, reg_alpha=0.001, verbose=False, boosting_model='ridge',
batch_size=256, learning_rate=0.05, initLearner=None, random_state=0):
BF.__init__(self, active_function, n_nodes_H)
self.reg_alpha = reg_alpha
self.initLearner = initLearner
self.batch_size = batch_size
self.max_iterations = max_iterations
self.learning_rate = learning_rate
self.boosting_model = boosting_model
self.verbose = verbose
self.random_state = random_state
def _predict_score(self, node_model, X):
new_scores = node_model.predict(X)
new_scores = np.clip(new_scores, a_min=-MAX_RESPONSE, a_max=MAX_RESPONSE)
return new_scores * self.learning_rate
@staticmethod
def _weight_and_response(y, output):
z = y - output
sample_weight = np.ones(z.shape)
z = np.clip(z, a_min=-MAX_RESPONSE, a_max=MAX_RESPONSE)
return sample_weight, z
def _get_init_output(self, X):
if self.initLearner is not None:
initOutput = self.initLearner.predict(X)
initOutput = initOutput.reshape(len(X))
else:
initOutput = np.zeros(len(X))
return initOutput
def _decision_function(self, X, iter=None):
if iter is None:
iter = self.max_iterations
output = self._get_init_output(X)
for i in range(iter):
output += self._predict_score(self.estimators_[i], self._transform_iter(X, i))
return output
[docs] def predict(self, X, iter=None):
"""
Return the predicted value for each sample.
Parameters
----------
X : array_like or sparse matrix, shape (n_samples, n_features)
Samples.
iter: int
Total number of iterations used in the prediction.
Returns
-------
C : array, shape (n_samples,)
Returns predicted values.
"""
check_is_fitted(self, ['estimators_'])
X = check_array(X)
y_pred = self._decision_function(X, iter)
y_pred = y_pred.reshape(len(X))
return y_pred
[docs] def fit(self, X=None, y=None, eval_data=None):
"""
Build a TrBF model.
Parameters
----------
X : {ndarray, sparse matrix} of shape (n_samples, n_features) or dict
Training data.
y : ndarray of shape (n_samples,)
Target values.
eval_data : tuple (X_test, y_test)
tuple to use for watching the boosting process.
Returns
-------
self : object
Instance of the estimator.
"""
np.random.seed(self.random_state)
y = column_or_1d(y, warn=True)
X, y = check_X_y(X, y, dtype=[np.float64, np.float32], multi_output=True, y_numeric=True)
model = Ridge(alpha=self.reg_alpha)
output = self._get_init_output(X)
if self.verbose:
print('Init MSE Loss: '.format(mean_squared_error(y, output)))
H = self._generate_h(X)
if eval_data is not None:
eval_X, eval_y = eval_data
eval_output = self._get_init_output(eval_X)
if self.verbose:
print('Init Test MSE Loss: {}'.format(mean_squared_error(eval_y, eval_output)))
else:
eval_X, eval_y, eval_output = None, None, None
self.estimators_ = []
for i in range(self.max_iterations):
new_estimators_ = deepcopy(model)
w, z = self._weight_and_response(y, output)
batch_index = random.choice(len(w), self.batch_size, replace=True, p=w / w.sum())
X_batch, z_batch = H[batch_index], z[batch_index]
new_estimators_.fit(X_batch, z_batch)
self.estimators_.append(new_estimators_)
new_scores = self._predict_score(new_estimators_, H)
output += new_scores
if self.verbose:
print('Iteration: {} MSE Loss: {}'.format(i + 1, mean_squared_error(y, output)))
if eval_data is not None:
eval_output += self._predict_score(new_estimators_, self._transform_iter(eval_X, i))
if self.verbose:
print('Iteration: {} Test MSE Loss: {}'.format(i + 1, mean_squared_error(eval_y, eval_output)))
if i < self.max_iterations - 1:
H = self._generate_h(X)
return self
if __name__ == "__main__":
check_estimator(BFClassifier())
check_estimator(BFRegressor())