Pipelines en scikit-learn

El objetivo de esta sección es mostrar cómo se pueden desarrollar flujos de preprocesamiento en sklearn. Vamos a explorar las diferentes operaciones que se le pueden hacer a los diferentes tipos de variables y cómo agrupar todas las operaciones en un solo elemento de sklearn que tenga los métodos .fit() y .transform().

import numpy as np
import pandas as pd

from typing import List
from sklearn import set_config
set_config(display='diagram')
data = pd.read_csv(
    "https://factored-workshops.s3.amazonaws.com/taxi-trip-duration.csv"
)
# Limitar rango de datos
tiempo_minimo = 60 # 1 minuto
tiempo_maximo = 36000 # 10 horas
data = data[
    (data["trip_duration"] > tiempo_minimo) &
    (data["trip_duration"] < tiempo_maximo)
]
data.head()
data.head()

Es importante siempre separar la variable dependiente—en este caso trip_duration del dataframe que vamos a usar para crear las variables independientes.

y = data["trip_duration"]
input_df = data.drop(
    ["id", "trip_duration", "dropoff_datetime", "store_and_fwd_flag"],
    axis="columns"
)

División de los datos

Un paso que siempre se debe hacer es dividir los datos disponibles entrenamiento, validación y test. En este caso vamos a dividir los datos disponibles en sets de entrenamiento y validación con la función train_test_split

from sklearn.model_selection import train_test_split

train_df, val_df, y_train, y_val = train_test_split(input_df, y, random_state=0)

Transformers

scikit-learn incluye una gran lista de transformers que nos permiten limpiar y transformar datos dependiendo de nuestro objetivo. Sin importar el tipo de transformacion que apliquen, todos los transformers siguen la convención de tener por lo menos dos métodos:

  • .fit() calcula los parámetros necesarios para realizar la transformación a partir de unos datos de entrada. Este método se ejecuta únicamente en los datos de entrenamiento para asegurarnos que los parámetros no contienen información de los datos de validación.

  • .transform() aplica la transformación a los datos.

Veamos un ejemplo usando StandardScaler, un transformer que nos permite remover la media y escalar los datos para que tengan varianza de 1. Vamos a usarlo para normalizar los coordenadas de inicio del viaje.

from sklearn.preprocessing import StandardScaler

transformer = StandardScaler()
transformer.fit(
    train_df[["pickup_longitude", "pickup_latitude"]]
)
normed_array = transformer.transform(
    val_df[["pickup_longitude", "pickup_latitude"]]
)
print(normed_array)
print(normed_array)

Custom Transformers

A pesar de que scikit-learn ofrece varias operaciones para transformar datos, frecuentemente necesitamos crear transformaciones que son específicas a nuestro proyecto. Para esto vamos a aprender cómo implementar custom transformers.

Todos los transformer custom deben heredar BaseEstimator y TransformerMixin para que tengan todas las funciones necesarias para conectarse a otros objetos de sklearn. Por convención de sklearn, los objetos usados para transformar datos siempre deben tener los métodos .fit() y .transform(). Ambos métodos reciben X y y para que se integren sin problemas a pipelines de sklearn.

El método .fit() sirve para almacenar cantidades que vamos a usar durante la transformación de los datos y siempre debe retornar self. El método .transform() ejecuta la transformación y retorna los datos transformados. Ahora vamos a replicar el StandardScaler pero esta vez escribiéndolo como un transformer personalizado que no retorne un array sino un DataFrame.

from sklearn.base import BaseEstimator, TransformerMixin

class PrimerTransformer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        self.mean = X.mean()
        self.std = X.std()
        return self

    def transform(self, X, y=None):
        return (X - self.mean) / self.std

Checkpoint #1

Solución Checkpoint #1

primer_transformer = PrimerTransformer()
train_normed_df = primer_transformer.fit_transform(
    train_df[["pickup_longitude", "pickup_latitude"]]
)
val_normed_df = primer_transformer.transform(
    val_df[["pickup_longitude", "pickup_latitude"]]
)

En este link pueden encontrar más detalles de qué papel juegan los métodos __init__, .fit() y .transform()

Transformer Fechas

Ahora que sabemos cómo construir objetos para transformar datos, vamos a crear un transformer para crear variables como el día de la semana y la hora del momento en el que empieza el servicio. Como vimos la semana pasada en nuestro EDA, esa información puede ser relevante para determinar la duración del viaje.

En nuestra transformación no debemos almacenar ningún dato para hacer las transformación entonces dejamos el método .fit() vacío. En .transform() extraemos los datos de fecha que nos interesan y retornamos un dataframe con las nuevas variables.


class TransformerFechas(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        columna_fecha = pd.to_datetime(X["pickup_datetime"])
        fecha_df = pd.DataFrame()
        # TODO: Crear columnas con dia de la semana y hora de recogida.
        return fecha_df

Checkpoint #2.1

Solución Checkpoint #2.1

class TransformerFechas(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        columna_fecha = pd.to_datetime(X["pickup_datetime"])
        fecha_df = pd.DataFrame()
        fecha_df["weekday"] = columna_fecha.dt.weekday
        fecha_df["hour"] = columna_fecha.dt.hour
        return fecha_df
transformer_fechas = TransformerFechas()
fechas_df = transformer_fechas.fit_transform(train_df)
fechas_df.head()
fechas_df.head()

Transformer Distancia

También queremos crear una feature que nos ayude a medir la distancia entre el punto de origen y el punto de destino usando la longitud y la latitud en los datos. Nuevamente no tenemos que almacenar cantidades en nuestro método .fit() y calculamos la distancia entre los dos puntos usando la distancia de Haversine. En el código ya está implementada la función para calcular la distancia y no es necesario fijarse en los detalles de la implementación. Como pueden ver, nuestros transformers pueden incluir funciones adicionales que nos ayuden a calcular cantidades útiles para realizar la transformación.

class TransformerDistancia(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_init = X[["pickup_latitude", "pickup_longitude"]].to_numpy()
        X_final = X[["dropoff_latitude", "dropoff_longitude"]].to_numpy()

        # Distancia de Haversine
        # TODO: Calcular la variable distancia usando la funcion
        # distancia de Haversine.
        distancia_df = pd.DataFrame()
        distancia_df["distancia"] = distancia
        return distancia_df
    
    def distancia_haversine(self, X_init, X_final):
        # Convertir de decimal a radianes
        X_init = np.radians(X_init)
        X_final = np.radians(X_final)

        # Formula Haversine
        dlat = X_final[:, 0] - X_init[:, 0] 
        dlon = X_final[:, 1] - X_init[:, 1]
        a = np.sin(dlat / 2) ** 2 + np.cos(X_init[:, 0]) * np.cos(X_final[:, 0]) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
        return c * r
transformer_dist = TransformerDistancia()
distancias_df = transformer_dist.fit_transform(train_df)
distancias_df.head()
distancias_df.head()

Checkpoint #2.2

Solución Checkpoint #2.2

class TransformerDistancia(BaseEstimator, TransformerMixin): 
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_init = X[["pickup_latitude", "pickup_longitude"]].to_numpy()
        X_final = X[["dropoff_latitude", "dropoff_longitude"]].to_numpy()
    
        # Distancia de Haversine
        distancia = self.distancia_haversine(X_init=X_init, X_final=X_final)
        distancia_df = pd.DataFrame()
        distancia_df["distancia"] = distancia
        return distancia_df
    
    def distancia_haversine(self, X_init, X_final):
        # Convertir de decimal a radianes
        X_init = np.radians(X_init)
        X_final = np.radians(X_final)
    
        # haversine formula 
        dlat = X_final[:, 0] - X_init[:, 0] 
        dlon = X_final[:, 1] - X_init[:, 1]
        a = np.sin(dlat / 2) ** 2 + np.cos(X_init[:, 0]) * np.cos(X_final[:, 0]) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
        return c * r

Unión de transformers con Pipelines

Pipeline Numérico

Ahora vamos a usar los objetos de Pipeline y ColumnTransformer para unir los transformers que ya hemos creado con otros disponibles en sklearn y lograr todas las transformaciones que queremos realizar a todos los datos.

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

ColumnTransformer nos permite elegir las columnas sobre las que queremos aplicar una transformación cuando nos llegan columnas adicionales. En este caso queremos que al TransformerDistancia llegue únicamente pickup_longitude, pickup_latitude, dropoff_longitude y dropoff_latitude .

Leyendo la documentación sabemos que debemos pasar una tupla con el nombre del transformer, la clase que define el transformer y las columnas sobre las que queremos aplicar la transformación. ColumnTransformer también nos permite definir qué se debe hacer con las columnas que no estamos transformando; en este caso elegimos pasarlas sin transformarlas remainder="passthrough".

coord_cols = [
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude"
]

transformer_coord = ColumnTransformer(
    [
        ("transformer_dist", TransformerDistancia(), coord_cols),
    ],
    remainder="passthrough"
)
display(transformer_coord)

Ahora que tenemos nuestro transformer que realiza transformación usando las coordenadas y deja pasar columnas adicionales, vamos a usarlo para unirlo con la otra variable númerica que tenemos en el dataset—passenger_count— y normalizarlas usando StandardScaler. Para esto vamos a usar el objeto Pipeline.

Pipeline (documentación Pipeline) nos permite concatenar transformaciones de sklearn. En este caso vamos a concatenar el ColumnTransformer que creamos con TransformerDistancia con el StandardScaler. Como passenger_count no estaba entre columnas que selecciona transformer_coord, esa columna pasa directamente a ser normalizada por el StandardScaler.

num_cols = ["passenger_count"] + coord_cols

num_pipeline = Pipeline(
    [
        ("transformer_coord", transformer_coord),
        ("scaler", StandardScaler())
    ]
)

X_num = num_pipeline.fit_transform(train_df[num_cols], y_train)
print(X_num)
print(X_num)

Usando la función display podemos ver el diagrama del flujo que acabamos de crear. Por TransformerDistancia pasan las variables de longitud y latitud y por passthrough pasa cualquier columna que le pasemos al ColumnTransformer que no sean longitud o latitud—en este caso passenger_count. El último paso es aplicar un StandardScaler a la variable de distancia que sale de TransformerDistancia y a las variables que vienen de passthrough.

display(num_pipeline)

Pipeline Categórico

Usaremos una lógica similar para las variables categóricas pero esta vez para concatenar el resultado de extraer variables temporales de la fecha de inicio del viaje con un OneHotEncoder para las variables categóricas de los datos.

from sklearn.preprocessing import OrdinalEncoder

cat_cols = ["vendor_id", "pickup_borough", "pickup_datetime"]

transformer_fechas = ColumnTransformer(
    [
        #TODO: Punto 1 de Checkpoint 3
    ],
    remainder="passthrough"
)

cat_pipeline = Pipeline(
    [
        #TODO: Punto 2 de Checkpoint 3
    ]
)

X_cat = cat_pipeline.fit_transform(train_df[cat_cols])
display(cat_pipeline)

Checkpoint #3

Solución Checkpoint #3

from sklearn.preprocessing import OrdinalEncoder

cat_cols = ["vendor_id", "pickup_borough", "pickup_datetime"]

transformer_fechas = ColumnTransformer(
    [
        ("transfomer_fechas", TransformerFechas(), ["pickup_datetime"])
    ],
    remainder="passthrough"
)

cat_pipeline = Pipeline(
    [
        ("transformer_fechas", transformer_fechas),
        ("ordinal_encoder", OrdinalEncoder())
    ]
)

X_cat = cat_pipeline.fit_transform(train_df[cat_cols])
print(X_cat)

Unión de Pipelines

Por último, usaremos nuevamente el ColumnTransformer para determinar cuáles son las variables numéricas y las variables catégoricas en nuestros datos. Definiendo varios elementos en la lista que le pasamos al ColumnTransformer le hacemos saber a sklearn que queremos diferentes transformaciones para las columnas y al final queremos unirlas para que todas queden en un solo numpy array. En nuestro caso, unimos las columnas que resultan del preprocesamiento de las categóricas y del de las numéricas.

from sklearn.pipeline import FeatureUnion

full_pipeline = ColumnTransformer(
    [
        ("num_pipeline", num_pipeline, num_cols),
        ("cat_pipeline", cat_pipeline, cat_cols)
    ]
)

X_transformed = full_pipeline.fit_transform(train_df, y_train)
print(X_transformed.shape)

En este diagrama podemos ver que ejecutamos las transformaciones por separado para las variables numéricas y las categóricas y al final las unimos para tener los datos en un array que le pasaremos al modelo al momento de entrenar.

display(full_pipeline)

Guardar Pipeline a Disco

Además de tener una clara separación entre los métodos .fit() y .transform(), la ventaja de escribir todo nuestro proceso como un solo pipeline (full_pipeline) es que podemos guardar en disco el objeto que usamos para preprocesar. Esto es útil para tener todas las transformaciones definidas en un objeto al momento que queramos hacer reproducibles nuestras transformaciones para, por ejemplo, desplegar nuestro modelo.

Creamos un contexto usando open() y usamos la función dill.dump() para guardar nuestro flujo de preprocesamiento en disco.

import dill
dill.settings['recurse'] = True

with open(<TODO: Nombre del archivo>, "wb") as f:
    dill.dump(full_pipeline, f)

Para cargar nuestro flujo, usamos la función dill.load(). Además verificamos que el resultado de las transformaciones con el flujo que teníamos en el notebook y el que cargamos desde el disco es idéntico.

with open(<TODO: Nombre del archivo>, "rb") as f:
    loaded_pipeline = dill.load(f)
    
X_loaded = loaded_pipeline.transform(train_df)
print((X_loaded == X_transformed).all())

El bloque anterior debería imprimir True .

Checkpoint #4

Solución Checkpoint #4

import dill
dill.settings['recurse'] = True

with open("preprocesser.pkl", "wb") as f:
    dill.dump(full_pipeline, f, protocol=dill.HIGHEST_PROTOCOL)
    
with open("preprocesser.pkl", "rb") as f:
    loaded_pipeline = dill.load(f)
    
X_loaded = loaded_pipeline.transform(train_df)
print((X_loaded == X_transformed).all())

Last updated

Was this helpful?