Source code for stateinterpreter.ml

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.multiclass import OneVsRestClassifier
from scipy.sparse import csr_matrix
from scipy.spatial.distance import squareform
from group_lasso import LogisticGroupLasso
from tqdm import tqdm
import warnings
from .utils.plot import plot_regularization_path, plot_classifier_complexity_vs_accuracy
from ._configs import *

__all__ = ["Classifier","prepare_training_dataset"]

[docs]def prepare_training_dataset(descriptors, states_labels, n_configs, regex_filter = None, states_subset=None): """Sample points from trajectory Args: n_configs (int): number of points to sample for each metastable state regex_filter (str, optional): regex to filter the features. Defaults to '.*'. states_subset (list, optional): list of integers corresponding to the metastable states to sample. Defaults to None take all states. states_names (list, optional): list of strings corresponding to the name of the states. Defaults to None. Returns: (configurations, labels), features_names, states_names """ assert len(descriptors) == len(states_labels), "Length mismatch between descriptors and states_labels." if regex_filter is not None: features = descriptors.filter(regex=regex_filter).columns.values else: features = descriptors.columns.values config_list = [] labels = [] if isinstance(states_labels, pd.DataFrame): pass elif isinstance(states_labels, np.ndarray): states_labels = np.squeeze(states_labels) columns = ['labels'] if states_labels.ndim == 2: columns.append('selection') states_labels = pd.DataFrame(data=states_labels, columns=columns) else: raise TypeError( f"{states_labels}: Accepted types are 'pandas.Dataframe' or 'numpy.ndarray' " ) if not ('selection' in states_labels): states_labels['selection'] = np.ones(len(states_labels), dtype=bool) # convert labels in string states_labels['labels'] = states_labels['labels'].astype(str) if states_subset is None: states_subset = states_labels['labels'].unique() states_subset = states_subset[states_subset != 'undefined' ] for label in states_subset: #select label df = descriptors.loc[ (states_labels['labels'] == label) & (states_labels['selection'] == True)] #select descriptors and sample replace = False if n_configs > len(df): warnings.warn("The asked number of samples is higher than the possible unique values. Sampling with replacement") replace = True if regex_filter is not None: config_i = df.filter(regex=regex_filter).sample(n=n_configs, replace=replace).values else: config_i = df.sample(n=n_configs, replace=replace).values config_list.append(config_i) labels.extend([label]*n_configs) labels = np.array(labels) configurations = np.vstack(config_list) return (configurations, labels), features
[docs]class Classifier():
[docs] def __init__(self, dataset, features_names, rescale=True, test_size=0.25): self._X, self._labels = dataset _tmp_classes, self._labels = np.unique(self._labels, return_inverse=True) self.classes = dict() for _i, _class in enumerate(_tmp_classes): self.classes[_i] = f"{_class}" self._rescale = rescale self._test_size = test_size if self._rescale: scaler = StandardScaler(with_mean=True) scaler.fit(self._X) self._X = scaler.transform(self._X) self._train_in, self._val_in, self._train_out, self._val_out = train_test_split(self._X, self._labels, test_size=self._test_size) self._n_samples = self._train_in.shape[0] self.features = features_names self._computed = False
def compute(self, reg, max_iter = 100, quadratic_kernel=False, groups=None, warm_start = True): if self._computed: warnings.warn("Warning: deleting old computed data") self._purge() if hasattr(reg, '__iter__') == False: reg = np.array([reg]) _num_reg = len(reg) _n_basins = len(np.unique(self._train_out)) self._reg = reg if quadratic_kernel: train_in, val_in = quadratic_kernel_featuremap(self._train_in), quadratic_kernel_featuremap(self._val_in) else: train_in = self._train_in val_in = self._val_in _n_features = train_in.shape[1] if groups is not None: groups_names, groups = np.unique(groups, return_inverse=True) if quadratic_kernel: assert len(groups) == train_in.shape[1], "Length of group array does not match quadratic features number." else: assert len(groups) == len(self.features), "Length of group array does not match features number." _is_group = True _reg_name = 'estimator__group_reg' tmp_model = LogisticGroupLasso(groups, group_reg = reg[0], l1_reg=0, n_iter=max_iter, supress_warning=True, scale_reg='none', warm_start=False) model = OneVsRestClassifier(tmp_model, n_jobs=2) else: _is_group = False _reg_name = 'C' reg = (reg*self._n_samples)**-1 model = LogisticRegression(penalty='l1', C=reg[0], solver='liblinear', multi_class='ovr', fit_intercept=False, max_iter=max_iter, warm_start=warm_start) coeffs = np.empty((_num_reg, _n_basins, _n_features)) crossval = np.empty((_num_reg,)) _classes_labels = np.empty((_num_reg, _n_basins), dtype=np.int_) for reg_idx in tqdm(range(len(reg)), desc='Optimizing Lasso Estimator'): model.set_params(**{_reg_name: reg[reg_idx]}) model.fit(train_in,self._train_out) crossval[reg_idx] = model.score(val_in,self._val_out) _classes_labels[reg_idx] = model.classes_.astype(int) if _is_group: assert _n_basins == model.classes_.shape[0] tmp_coeffs = np.empty((_n_basins, _n_features)) for est_idx, _e in enumerate(model.estimators_): tmp_coeffs[est_idx] = _e.coef_[:,0] coeffs[reg_idx] = tmp_coeffs else: coeffs[reg_idx] = model.coef_ self._quadratic_kernel=quadratic_kernel self._coeffs = coeffs self._crossval = crossval self._classes_labels = _classes_labels self._groups = groups if _is_group: self._groups_names = groups_names self._groups_mask = [ self._groups == u for u in np.unique(self._groups) ] self._computed = True # count number of features for each reg num_feat = np.empty((_num_reg,),dtype=int) for i,reg in enumerate(self._reg): selected = self._get_selected(reg, feature_mode=False) unique_idxs = set() for state in selected.values(): for data in state: unique_idxs.add(data[0]) num_feat[i] = len(unique_idxs) self._num_feat = num_feat def _purge(self): if __DEV__: print("DEV >>> Purging old data") if self._computed: del self._quadratic_kernel del self._reg del self._coeffs del self._crossval del self._classes_labels del self._num_feat if self._groups is not None: del self._groups_names del self._groups_mask del self._groups self._computed = False def _closest_reg_idx(self, reg): assert self._computed, "You have to run Classifier.compute first." return np.argmin(np.abs(self._reg - reg)) def _get_selected(self, reg, feature_mode=False): reg_idx = self._closest_reg_idx(reg) coefficients = self._coeffs[reg_idx] _classes = self._classes_labels[reg_idx] selected = dict() group_mode = (not feature_mode) and (self._groups is not None) for idx, coef in enumerate(coefficients): state_name = self.classes[_classes[idx]] if group_mode: coef = np.array([np.linalg.norm(coef[b])**2 for b in self._groups_mask]) else: coef = coef**2 nrm = np.sum(coef) if nrm < __EPS__: selected[state_name] = [] else: coef = csr_matrix(coef/nrm) sort_perm = np.argsort(coef.data)[::-1] names = [] for idx in coef.indices: if group_mode: names.append(self._groups_names[idx]) else: if self._quadratic_kernel: names.append(decode_quadratic_features(idx, self.features)) else: names.append(self.features[idx]) #idx, weight, name names = np.array(names) selected[state_name]= list(zip(coef.indices[sort_perm], coef.data[sort_perm], names[sort_perm])) #If only two states the learned models are the same. if len(selected.keys()) == 2: del_key = list(selected.keys())[0] selected.pop(del_key) return selected def get_accuracy(self): return self._crossval def get_num_features(self): return self._num_feat def feature_summary(self, reg): return self._get_selected(reg, feature_mode=True) def print_selected(self, reg): selected = self._get_selected(reg) print(f"Accuracy: {int(self._crossval[self._closest_reg_idx(reg)]*100)}%") for state in selected.keys(): state_name = 'State ' + f'{state}' + ':' print(state_name) for row in selected[state]: print(" " + row[2]) def prune(self, reg, overwrite=False): selected = self._get_selected(reg) if self._quadratic_kernel: AttributeError("Pruning is not possible on classifiers trained with quadratic kernels.") unique_idxs = set() for state in selected.values(): for data in state: unique_idxs.add(data[0]) if self._groups is not None: mask = np.logical_or.reduce([self._groups_mask[idx] for idx in unique_idxs]) else: mask = np.array([False]*len(self.features)) for idx in unique_idxs: mask[idx] = True if overwrite: self._train_in = self._train_in[:, mask] self._val_in = self._val_in[:, mask] self._X = self._X[:, mask] self.features = self.features[mask] self._purge() else: X = self._X[:, mask] dset = (X, self._labels) pruned_features = self.features[mask] return Classifier(dset, pruned_features, self._rescale, self._test_size) def plot_regularization_path(self, reg): return plot_regularization_path(self, reg) def plot(self): return plot_classifier_complexity_vs_accuracy(self) def save(self, filename): pass
def quadratic_kernel_featuremap(X): n_pts, n_feats = X.shape n_feats +=1 transformed_X = np.empty((n_pts, n_feats + n_feats*(n_feats - 1)//2), dtype=np.float_) X = np.c_[X, np.ones(n_pts)] def _compute_repr(x): mat = np.outer(x,x) diag = np.diag(mat) mat = (mat - np.diag(diag))*np.sqrt(2) off_diag = squareform(mat) return np.r_[diag, off_diag] for idx, x in enumerate(X): transformed_X[idx] = _compute_repr(x) return transformed_X def decode_quadratic_features(idx, features_names): num_feats = features_names.shape[0] s = '' if idx < num_feats: s = f"{features_names[idx]} || {features_names[idx]}" elif idx > num_feats: rows, cols = np.triu_indices(num_feats + 1, k = 1) offset_idx = idx - num_feats - 1 i, j = rows[offset_idx], cols[offset_idx] if i == num_feats: s = f"{features_names[j]}" elif j == num_feats: s = f"{features_names[i]}" else: s = f"{features_names[i]} || {features_names[j]}" return s