Source code for alp.backend.keras_backend

"""
Adaptor for the Keras backend
=============================

Compilation & cache
~~~~~~~~~~~~~~~~~~~

The models are compiled on the fly after the build. If the model is already
compiled and in the `COMPILED_MODEL` dictionnary mapping the models id to the
in memory compiled function, this function is used instead.

----------------------------------------------------------------------------
"""

import inspect
import types

import dill
import numpy as np
import six

from six.moves import zip as szip

from ..appcom import _path_h5
from ..appcom.utils import check_gen
from ..backend import common as cm
from ..celapp import RESULT_SERIALIZER
from ..celapp import app

try:  # pragma: no cover
    import cPickle as pickle
except ImportError:  # pragma: no cover
    import pickle


COMPILED_MODELS = dict()
TO_SERIALIZE = ['custom_objects', 'callbacks']


# general utilities

[docs]def get_backend(): import keras as CB return CB
[docs]def check_validation(dv): validation = True if dv is None: validation = False return(validation)
[docs]def save_params(model, filepath): model.save_weights(filepath, overwrite=True)
[docs]def serialize(cust_obj): """A function to serialize custom objects passed to a model Args: cust_obj(callable): a custom layer or function to serialize Returns: a dict of the serialized components of the object""" ser_func = dict() if isinstance(cust_obj, types.FunctionType): func_code = six.get_function_code(cust_obj) func_code_d = dill.dumps(func_code).decode('raw_unicode_escape') ser_func['func_code_d'] = func_code_d ser_func['name_d'] = pickle.dumps( cust_obj.__name__).decode('raw_unicode_escape') ser_func['args_d'] = pickle.dumps( six.get_function_defaults(cust_obj)).decode('raw_unicode_escape') clos = dill.dumps( six.get_function_closure(cust_obj)).decode('raw_unicode_escape') ser_func['clos_d'] = clos ser_func['type_obj'] = 'func' else: if hasattr(cust_obj, '__module__'): # pragma: no cover cust_obj.__module__ = '__main__' ser_func['name_d'] = None ser_func['args_d'] = None ser_func['clos_d'] = None ser_func['type_obj'] = 'class' loaded = dill.dumps(cust_obj).decode('raw_unicode_escape') ser_func['func_code_d'] = loaded return ser_func
[docs]def deserialize(name_d, func_code_d, args_d, clos_d, type_obj): """A function to deserialize an object serialized with the serialize function. Args: name_d(unicode): the dumped name of the object func_code_d(unicode): the dumped byte code of the function args_d(unicode): the dumped information about the arguments clos_d(unicode): the dumped information about the function closure Returns: a deserialized object""" if type_obj == 'func': name = pickle.loads(name_d.encode('raw_unicode_escape')) code = dill.loads(func_code_d.encode('raw_unicode_escape')) args = pickle.loads(args_d.encode('raw_unicode_escape')) clos = dill.loads(clos_d.encode('raw_unicode_escape')) loaded_obj = types.FunctionType(code, globals(), name, args, clos) else: # pragma: no cover loaded_obj = dill.loads(func_code_d.encode('raw_unicode_escape')) return loaded_obj
# Serialization utilities
[docs]def to_dict_w_opt(model, metrics=None): """Serialize a model and add the config of the optimizer and the loss. Args: model(keras.Model): the model to serialize metrics(list, optionnal): a list of metrics to monitor Returns: a dictionnary of the serialized model """ config = dict() config_m = model.get_config() config['config'] = { 'class_name': model.__class__.__name__, 'config': config_m, } if hasattr(model, 'optimizer'): config['optimizer'] = model.optimizer.get_config() config['optimizer']['name'] = model.optimizer.__class__.__name__ config['metrics'] = [] config['ser_metrics'] = [] if hasattr(model, 'loss'): name_out = [l.name for l in model.output_layers] if isinstance(model.loss, dict): config['loss'] = dict([(k, get_function_name(v)) for k, v in model.loss.items()]) elif isinstance(model.loss, list): config['loss'] = dict(zip(name_out, [get_function_name(l) for l in model.loss])) elif hasattr(model.loss, '__call__'): config['loss'] = dict(zip(name_out, [get_function_name(model.loss)])) elif isinstance(model.loss, six.string_types): config['loss'] = dict(zip(name_out, [get_function_name(model.loss)])) else: # pragma: no cover raise TypeError('Loss must be a list a string or a callable.') if metrics is not None: for m in metrics: if isinstance(m, six.string_types): config['metrics'].append(m) else: config['ser_metrics'].append(m.__name__) return config
[docs]def model_from_dict_w_opt(model_dict, custom_objects=None): """Builds a model from a serialized model using `to_dict_w_opt` Args: model_dict(dict): a serialized Keras model custom_objects(dict, optionnal): a dictionnary mapping custom objects names to custom objects (Layers, functions, etc.) Returns: A Keras.Model which is compiled if the information about the optimizer is available. """ from keras import optimizers from keras.utils.layer_utils import layer_from_config if custom_objects is None: custom_objects = dict() custom_objects = {k: deserialize(**custom_objects[k]) for k in custom_objects} for k in custom_objects: if inspect.isfunction(custom_objects[k]): custom_objects[k] = custom_objects[k]() model = layer_from_config(model_dict['config'], custom_objects=custom_objects) if 'optimizer' in model_dict: metrics = model_dict.get("metrics", []) ser_metrics = model_dict.get("ser_metrics", []) for k in custom_objects: if inspect.isfunction(custom_objects[k]): function_name = custom_objects[k].__name__ if k in ser_metrics or function_name in ser_metrics: metrics.append(custom_objects[k]) model_name = model_dict['config'].get('class_name') # if it has an optimizer, the model is assumed to be compiled loss = model_dict.get('loss') # if a custom loss function is passed replace it in loss for l in loss: for c in custom_objects: if loss[l] == c: loss[l] = custom_objects[c] optimizer_params = dict([( k, v) for k, v in model_dict.get('optimizer').items()]) optimizer_name = optimizer_params.pop('name') optimizer = optimizers.get(optimizer_name, optimizer_params) if model_name == "Sequential": sample_weight_mode = model_dict.get('sample_weight_mode') model.compile(loss=loss, optimizer=optimizer, sample_weight_mode=sample_weight_mode, metrics=metrics) elif model_name == "Model": sample_weight_mode = model_dict.get('sample_weight_mode') loss_weights = model_dict.get('loss_weights', None) model.compile(loss=loss, optimizer=optimizer, sample_weight_mode=sample_weight_mode, loss_weights=loss_weights, metrics=metrics) else: # pragma: no cover raise Exception('{} model, must be in Sequential, ' 'Model'.format(model_name)) return model
[docs]def get_function_name(o): """Utility function to return the model's name Args: o(object): an object to check Returns: The name(str) of the object """ if isinstance(o, six.string_types): return o else: return o.__name__
# core utilities
[docs]def build_predict_func(mod): """Build Keras prediction functions based on a Keras model Using inputs and outputs of the graph a prediction function (forward pass) is compiled for prediction purpose. Args: mod(keras.models): a Model or Sequential model Returns: a Keras (Theano or Tensorflow) function """ import keras.backend as K if mod.uses_learning_phase: tensors = mod.inputs + [K.learning_phase()] else: tensors = mod.inputs return K.function(tensors, mod.outputs, updates=mod.state_updates)
[docs]def train(model, data, data_val, size_gen, generator=False, *args, **kwargs): """Fit a model given hyperparameters and a serialized model Args: model(dict): a serialized keras.Model data(list): a list of dict mapping inputs and outputs to lists or dictionnaries mapping the inputs names to np.arrays data_val(list): same structure than `data` but for validation Returns: the loss (list), the validation loss (list), the number of iterations, and the model """ if generator: from six.moves import reload_module as sreload import theano sreload(theano) results = dict() results['metrics'] = dict() custom_objects = None callbacks = [] fit_gen_val = False suf = 'val_' if 'custom_objects' in kwargs: custom_objects = kwargs.pop('custom_objects') # load model model = model_from_dict_w_opt(model, custom_objects=custom_objects) if 'callbacks' in kwargs: callbacks = kwargs.pop('callbacks') callbacks = [deserialize(**callback) for callback in callbacks] for i, callback in enumerate(callbacks): if inspect.isfunction(callback): callbacks[i] = callback() else: raise TypeError('Your callback is not wrapped in a function') metrics_names = model.metrics_names for metric in metrics_names: results['metrics'][metric] = [] results['metrics'][suf + metric] = [] mod_name = model.__class__.__name__ if generator: data = [pickle.loads(d.encode('raw_unicode_escape')) for d in data] data = [cm.transform_gen(d, mod_name) for d in data] kwargs.pop('batch_size') if all(v is None for v in data_val): val_gen = 0 else: val_gen = check_gen(data_val) if val_gen > 0: if generator: data_val = [pickle.loads(dv.encode('raw_unicode_escape')) for dv in data_val] data_val = [cm.transform_gen(dv, mod_name) for dv in data_val] for i, check in enumerate(size_gen): if check is 1: data_val[i] = next(data_val[i]) fit_gen_val = True else: raise Exception("You should also pass a generator for the training" " data.") # fit the model according to the input/output type if mod_name is "Sequential" or mod_name is "Model": for d, dv in szip(data, data_val): validation = check_validation(dv) if not fit_gen_val: if dv is not None: dv = (dv['X'], dv['y']) if generator: h = model.fit_generator(generator=d, validation_data=dv, callbacks=callbacks, *args, **kwargs) else: X, y = d['X'], d['y'] h = model.fit(x=X, y=y, validation_data=dv, callbacks=callbacks, *args, **kwargs) for metric in metrics_names: results['metrics'][metric] += h.history[metric] if validation: results['metrics'][ suf + metric] += h.history[suf + metric] else: results['metrics'][suf + metric] += [np.nan] * \ len(h.history[metric]) results['metrics']['iter'] = h.epoch[-1] * len(data) else: raise NotImplementedError("This type of model" "is not supported: {}".format(mod_name)) return results, model
@app.task(bind=True, default_retry_delay=60 * 10, max_retries=3, rate_limit='20/s', queue='keras') def fit(self, backend_name, backend_version, model, data, data_hash, data_val, size_gen, generator=False, *args, **kwargs): """A function to train models given a datagenerator,a serialized model, Args: backend_name(str): the model dumped with the `to_json` method backend_version(str): the backend version model(keras.model): a keras model data(list): a list of np.arrays for training data_val(list): a list of np.arrays for validation Returns: results similar to what the fit method of keras would return""" from alp import dbbackend as db from datetime import datetime import alp.backend.common as cm import keras.backend as K if K.backend() == 'tensorflow' and cm.on_worker(): # pragma: no cover import tensorflow as tf K.clear_session() config = tf.ConfigProto(allow_soft_placement=True) config.gpu_options.allow_growth = True session = tf.Session(config=config) K.set_session(session) if kwargs.get("batch_size") is None: kwargs['batch_size'] = 32 if kwargs.get("overwrite") is None: # pragma: no cover overwrite = False else: overwrite = kwargs.pop("overwrite") batch_size = kwargs['batch_size'] model_c = cm.clean_model(model) hexdi_m, params_dump = cm.make_all_hash(model_c, batch_size, data_hash, _path_h5) # update the full json full_json_model = {'backend_name': backend_name, 'backend_version': backend_version, 'model_arch': model_c['model_arch'], 'datetime': datetime.now(), 'mod_id': hexdi_m, 'data_id': data_hash, 'params_dump': params_dump, 'batch_size': kwargs['batch_size'], 'trained': 0, 'mod_data_id': hexdi_m + data_hash, 'task_id': self.request.id} mod_id = db.insert(full_json_model, db.get_models(), upsert=overwrite) if generator is True: full_json_data = {'mod_data_id': hexdi_m + data_hash, 'data_id': data_hash, 'data': data} db.insert(full_json_data, db.get_generators(), upsert=overwrite) try: results, res_dict = cm.train_pipe(train, save_params, model, data, data_val, generator, size_gen, params_dump, data_hash, hexdi_m, *args, **kwargs) db.update({'_id': mod_id}, {'$set': res_dict}) except Exception: db.update({'_id': mod_id}, {'$set': {'error': 1}}) raise return results @app.task(queue='keras') def predict(model, data, async, *args, **kwargs): """Make predictions given a model and data Args: model(dict): a serialized keras models data(list, dict, np.array): data to be passed as a dictionary mapping inputs names to np.arrays or a list of arrays or an arrays Returns: an np.array of predictions """ import alp.backend.common as cm import keras.backend as K from keras.engine.training import make_batches if K.backend() == 'tensorflow' and cm.on_worker(): # pragma: no cover import tensorflow as tf K.clear_session() config = tf.ConfigProto(allow_soft_placement=True) config.gpu_options.allow_growth = True session = tf.Session(config=config) K.set_session(session) json_serializer = RESULT_SERIALIZER == 'json' if kwargs.get("batch_size") is None: # pragma: no cover kwargs['batch_size'] = 32 batch_size = kwargs['batch_size'] custom_objects = kwargs.get('custom_objects') model_name = model['model_arch']['config'].get('class_name') # check if the predict function is already compiled m_id = model['mod_id'] + model['data_id'] if m_id in COMPILED_MODELS: pred_function = COMPILED_MODELS[m_id]['pred'] model_k = COMPILED_MODELS[m_id]['model'] learning_phase = COMPILED_MODELS[m_id]['learning_phase'] output_shape = COMPILED_MODELS[m_id]['model'].output_shape else: # get the model arch model_dict = model['model_arch'] model_dict.pop('optimizer') # load model model_k = model_from_dict_w_opt(model_dict, custom_objects=custom_objects) # load the weights model_k.load_weights(model['params_dump']) # build the prediction function pred_function = build_predict_func(model_k) COMPILED_MODELS[m_id] = dict() COMPILED_MODELS[m_id]['pred'] = pred_function COMPILED_MODELS[m_id]['model'] = model_k learning_phase = model_k.uses_learning_phase COMPILED_MODELS[m_id]['learning_phase'] = learning_phase output_shape = model_k.output_shape # predict according to the input/output type if model_name == 'Sequential': if not isinstance(data, list): data = [data] elif model_name == 'Model': if isinstance(data, dict): data = [data[k] for k in model_k.input_names] elif not isinstance(data, list): data = [data] else: raise NotImplementedError( '{}: This type of model is not supported'.format(model_name)) # Predict by batch to control GPU memory len_data = len(data[0]) batches = make_batches(len_data, batch_size) index_array = np.arange(len_data) results_array = np.empty((len_data, ) + output_shape[1:]) for batch_start, batch_end in batches: batch_ids = index_array[batch_start:batch_end] data_b = [d[batch_ids] for d in data] if learning_phase: data_b.append(0.) batch_prediction = pred_function(data_b) if isinstance(batch_prediction, list): # pragma: no cover batch_prediction = batch_prediction[0] results_array[batch_ids] = batch_prediction if async and json_serializer: results_array = results_array.tolist() return results_array