How to log a KerasClassifier model in a sklearn Pipeline with MLflow?

0

Issue

I have a set of pre-processing stages in sklearn Pipeline and an estimator which is a KerasClassifier (from tensorflow.keras.wrappers.scikit_learn import KerasClassifier).

My overall goal is to tune and log the whole sklearn pipeline in MLflow (in a Databricks environment). I get a confusing type error which I can’t figure out how to resolve:

TypeError: can't pickle _thread.RLock objects

I have the following code (without tuning stage) which returns the above error:

# Conda environment for the logged model: each pip requirement is pinned to
# the version of the module imported in this session.
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        f"cloudpickle=={cloudpickle.__version__}",
        f"scikit-learn=={sklearn.__version__}",
        f"numpy=={np.__version__}",
        f"tensorflow=={tf.__version__}",
    ],
    additional_conda_channels=None,
)

# Hyperparameters handed to `create_model` via the `n` keyword of
# KerasClassifier. Every key carries the `estimator__` prefix.
# NOTE(review): `estimator__learning_rate` is not read by `create_model` in
# the visible code - confirm whether it is meant to configure the optimizer.
search_space = {
    "estimator__dense_l1": 20,
    "estimator__dense_l2": 20,
    "estimator__learning_rate": 0.1,
    "estimator__optimizer": "Adam",
}


def create_model(n):
    """Build and compile a three-layer network with a single sigmoid output.

    ``n`` is a mapping of hyperparameters. The keys ``estimator__dense_l1``
    and ``estimator__dense_l2`` give the hidden layer widths (cast to int)
    and ``estimator__optimizer`` is handed to ``compile`` as the optimizer.
    """
    hidden_units_1 = int(n["estimator__dense_l1"])
    hidden_units_2 = int(n["estimator__dense_l2"])

    # Passing the layers as a list stacks them in the given order.
    model = Sequential(
        [
            Dense(hidden_units_1, activation="relu"),
            Dense(hidden_units_2, activation="relu"),
            Dense(1, activation="sigmoid"),
        ]
    )
    model.compile(
        loss="binary_crossentropy",
        optimizer=n["estimator__optimizer"],
        metrics=["accuracy"],
    )
    return model


# Fit the preprocessing + Keras pipeline inside an MLflow run and log it with
# the sklearn flavor. Per the question text above, this snippet fails with the
# quoted pickling TypeError.
mlflow.sklearn.autolog()
with mlflow.start_run(nested=True) as run:

    # Wrap the Keras builder; `n=search_space` is given to the wrapper as a
    # keyword argument alongside `build_fn`.
    classfier = KerasClassifier(build_fn=create_model, n=search_space)
    # fit the pipeline
    clf = Pipeline(steps=[("preprocessor", preprocessor), 
                          ("estimator", classfier)])
    # The `estimator__` prefix targets the fit keyword arguments at the
    # pipeline step named "estimator".
    # `h` holds the return value of `fit`; it is not used in the visible code.
    h = clf.fit(
        X_train,
        y_train.values,
        estimator__validation_split=0.2,
        estimator__epochs=10,
        estimator__verbose=2,
    )

    # log scores
    acc_score = clf.score(X=X_test, y=y_test)
    mlflow.log_metric("accuracy", acc_score)

    # Derive the model signature from the test features and the pipeline's
    # predictions on them.
    signature = infer_signature(X_test, clf.predict(X_test))
    # Log the model with a signature that defines the schema of the model's inputs and outputs.
    mlflow.sklearn.log_model(
        sk_model=clf, artifact_path="model", 
        signature=signature, 
        conda_env=conda_env
    )

I also get this warning before the error:


    WARNING mlflow.sklearn.utils: Truncated the value of the key `steps`. Truncated value: `[('preprocessor', ColumnTransformer(n_jobs=None, remainder='drop', sparse_threshold=0.3,
                      transformer_weights=None,
                      transformers=[('num',
                                   Pipeline(memory=None,

Note that the whole pipeline runs outside of MLflow.
Can someone help?

Solution

I think I found a sort of workaround/solution for this for now, but I think this issue needs to be addressed in MLflow anyway.

What I did is probably not the best way.
I used a Python package called scikeras that does this wrapping, and then I could log the model.

The code:

import scikeras 
import tensorflow as tf 
from tensorflow.keras.models import Sequential 
from tensorflow.keras.layers import Input, Dense, Dropout, LSTM, Flatten, Activation 
 
from scikeras.wrappers import KerasClassifier 
  
 
class ModelWrapper(mlflow.pyfunc.PythonModel):
    """Adapter that exposes a fitted model as an ``mlflow.pyfunc`` model."""

    def __init__(self, model):
        """Keep a reference to ``model``; its ``predict`` method is used later."""
        self.model = model

    def predict(self, context, model_input):
        """Return the wrapped model's predictions for ``model_input``.

        ``context`` is accepted as part of the method signature but is not
        used here.
        """
        wrapped = self.model
        return wrapped.predict(model_input)
 
# Conda environment for the logged model: each pip requirement is pinned to
# the version of the module imported in this session (scikeras included).
conda_env = _mlflow_conda_env(
    additional_conda_deps=None,
    additional_pip_deps=[
        f"cloudpickle=={cloudpickle.__version__}",
        f"scikit-learn=={sklearn.__version__}",
        f"numpy=={np.__version__}",
        f"tensorflow=={tf.__version__}",
        f"scikeras=={scikeras.__version__}",
    ],
    additional_conda_channels=None,
)
 
# Hyperparameters for the run. Each entry is passed to KerasClassifier as a
# keyword argument of the same name in the training block below.
param = { 
   "dense_l1": 20, 
   "dense_l2": 20, 
   "optimizer__learning_rate": 0.1, 
   "optimizer": "Adam", 
   "loss":"binary_crossentropy", 
} 
 
  
def create_model(dense_l1, dense_l2, meta):
    """Build the network used as the model function of ``KerasClassifier``.

    Args:
        dense_l1: Number of units in the second ``Dense`` layer.
        dense_l2: Number of units in the third ``Dense`` layer.
        meta: Mapping describing the training data; the keys
            ``n_features_in_`` and ``X_shape_`` are read.

    Returns:
        A ``Sequential`` model ending in a single sigmoid unit. The model is
        returned uncompiled; ``loss`` and ``optimizer`` are given to
        ``KerasClassifier`` instead.
    """
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]

    model = Sequential()
    # The first layer is as wide as the input; the input shape drops the
    # leading (sample) axis of X.
    model.add(Dense(n_features_in_, input_shape=X_shape_[1:], activation="relu"))
    model.add(Dense(dense_l1, activation="relu"))
    model.add(Dense(dense_l2, activation="relu"))
    model.add(Dense(1, activation="sigmoid"))

    return model
 
# Fit the preprocessing + scikeras pipeline inside an MLflow run and log the
# fitted pipeline through the pyfunc flavor.
mlflow.sklearn.autolog() 
with mlflow.start_run(run_name="sample_run"): 
 
  # scikeras wrapper around `create_model`; all hyperparameters come from
  # `param`. `dense_l1` / `dense_l2` match `create_model`'s parameter names.
  classfier = KerasClassifier( 
    create_model, 
    loss=param["loss"], 
    dense_l1=param["dense_l1"], 
    dense_l2=param["dense_l2"], 
    optimizer__learning_rate = param["optimizer__learning_rate"], 
    optimizer= param["optimizer"], 
) 
 
  # fit the pipeline 
  clf = Pipeline(steps=[('preprocessor', preprocessor), 
                      ('estimator', classfier)])   
 
  # `h` holds the return value of `fit`; it is not used in the visible code.
  h = clf.fit(X_train, y_train.values) 
  # log scores 
  acc_score = clf.score(X=X_test, y=y_test) 
  mlflow.log_metric("accuracy", acc_score) 
  # Derive the model signature from the test features and the pipeline's
  # predictions on them.
  signature = infer_signature(X_test, clf.predict(X_test)) 
  # Wrap the fitted pipeline so it is logged with mlflow.pyfunc rather than
  # mlflow.sklearn.
  model_nn = ModelWrapper(clf,)  
 
  mlflow.pyfunc.log_model( 
      python_model= model_nn, 
      artifact_path = "model",  
      signature = signature,  
      conda_env = conda_env 
  ) 

Answered By – mas

This Answer collected from stackoverflow, is licensed under cc by-sa 2.5 , cc by-sa 3.0 and cc by-sa 4.0

Leave A Reply

Your email address will not be published.

This website uses cookies to improve your experience. We'll assume you're ok with this, but you can opt-out if you wish. Accept Read More