Source code for alp.backend.sklearn_backend

"""
Adaptor for the sklearn backend
===============================
"""

import copy
import pickle
import re
import h5py
import numpy as np

from six import next as snext
from six.moves import zip as szip
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model import ARDRegression
from sklearn.linear_model import BayesianRidge
from sklearn.linear_model import Lars
from sklearn.linear_model import Lasso
from sklearn.linear_model import LassoLars
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import OrthogonalMatchingPursuit
from sklearn.linear_model import Ridge


from ..appcom import _path_h5
from ..appcom.utils import check_gen
from ..celapp import RESULT_SERIALIZER
from ..celapp import app

SUPPORTED = [LogisticRegression, LinearRegression, Ridge, Lasso,
             Lars, LassoLars, OrthogonalMatchingPursuit, BayesianRidge,
             ARDRegression, LinearDiscriminantAnalysis,
             QuadraticDiscriminantAnalysis, KernelRidge]


def getname(model, call=True):
    """Return the dotted type name of a model as a string.

    Args:
        model: a class (or any zero-argument callable) when `call` is true,
            otherwise an already built object.
        call(bool): if true, `model` is called without argument and the type
            of the result is inspected; if false, the type of `model` itself
            is inspected.

    Returns:
        the text located between ``<class '`` and ``'>`` in the string
        representation of the inspected type.
    """
    instance = model() if call else model
    type_repr = str(type(instance))
    # strip the 8 leading characters ("<class '") and the 2 trailing ones
    return type_repr[8:-2]
# Registry mapping the dotted type name of every supported estimator to a
# default-constructed instance of it.
keyval = {}
for m in SUPPORTED:
    keyval.update({getname(m): m()})

# Cache filled by `predict`, keyed by model id + data id.
COMPILED_MODELS = {}
TO_SERIALIZE = ['custom_objects']


# general utilities
def get_backend():
    """Import the sklearn package lazily and return the module object."""
    import sklearn
    return sklearn
def save_params(model, filepath):
    """Dump the attributes of the (generally fitted) model in a h5 file.

    Only the entries of ``model.__dict__`` whose name ends with an
    underscore are written. A list attribute is stored as one dataset per
    item, under the key ``"tolist" + <index> + <attribute name>``, so that
    `load_params` can rebuild the list. Attributes equal to None are
    skipped.

    Args:
        model(sklearn.BaseEstimator): a sklearn model (in SUPPORTED).
        filepath(string): the file name where the attributes should be
            written.
    """
    fitted = {name: value for name, value in model.__dict__.items()
              if name.endswith('_')}

    # the context manager flushes and closes the file even if a write fails
    with h5py.File(filepath, 'w') as h5file:
        for name, value in fitted.items():
            if value is None:
                # so far the None case has been seen
                # only in Ridge when solver is not sag or lsqr.
                continue
            if isinstance(value, list):
                for index, item in enumerate(value):
                    h5file["tolist" + str(index) + name] = item
            else:
                h5file[name] = value
def load_params(model, filepath):
    """Load the attributes that have been dumped in a h5 file in a model.

    Datasets whose key has the form ``"tolist" + <index> + <name>`` (as
    written by `save_params` for list attributes) are gathered back into a
    list stored under ``<name>``; every other dataset is set as an attribute
    named after its key.

    Args:
        model(sklearn.BaseEstimator): a sklearn model (in SUPPORTED).
        filepath(string): the file name where the attributes should be read.

    Returns:
        the model with updated parameters.
    """
    list_items = dict()   # attribute name -> {index in the list: value}
    plain_items = dict()  # attribute name -> value

    # Everything is read while the file is open: the datasets cannot be
    # accessed once it is closed. `dataset[()]` reads the whole dataset,
    # whether it is a scalar or an array.
    with h5py.File(filepath, 'r') as h5file:
        for key, dataset in h5file.items():
            # only the digits located right after the "tolist" marker are
            # the index; digits of the attribute name itself are preserved
            parsed = re.match(r'tolist(\d+)(.+)', key)
            value = dataset[()]
            if parsed is None:
                plain_items[key] = value
            else:
                index, name = int(parsed.group(1)), parsed.group(2)
                list_items.setdefault(name, dict())[index] = value

    for name, items in list_items.items():
        rebuilt = [None] * (max(items) + 1)
        for index, value in items.items():
            rebuilt[index] = value
        setattr(model, name, rebuilt)

    for name, value in plain_items.items():
        setattr(model, name, value)

    return model
def typeconversion(v):
    """Utility function to ease serialization of custom types (namely
    np.types)

    Only the first element of a list is inspected to decide how the whole
    list is converted.

    Args:
        v(np.ndarray, list, other): the object to return as a jsonable
            object. If the type of v is not a np.ndarray or a list, the
            object is returned unchanged.

    Returns:
        a jsonable object, which type depends on the type of v
    """
    if isinstance(v, np.ndarray):  # pragma: no cover
        return v.tolist()
    if not isinstance(v, list) or len(v) == 0:
        return v

    first = v[0]
    if isinstance(first, np.integer):
        return [int(vv) for vv in v]
    # `np.float` was only an alias of the builtin float and no longer exists
    # in recent NumPy; np.floating also covers np.float32 and friends.
    if isinstance(first, (float, np.floating)):  # pragma: no cover
        return [float(vv) for vv in v]
    if isinstance(first, np.ndarray):
        return [vv.tolist() for vv in v]
    return v  # pragma: no cover
def to_dict_w_opt(model, metrics=None):
    """Serialize a sklearn model into a dictionary.

    The entries of ``model.__dict__`` whose name does not end with an
    underscore (the parameters) are stored after going through
    `typeconversion`; the ones ending with an underscore (the attributes)
    are left out.

    Args:
        model(sklearn.BaseEstimator): the model to serialize, must be in
            SUPPORTED
        metrics(list, optional): a list of metrics to monitor

    Returns:
        a dictionary holding the type name of the model under 'config', its
        parameters, and the metrics under 'metrics' when they are given.
    """
    # the type name is what sits between "<class '" and "'>"
    config = {'config': str(type(model))[8:-2]}

    for name, value in model.__dict__.items():
        if not name.endswith('_'):
            config[name] = typeconversion(value)

    # to be discussed: the metrics are added to the config even if it does
    # not make sense for a sklearn model; they are caught again in
    # model_from_dict_w_opt
    if metrics is not None:
        config['metrics'] = list(metrics)

    return config
def model_from_dict_w_opt(model_dict, custom_objects=None):
    """Build a sklearn model from a model serialized with `to_dict_w_opt`

    `model_dict` is left untouched: the 'metrics' entry is read, not removed.

    Args:
        model_dict(dict): a serialized sklearn model
        custom_objects(dict, optional): a dictionary mapping custom objects
            names to custom objects (callables, etc.). It is accepted for
            interface compatibility and is not used by this function.

    Returns:
        a tuple ``(model, metrics)``: a new sklearn.BaseEstimator (in
        SUPPORTED) instance whose attributes are not loaded, and the value
        stored under 'metrics' in `model_dict` (None when absent).

    Raises:
        NotImplementedError: if ``model_dict['config']`` is not a supported
            model type.
    """
    # safety check
    if model_dict['config'] not in keyval:
        raise NotImplementedError("sklearn model not supported.")

    # the metrics are returned on their own, never set on the model
    metrics = model_dict.get('metrics')

    # create a new instance of the appropriate model type
    model = copy.deepcopy(keyval[model_dict['config']])

    # load the parameters
    for name, value in model_dict.items():
        if name == 'metrics':
            continue
        if isinstance(value, list):  # pragma: no cover
            setattr(model, name, np.array(value))
        else:
            setattr(model, name, value)

    return model, metrics
def _metric_value(model, metric, X, y, predictions):
    """Compute one metric: `model.score` for 'score', otherwise the function
    of `sklearn.metrics` bearing that name applied to (y, predictions)."""
    import sklearn.metrics

    # equality, not identity: two equal strings may be distinct objects
    if metric == 'score':
        return model.score(X, y)
    return getattr(sklearn.metrics, metric)(y, predictions)


def _record_metrics(results, metrics_names, model, X, y, predictions,
                    prefix=''):
    """Append the value of every metric to results['metrics'][prefix + name].
    """
    for metric in metrics_names:
        results['metrics'][prefix + metric].append(
            _metric_value(model, metric, X, y, predictions))


def _record_missing_validation(results, metrics_names):
    """Append NaN to every validation metric (no validation data)."""
    for metric in metrics_names:
        results['metrics']['val_' + metric].append(np.nan)


def train(model, data, data_val, size_gen, generator=False, *args, **kwargs):
    """Fit a model given parameters and a serialized model

    Args:
        model(dict): a serialized sklearn model
        data(list): a list of dict mapping inputs and outputs to lists or
            dictionaries mapping the inputs names to np.arrays
            XOR -a list of fuel generators
        data_val(list): same structure than `data` but for validation.
            it is possible to feed generators for data and plain data for
            data_val. it is not possible the other way around.
        size_gen(list): one entry per element of `data`; only read when both
            the training and the validation data are generators, where it
            selects how the chunks are paired (1: many training chunks for
            one validation chunk, 2: one training chunk for many validation
            chunks, 3: same number of chunks). An empty list is replaced by
            zeros.
        generator(bool): whether `data` holds pickled generators.
        *args, **kwargs: forwarded to the `fit` method of the model, except
            for 'custom_objects' which is passed to `model_from_dict_w_opt`.

    Returns:
        a tuple ``(results, model)`` where ``results['metrics']`` maps each
        metric name (and its ``val_`` counterpart) to the list of computed
        values and 'iter' to NaN, and `model` is the fitted model.

    Raises:
        Exception: if the validation data are generators while the training
            data are not, or if a value of `size_gen` is not handled.
    """
    results = {'metrics': dict()}
    fit_gen_val = False

    # Load custom_objects
    custom_objects = kwargs.pop('custom_objects', None)

    # Load model and get metrics
    model, metrics = model_from_dict_w_opt(model,
                                           custom_objects=custom_objects)

    # there is at least one mandatory metric for sklearn models
    metrics_names = ["score"]
    if metrics:
        metrics_names.extend(metrics)

    for metric in metrics_names:
        results['metrics'][metric] = []
        results['metrics']["val_" + metric] = []

    # NOTE(security): pickle.loads can execute arbitrary code; the pickled
    # generators must only come from a trusted source.
    if generator:
        data = [pickle.loads(d.encode('raw_unicode_escape')) for d in data]

    # check if data_val is in generator
    if all(v is None for v in data_val):
        val_gen = 0
    else:
        val_gen = check_gen(data_val)

    # if so unpickle data_val
    if val_gen > 0:
        if not generator:
            raise Exception("You should also pass a generator for the training"
                            " data.")
        data_val = [pickle.loads(dv.encode('raw_unicode_escape'))
                    for dv in data_val]
        fit_gen_val = True

    if len(size_gen) == 0:
        size_gen = [0] * len(data)

    # loop over the data/generators: fit the model and evaluate it
    for d, dv, s_gen in szip(data, data_val, size_gen):
        # without a data_val object, the validation metrics are set to NaN
        validation = dv is not None

        # "not generator and fit_gen_val" cannot happen: it raised above

        # case A : dict for data and data_val
        if not generator:
            X, y = d['X'], d['y']
            model.fit(X, y, *args, **kwargs)
            _record_metrics(results, metrics_names, model, X, y,
                            model.predict(X))
            if validation:
                X_val, y_val = dv['X'], dv['y']
                _record_metrics(results, metrics_names, model, X_val, y_val,
                                model.predict(X_val), prefix='val_')
            else:
                _record_missing_validation(results, metrics_names)

        # case B : generator for data and no generator for data_val
        # could be dict or None
        elif not fit_gen_val:
            if validation:
                X_val, y_val = dv['X'], dv['y']
            for X, y in d.get_epoch_iterator():
                model.fit(X, y, *args, **kwargs)
                _record_metrics(results, metrics_names, model, X, y,
                                model.predict(X))
                if validation:
                    _record_metrics(results, metrics_names, model,
                                    X_val, y_val, model.predict(X_val),
                                    prefix='val_')
                else:
                    _record_missing_validation(results, metrics_names)

        # case C : generator for data and for data_val
        # case C1: N chunks in gen, 1 chunk in val, many to one
        elif s_gen == 1:
            X_val, y_val = snext(dv.get_epoch_iterator())
            for X, y in d.get_epoch_iterator():
                model.fit(X, y, *args, **kwargs)
                _record_metrics(results, metrics_names, model, X, y,
                                model.predict(X))
                _record_metrics(results, metrics_names, model, X_val, y_val,
                                model.predict(X_val), prefix='val_')

        # case C2 : 1 chunk in gen, N chunks in val, one to many
        elif s_gen == 2:
            X, y = snext(d.get_epoch_iterator())
            model.fit(X, y, *args, **kwargs)
            _record_metrics(results, metrics_names, model, X, y,
                            model.predict(X))
            for X_val, y_val in dv.get_epoch_iterator():
                _record_metrics(results, metrics_names, model, X_val, y_val,
                                model.predict(X_val), prefix='val_')

        # case C3 : same numbers of chunks, many to many
        elif s_gen == 3:
            for batch_data, batch_val in szip(d.get_epoch_iterator(),
                                              dv.get_epoch_iterator()):
                X, y = batch_data
                X_val, y_val = batch_val
                model.fit(X, y, *args, **kwargs)
                _record_metrics(results, metrics_names, model, X, y,
                                model.predict(X))
                _record_metrics(results, metrics_names, model, X_val, y_val,
                                model.predict(X_val), prefix='val_')

        else:  # pragma: no cover
            raise Exception(
                'Incoherent generator size for train and validation')

    # for compatibility with keras backend
    results['metrics']['iter'] = np.nan

    return results, model
@app.task(bind=True, default_retry_delay=60 * 10, max_retries=3,
          rate_limit='20/s', queue='sklearn')
def fit(self, backend_name, backend_version, model, data, data_hash, data_val,
        size_gen, generator=False, *args, **kwargs):
    """A function that takes a model and data (with validation), then
    applies the 'train' method if possible. The parameters are updated in
    case of success.

    A record describing the model is inserted in the models collection
    before the training; it is updated with the outcome of the training, or
    flagged with ``'error': 1`` if the training raises.

    Args:
        backend_name: stored as is in the inserted record.
        backend_version: stored as is in the inserted record.
        model(dict): the serialized model to be trained; its 'model_arch'
            entry is stored in the inserted record.
        data(list): a list of dict mapping inputs and outputs to lists or
            dictionaries mapping the inputs names to np.arrays
        data_hash: the hash of the data, used to build the record ids.
        data_val(list): same structure than `data` but for validation
        size_gen: forwarded to the training pipeline.
        generator(bool): if True, `data` is also inserted in the generators
            collection.
        *args, **kwargs: forwarded to the training pipeline, except for
            'overwrite' which decides whether the inserts are upserts
            (False when missing or None).

    Returns:
        the first element returned by `cm.train_pipe`.

    Raises:
        Exception: anything raised by the training pipeline is re-raised
            once the record has been flagged.
    """
    from alp import dbbackend as db
    import alp.backend.common as cm
    from datetime import datetime

    # always remove the option so it is never forwarded to the training,
    # even when it is explicitly passed as None
    overwrite = kwargs.pop("overwrite", None)
    if overwrite is None:  # pragma: no cover
        overwrite = False

    hexdi_m, params_dump = cm.make_all_hash(model, 0, data_hash, _path_h5)

    # update the full json
    full_json = {'backend_name': backend_name,
                 'backend_version': backend_version,
                 'model_arch': model['model_arch'],
                 'datetime': datetime.now(),
                 'mod_id': hexdi_m,
                 'data_id': data_hash,
                 'params_dump': params_dump,
                 'trained': 0,
                 'mod_data_id': hexdi_m + data_hash,
                 'task_id': self.request.id}
    mod_id = db.insert(full_json, db.get_models(), upsert=overwrite)

    if generator is True:  # pragma: no cover
        full_json_data = {'mod_data_id': hexdi_m + data_hash,
                          'data_id': data_hash,
                          'data': data}
        db.insert(full_json_data, db.get_generators(), upsert=overwrite)

    try:
        results, res_dict = cm.train_pipe(train, save_params, model, data,
                                          data_val, generator, size_gen,
                                          params_dump, data_hash, hexdi_m,
                                          *args, **kwargs)
        db.update({'_id': mod_id}, {'$set': res_dict})
    except Exception:
        # flag the record, then let the error propagate
        db.update({'_id': mod_id}, {'$set': {'error': 1}})
        raise
    return results


@app.task(queue='sklearn')
def predict(model, data, async_=False, *args, **kwargs):
    """Make predictions given a model and data

    The third parameter used to be named ``async``, which is a reserved
    keyword since Python 3.7 and prevented the module from being imported.
    It is still accepted positionally, or through an 'async' keyword
    argument.

    Args:
        model(dict): a serialized sklearn model, with the 'mod_id',
            'data_id', 'model_arch' and 'params_dump' entries.
        data(list, dict, np.array): data to be passed as a dictionary mapping
            inputs names to np.arrays or a list of arrays or an arrays
        async_(bool): when true and the result serializer is json, the
            predictions are converted to a list.
        **kwargs: 'custom_objects' is passed to `model_from_dict_w_opt`.

    Returns:
        an np.array of predictions (a list when converted, see `async_`)
    """
    if 'async' in kwargs:
        async_ = kwargs.pop('async')

    json_serializer = RESULT_SERIALIZER == 'json'
    custom_objects = kwargs.get('custom_objects')

    # check if the model has already been built
    m_id = model['mod_id'] + model['data_id']
    cached = COMPILED_MODELS.get(m_id)
    if cached is not None:  # pragma: no cover
        model_instance = cached['model']
    else:
        model_instance, _ = model_from_dict_w_opt(
            model['model_arch'], custom_objects=custom_objects)

    # the attributes are (re)loaded on every call
    model_instance = load_params(model_instance, model['params_dump'])

    # only cache the model once its attributes have been loaded
    if cached is None:
        COMPILED_MODELS[m_id] = {'model': model_instance}

    # to be discussed
    # data = data[0]['X']
    results_array = model_instance.predict(data)

    if async_ and json_serializer:  # pragma: no cover
        results_array = results_array.tolist()
    return results_array