Pipelines en scikit-learn
El objetivo de esta sección es mostrar cómo se pueden desarrollar flujos de preprocesamiento en sklearn
. Vamos a explorar las diferentes operaciones que se le pueden hacer a los diferentes tipos de variables y cómo agrupar todas las operaciones en un solo elemento de sklearn
que tenga los métodos .fit()
y .transform()
.
import numpy as np
import pandas as pd
from typing import List
from sklearn import set_config
set_config(display='diagram')
data = pd.read_csv(
"https://factored-workshops.s3.amazonaws.com/taxi-trip-duration.csv"
)
# Limitar rango de datos
tiempo_minimo = 60 # 1 minuto
tiempo_maximo = 36000 # 10 horas
data = data[
(data["trip_duration"] > tiempo_minimo) &
(data["trip_duration"] < tiempo_maximo)
]
data.head()

Es importante siempre separar la variable dependiente—en este caso trip_duration
del dataframe que vamos a usar para crear las variables independientes.
y = data["trip_duration"]
input_df = data.drop(
["id", "trip_duration", "dropoff_datetime", "store_and_fwd_flag"],
axis="columns"
)
División de los datos
Un paso que siempre se debe hacer es dividir los datos disponibles entrenamiento, validación y test. En este caso vamos a dividir los datos disponibles en sets de entrenamiento y validación con la función train_test_split
from sklearn.model_selection import train_test_split
train_df, val_df, y_train, y_val = train_test_split(input_df, y, random_state=0)
Transformers
scikit-learn
incluye una gran lista de transformers que nos permiten limpiar y transformar datos dependiendo de nuestro objetivo. Sin importar el tipo de transformacion que apliquen, todos los transformers siguen la convención de tener por lo menos dos métodos:
.fit()
calcula los parámetros necesarios para realizar la transformación a partir de unos datos de entrada. Este método se ejecuta únicamente en los datos de entrenamiento para asegurarnos que los parámetros no contienen información de los datos de validación..transform()
aplica la transformación a los datos.
Veamos un ejemplo usando StandardScaler
, un transformer que nos permite remover la media y escalar los datos para que tengan varianza de 1. Vamos a usarlo para normalizar los coordenadas de inicio del viaje.
from sklearn.preprocessing import StandardScaler
transformer = StandardScaler()
transformer.fit(
train_df[["pickup_longitude", "pickup_latitude"]]
)
normed_array = transformer.transform(
val_df[["pickup_longitude", "pickup_latitude"]]
)
print(normed_array)

Custom Transformers
A pesar de que scikit-learn
ofrece varias operaciones para transformar datos, frecuentemente necesitamos crear transformaciones que son específicas a nuestro proyecto. Para esto vamos a aprender cómo implementar custom transformers.
Todos los transformer custom deben heredar BaseEstimator
y TransformerMixin
para que tengan todas las funciones necesarias para conectarse a otros objetos de sklearn. Por convención de sklearn
, los objetos usados para transformar datos siempre deben tener los métodos .fit()
y .transform()
. Ambos métodos reciben X
y y
para que se integren sin problemas a pipelines de sklearn
.
El método .fit()
sirve para almacenar cantidades que vamos a usar durante la transformación de los datos y siempre debe retornar self
. El método .transform()
ejecuta la transformación y retorna los datos transformados. Ahora vamos a replicar el StandardScaler
pero esta vez escribiéndolo como un transformer personalizado que no retorne un array sino un DataFrame.
from sklearn.base import BaseEstimator, TransformerMixin
class PrimerTransformer(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
self.mean = X.mean()
self.std = X.std()
return self
def transform(self, X, y=None):
return (X - self.mean) / self.std
Checkpoint #1
Correr .fit()
y .transform()
para PrimerTransformer
para las coordenadas de inicio del viaje y verificar que el resultado sea igual al del StandardScaler
.
De PrimerTransformer
va a salir un DataFrame en lugar de un array pero los valores deben ser los mismos.
Solución Checkpoint #1
primer_transformer = PrimerTransformer()
train_normed_df = primer_transformer.fit_transform(
train_df[["pickup_longitude", "pickup_latitude"]]
)
val_normed_df = primer_transformer.transform(
val_df[["pickup_longitude", "pickup_latitude"]]
)
En este link pueden encontrar más detalles de qué papel juegan los métodos __init__
, .fit()
y .transform()
Transformer Fechas
Ahora que sabemos cómo construir objetos para transformar datos, vamos a crear un transformer para crear variables como el día de la semana y la hora del momento en el que empieza el servicio. Como vimos la semana pasada en nuestro EDA, esa información puede ser relevante para determinar la duración del viaje.
En nuestra transformación no debemos almacenar ningún dato para hacer las transformación entonces dejamos el método .fit()
vacío. En .transform()
extraemos los datos de fecha que nos interesan y retornamos un dataframe con las nuevas variables.
class TransformerFechas(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
columna_fecha = pd.to_datetime(X["pickup_datetime"])
fecha_df = pd.DataFrame()
# TODO: Crear columnas con dia de la semana y hora de recogida.
return fecha_df
Checkpoint #2.1
Completar el codigo para extrar el día de la semana y la hora a partir de una fecha en pandas
y los ponga en las columnas weekday
y hour
.
Solución Checkpoint #2.1
class TransformerFechas(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
columna_fecha = pd.to_datetime(X["pickup_datetime"])
fecha_df = pd.DataFrame()
fecha_df["weekday"] = columna_fecha.dt.weekday
fecha_df["hour"] = columna_fecha.dt.hour
return fecha_df
transformer_fechas = TransformerFechas()
fechas_df = transformer_fechas.fit_transform(train_df)
fechas_df.head()

Transformer Distancia
También queremos crear una feature que nos ayude a medir la distancia entre el punto de origen y el punto de destino usando la longitud y la latitud en los datos. Nuevamente no tenemos que almacenar cantidades en nuestro método .fit()
y calculamos la distancia entre los dos puntos usando la distancia de Haversine. En el código ya está implementada la función para calcular la distancia y no es necesario fijarse en los detalles de la implementación. Como pueden ver, nuestros transformers pueden incluir funciones adicionales que nos ayuden a calcular cantidades útiles para realizar la transformación.
class TransformerDistancia(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
X_init = X[["pickup_latitude", "pickup_longitude"]].to_numpy()
X_final = X[["dropoff_latitude", "dropoff_longitude"]].to_numpy()
# Distancia de Haversine
# TODO: Calcular la variable distancia usando la funcion
# distancia de Haversine.
distancia_df = pd.DataFrame()
distancia_df["distancia"] = distancia
return distancia_df
def distancia_haversine(self, X_init, X_final):
# Convertir de decimal a radianes
X_init = np.radians(X_init)
X_final = np.radians(X_final)
# Formula Haversine
dlat = X_final[:, 0] - X_init[:, 0]
dlon = X_final[:, 1] - X_init[:, 1]
a = np.sin(dlat / 2) ** 2 + np.cos(X_init[:, 0]) * np.cos(X_final[:, 0]) * np.sin(dlon / 2) ** 2
c = 2 * np.arcsin(np.sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
return c * r
transformer_dist = TransformerDistancia()
distancias_df = transformer_dist.fit_transform(train_df)
distancias_df.head()

Checkpoint #2.2
Calcular la distancia de Haversine usando el método que está implementado en la clase y almacenarlo en la variable distancia.
Solución Checkpoint #2.2
class TransformerDistancia(BaseEstimator, TransformerMixin):
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
X_init = X[["pickup_latitude", "pickup_longitude"]].to_numpy()
X_final = X[["dropoff_latitude", "dropoff_longitude"]].to_numpy()
# Distancia de Haversine
distancia = self.distancia_haversine(X_init=X_init, X_final=X_final)
distancia_df = pd.DataFrame()
distancia_df["distancia"] = distancia
return distancia_df
def distancia_haversine(self, X_init, X_final):
# Convertir de decimal a radianes
X_init = np.radians(X_init)
X_final = np.radians(X_final)
# haversine formula
dlat = X_final[:, 0] - X_init[:, 0]
dlon = X_final[:, 1] - X_init[:, 1]
a = np.sin(dlat / 2) ** 2 + np.cos(X_init[:, 0]) * np.cos(X_final[:, 0]) * np.sin(dlon / 2) ** 2
c = 2 * np.arcsin(np.sqrt(a))
r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
return c * r
Unión de transformers con Pipelines
Pipeline Numérico
Ahora vamos a usar los objetos de Pipeline
y ColumnTransformer
para unir los transformers que ya hemos creado con otros disponibles en sklearn
y lograr todas las transformaciones que queremos realizar a todos los datos.
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
ColumnTransformer
nos permite elegir las columnas sobre las que queremos aplicar una transformación cuando nos llegan columnas adicionales. En este caso queremos que al TransformerDistancia
llegue únicamente pickup_longitude
, pickup_latitude
, dropoff_longitude
y dropoff_latitude
.
Leyendo la documentación sabemos que debemos pasar una tupla con el nombre del transformer, la clase que define el transformer y las columnas sobre las que queremos aplicar la transformación. ColumnTransformer
también nos permite definir qué se debe hacer con las columnas que no estamos transformando; en este caso elegimos pasarlas sin transformarlas remainder="passthrough"
.
coord_cols = [
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude"
]
transformer_coord = ColumnTransformer(
[
("transformer_dist", TransformerDistancia(), coord_cols),
],
remainder="passthrough"
)
display(transformer_coord)
Ahora que tenemos nuestro transformer que realiza transformación usando las coordenadas y deja pasar columnas adicionales, vamos a usarlo para unirlo con la otra variable númerica que tenemos en el dataset—passenger_count
— y normalizarlas usando StandardScaler
. Para esto vamos a usar el objeto Pipeline
.
Pipeline
(documentación Pipeline) nos permite concatenar transformaciones de sklearn
. En este caso vamos a concatenar el ColumnTransformer
que creamos con TransformerDistancia
con el StandardScaler
. Como passenger_count
no estaba entre columnas que selecciona transformer_coord
, esa columna pasa directamente a ser normalizada por el StandardScaler
.
num_cols = ["passenger_count"] + coord_cols
num_pipeline = Pipeline(
[
("transformer_coord", transformer_coord),
("scaler", StandardScaler())
]
)
X_num = num_pipeline.fit_transform(train_df[num_cols], y_train)
print(X_num)

Usando la función display
podemos ver el diagrama del flujo que acabamos de crear. Por TransformerDistancia
pasan las variables de longitud y latitud y por passthrough
pasa cualquier columna que le pasemos al ColumnTransformer
que no sean longitud o latitud—en este caso passenger_count
. El último paso es aplicar un StandardScaler
a la variable de distancia que sale de TransformerDistancia
y a las variables que vienen de passthrough
.
display(num_pipeline)

Pipeline Categórico
Usaremos una lógica similar para las variables categóricas pero esta vez para concatenar el resultado de extraer variables temporales de la fecha de inicio del viaje con un OneHotEncoder
para las variables categóricas de los datos.
from sklearn.preprocessing import OrdinalEncoder
cat_cols = ["vendor_id", "pickup_borough", "pickup_datetime"]
transformer_fechas = ColumnTransformer(
[
#TODO: Punto 1 de Checkpoint 3
],
remainder="passthrough"
)
cat_pipeline = Pipeline(
[
#TODO: Punto 2 de Checkpoint 3
]
)
X_cat = cat_pipeline.fit_transform(train_df[cat_cols])
display(cat_pipeline)

Checkpoint #3
Crear un
ColumnTransformer
llamadotransformer_fechas
que solo seleccione la variablepickup_datetime
para aplicarle elTransformerFechas
.Unir el
ColumnTransformer
del punto 1 con unOrdinalEncoder
para transformar las variables que salen delColumnTransformer
de fechas.
Solución Checkpoint #3
from sklearn.preprocessing import OrdinalEncoder
cat_cols = ["vendor_id", "pickup_borough", "pickup_datetime"]
transformer_fechas = ColumnTransformer(
[
("transfomer_fechas", TransformerFechas(), ["pickup_datetime"])
],
remainder="passthrough"
)
cat_pipeline = Pipeline(
[
("transformer_fechas", transformer_fechas),
("ordinal_encoder", OrdinalEncoder())
]
)
X_cat = cat_pipeline.fit_transform(train_df[cat_cols])
print(X_cat)
Unión de Pipelines
Por último, usaremos nuevamente el ColumnTransformer
para determinar cuáles son las variables numéricas y las variables catégoricas en nuestros datos. Definiendo varios elementos en la lista que le pasamos al ColumnTransformer
le hacemos saber a sklearn
que queremos diferentes transformaciones para las columnas y al final queremos unirlas para que todas queden en un solo numpy
array. En nuestro caso, unimos las columnas que resultan del preprocesamiento de las categóricas y del de las numéricas.
from sklearn.pipeline import FeatureUnion
full_pipeline = ColumnTransformer(
[
("num_pipeline", num_pipeline, num_cols),
("cat_pipeline", cat_pipeline, cat_cols)
]
)
X_transformed = full_pipeline.fit_transform(train_df, y_train)
print(X_transformed.shape)
En este diagrama podemos ver que ejecutamos las transformaciones por separado para las variables numéricas y las categóricas y al final las unimos para tener los datos en un array que le pasaremos al modelo al momento de entrenar.
display(full_pipeline)

Guardar Pipeline a Disco
Además de tener una clara separación entre los métodos .fit()
y .transform()
, la ventaja de escribir todo nuestro proceso como un solo pipeline (full_pipeline
) es que podemos guardar en disco el objeto que usamos para preprocesar. Esto es útil para tener todas las transformaciones definidas en un objeto al momento que queramos hacer reproducibles nuestras transformaciones para, por ejemplo, desplegar nuestro modelo.
Creamos un contexto usando open()
y usamos la función dill.dump()
para guardar nuestro flujo de preprocesamiento en disco.
import dill
dill.settings['recurse'] = True
with open(<TODO: Nombre del archivo>, "wb") as f:
dill.dump(full_pipeline, f)
Para cargar nuestro flujo, usamos la función dill.load()
. Además verificamos que el resultado de las transformaciones con el flujo que teníamos en el notebook y el que cargamos desde el disco es idéntico.
with open(<TODO: Nombre del archivo>, "rb") as f:
loaded_pipeline = dill.load(f)
X_loaded = loaded_pipeline.transform(train_df)
print((X_loaded == X_transformed).all())
El bloque anterior debería imprimir True
.
Solución Checkpoint #4
import dill
dill.settings['recurse'] = True
with open("preprocesser.pkl", "wb") as f:
dill.dump(full_pipeline, f, protocol=dill.HIGHEST_PROTOCOL)
with open("preprocesser.pkl", "rb") as f:
loaded_pipeline = dill.load(f)
X_loaded = loaded_pipeline.transform(train_df)
print((X_loaded == X_transformed).all())
Last updated
Was this helpful?