Creación de APIs con FastAPI
El objetivo de esta sección es crear una API para consumir uno de los modelos que se generaron en el workshop anterior "scikit-learn y MLFlow". De igual forma, se empleará el pipeline de preprocesamiento de datos que se creó en el workshop anterior "Pipelines en scikit-learn".
Antes de arrancar, aseguremosnos de tener las siguientes dependencias en nuestro entorno de desarrollo:
dill==0.3.4
matplotlib==3.4.2
mlflow==1.20.2
numpy==1.21.0
pandas==1.2.5
scikit-learn==0.24.2
seaborn==0.11.1
fastapi==0.68.1
uvicorn==0.15.0
Elección del modelo que se va a desplegar
Antes de iniciar el código de nuestra primer API usando FastAPI, recordemos el final del workshop anterior, donde exportamos el pipeline de preprocesamiento de los datos y el modelo en objetos ".dill".
Creamos un contexto usando open()
y usamos la función dill.dump()
para guardar nuestro flujo de preprocesamiento en disco.
import dill
dill.settings['recurse'] = True
dill.dump(full_pipeline, f, protocol=dill.HIGHEST_PROTOCOL)
with open('./preprocesser.pkl', 'wb') as f:
dill.dump(full_pipeline, f)
Creamos un contexto usando open()
y usamos la función dill.dump()
para guardar nuestro modelo en disco. El modelo que usaremos es la Regresión Lineal.
with open('./lr_model.pkl', 'wb') as f:
dill.dump(model,f)
Luego, también necesitamos un archivo llamado "requirements.txt".
El archivo "requirements.txt" tiene los requerimientos para la API, que se pueden observar a continuación:
dill == 0.3.3
scikit-learn == 0.24.1
python-multipart == 0.0.5
numpy==1.21.0
pandas==1.2.5
Ahora, vamos a elegir un lugar en nuestro directorio para poner todos los archivos relacionados al despliegue del modelo. A ésta carpeta la podemos nombrar "despliegue" y los primeros archivos que almacenaremos acá serán el pipeline de procesamiento "preprocesser.pkl", el script "transformes.py" el cual es usado por el preprocesador, el modelo Regresión Linear 'lr_model.dill' y el archivo "requirements.txt".
despliegue
├── preprocesser.pkl
├── lr_model.pkl
├── transformers.py
└── requirements.txt
Importar dependencias, Preprocesador y Modelo
Importaremos las dependencias en la ruta de despliegue.
pip install -r requirements.txt
2. Vamos a crear un script en nuestra carpeta de despliegue llamado Main.py. Y vamos a importar las dependencias necesariar para usar FastAPI, el preprocesador y el modelo. También vamos a crear nuestra API, la cual tendrá el nombre de "Taxi Trips Duration Predictor".
import numpy as np
import pandas as pd
import dill
from fastapi import FastAPI, File
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from transformers import TransformerFechas, TransformerDistancia, TransformerVelocidad
app = FastAPI(title="Taxi Trips Duration Predictor")
3. Ahora vamos a importar el modelo y el preprocesador que creamos y exportamos en la sesión anterior.
# import the preprocessor
with open("preprocesser.pkl", "rb") as f:
preprocessor = dill.load(f)
# import the model
with open("lr_model.pkl", "rb") as f:
model = dill.load(f)
Solicitudes y Respuestas
Las API funcionan mediante “solicitudes” y “respuestas”. Cuando una API solicita información de una aplicación web o un servidor web, recibirá una respuesta.
Nuestra API tendrá la funcionalidad de recibir "solicitudes" con los datos de los features que requiere el modelo, luego la lógica del negocio procesará los datos y finalmente la API devolverá una "respuesta" con la predicción del modelo. En este caso, la lógica del negocio consiste en el pipeline de preprocesamiento y el modelo.
Puntos finales o "Endpoints"
El lugar al que las API envían solicitudes y donde reside el recurso se denomina "endpoint".
Cuando una API interactúa con otro sistema, los puntos de contacto de esta comunicación se consideran puntos finales. Para las API, un punto final puede incluir una URL de un servidor o servicio. Cada punto final es la ubicación desde la que las API pueden acceder a los recursos que necesitan para llevar a cabo su función.
Primer Endpoint: Get Endpoint
Las solicitudes GET son los métodos más comunes y más utilizados en API y sitios web. El método GET se utiliza para recuperar datos de un servidor en el recurso especificado. Por ejemplo, supongamos que tiene una API con un punto final /users. Hacer una solicitud GET a ese punto final debería devolver una lista de todos los usuarios disponibles.
GET
es un método que puede recibir parámetros a través de parámetros "path" o parámetros "query":
Path Parameters
:
@app.get("/{}", response_class=JSONResponse)
Puedes revisar la documentación de Path Parameters
en FastAPI acá.
Query Parameters
:
@app.get("/", response_class=JSONResponse)
def get_funct(
vendor_id: int,
pickup_datetime: str,
passenger_count: int,
pickup_longitude: float,
pickup_latitude: float,
dropoff_longitude: float,
dropoff_latitude: float,
pickup_borough: str,
dropoff_borough: str,
)
Con los Query Parameters
, los parámetros se definen al interior de la función get_func().
Puedes revisar la documentación de Query Parameters
en FastAPI acá.
En el presente ejemplo nuestro método GET
recibirá los parámetros con la convención de query parameters y retornará la respuesta de la predicción en un JSON.
import numpy as np
import pandas as pd
import dill
from fastapi import FastAPI, File
from fastapi.responses import JSONResponse
from transformers import TransformerFechas, TransformerDistancia, TransformerVelocidad
app = FastAPI(title="Taxi Trips Duration Predictor")
# import the preprocessor
with open("preprocesser.pkl", "rb") as f:
preprocessor = dill.load(f)
# import the model
with open("lr_model.pkl", "rb") as f:
model = dill.load(f)
@app.get("/", response_class=JSONResponse)
def get_funct(
vendor_id: int,
pickup_datetime: str,
passenger_count: int,
pickup_longitude: float,
pickup_latitude: float,
dropoff_longitude: float,
dropoff_latitude: float,
pickup_borough: str,
dropoff_borough: str,
):
"""Serves predictions given query parameters specifying the taxi trip's
features from a single example.
Args:
vendor_id (int): a code indicating the provider associated with the trip record
pickup_datetime (str): date and time when the meter was engaged
passenger_count (float): the number of passengers in the vehicle
(driver entered value)
pickup_longitude (float): the longitude where the meter was engaged
pickup_latitude (float): the latitude where the meter was engaged
dropoff_longitude (float): the longitude where the meter was disengaged
dropoff_latitude (float): the latitude where the meter was disengaged
pickup_borough (str): the borough where the meter was engaged
dropoff_borough (str): the borough where the meter was disengaged
Returns:
[JSON]: model prediction for the single example given
"""
df = pd.DataFrame(
[
[
vendor_id,
pickup_datetime,
passenger_count,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
pickup_borough,
dropoff_borough,
]
],
columns=[
"vendor_id",
"pickup_datetime",
"passenger_count",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"pickup_borough",
"dropoff_borough",
],
)
prediction = model.predict(preprocessor.transform(df))
return {
"features": {
"vendor_id": vendor_id,
"pickup_datetime": pickup_datetime,
"passenger_count": passenger_count,
"pickup_longitude": pickup_longitude,
"pickup_latitude": pickup_latitude,
"dropoff_longitude": dropoff_longitude,
"dropoff_latitude": dropoff_latitude,
"pickup_borough": pickup_borough,
"dropoff_borough": dropoff_borough,
},
"prediction": list(prediction)[0],
}
if __name__ == "__main__":
import uvicorn
# For local development:
uvicorn.run("main:app", port=3000, reload=True)
Revisemos el main de nuestra API
if __name__ == "__main__":
import uvicorn
# For local development:
uvicorn.run("main:app", port=3000, reload=True)
reload=True
permite que cada vez que se guarde el archivo main.py, la API se actualice automáticamente, sin necesidad de cerrar la API y volverla a reiniciar cada vez que se haga un cambio. Esta es una funcionalidad muy útil durante la etapa de desarrollo de la API, no es recomendable tenerla en la etapa de despligue.
Así es como se debe ver ahora la estructura de la carpeta despliegue:
despliegue
├── preprocesser.pkl
├── lr_model.pkl
├── transformers.py
├── main.py
└── requirements.txt
A continuación debemos ir a la terminal, ubicarnos en la carpeta "despliegue" y correr el siguiente comando:
python main.py
En la terminal obtendremos una salida así:

¡A continuación viene la mejor parte! ¡Vamos a probar nuestra API!
Así que ahora ve a tu buscador preferido, puede ser Chrome, Safari, ojalá no Internet Explorer 😁 si quieres acabar este workpshop hoy 🤭
Una vez en el buscador, coloca esta dirección http://127.0.0.1:3000
Si nos quedamos en esta dirección podríamos pasar los features del modelo editando la URL directamente, pero esto sería engorroso. Afortunadamente FastAPI tiene por default un Front que podemos usar con cualquiera de nuestras APIs. Así que sólo debemos hacer un cambio en la URL:
Ahora deberías ver algo así en el buscador:

Ahora, presionando "Try it out" puedes rellenar para cada feature su valor correspondiente, de esta forma el modelo recibe los datos que va a preprocesar y luego el modelo podrá generar su predicción y enviarla como respuesta!

Checkpoint #1
Try out! Ensayar el método GET con features reales:
vendor_id: 2
pickup_datetime: 2016-03-14 17:24:55
passenger_count: 1
pickup_longitude: -73.9821548462
pickup_latitude: 40.7679367065
dropoff_longitude: -73.964630127
dropoff_latitude: 40.7656021118
pickup_borough: Manhattan
dropoff_borough: Manhattan
Al final debes obtener el codigo de respuesta éxitosa, es decir 200, y la predicción del modelo. Se debe ver más o menos así:

Segundo Endpoint: Post Endpoint
En los servicios web, las solicitudes POST se utilizan para enviar datos al servidor API para crear o actualizar un recurso. Los datos enviados al servidor se almacenan en el cuerpo de la solicitud HTTP.
El ejemplo más simple es un formulario de contacto en un sitio web. Cuando completas las entradas en un formulario y presionas Enviar, esos datos se colocan en el cuerpo de respuesta de la solicitud y se envían al servidor. Puede ser JSON, XML o parámetros de consulta (query parameters).
En este caso, nos gustaría que nuestro método POST reciba un JSON y retorne un JSON.
BaseModel
de pydantic es una clase de la cuál vamos a heredar para crear las especificaciones del JSON que va a ingresar en nuestro Endpoint. Por eso la debemos agregar a nuestras dependencias:
import numpy as np
import pandas as pd
import dill
from fastapi import FastAPI, File
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
Ahora vamos a crear la clase TaxiTrip, que hereda de BaseModel. TaxiTrip va a ser un objeto con 9 parámetros que describen un viaje en taxi.
class TaxiTrip(BaseModel):
vendor_id: int
pickup_datetime: str
passenger_count: float
pickup_longitude: float
pickup_latitude: float
dropoff_longitude: float
dropoff_latitude: float
pickup_borough: str
dropoff_borough: str
Ahora, en nuestro POST
endpoint vamos a definir que el parámetro que entra es un taxitrip de tipo TaxiTrip.
@app.post("/json", response_class=JSONResponse)
def post_json(taxitrip: TaxiTrip):
"""Serves predictions given a request body specifying the taxis trip's features
from a single example.
Args:
taxitrip (TaxiTrip): request body of type `TaxiTrip` with the
attributes: vendor_id, pickup_datetime, passenger_count, pickup_longitude,
pickup_latitude, dropoff_longitude, dropoff_latitude, pickup_borough and
dropoff_borough
Returns:
[JSON]: model prediction for the single example given
"""
vendor_id = taxitrip.vendor_id
pickup_datetime = taxitrip.pickup_datetime
passenger_count = taxitrip.passenger_count
pickup_longitude = taxitrip.pickup_longitude
pickup_latitude = taxitrip.pickup_latitude
dropoff_longitude = taxitrip.dropoff_longitude
dropoff_latitude = taxitrip.dropoff_latitude
pickup_borough = taxitrip.pickup_borough
dropoff_borough = taxitrip.dropoff_borough
df = pd.DataFrame(
[
[
vendor_id,
pickup_datetime,
passenger_count,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
pickup_borough,
dropoff_borough,
]
],
columns=[
"vendor_id",
"pickup_datetime",
"passenger_count",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"pickup_borough",
"dropoff_borough",
],
)
prediction = model.predict(preprocessor.transform(df))
return {
"features": {
"vendor_id": vendor_id,
"pickup_datetime": pickup_datetime,
"passenger_count": passenger_count,
"pickup_longitude": pickup_longitude,
"pickup_latitude": pickup_latitude,
"dropoff_longitude": dropoff_longitude,
"dropoff_latitude": dropoff_latitude,
"pickup_borough": pickup_borough,
"dropoff_borough": dropoff_borough,
},
"prediction": list(prediction)[0],
}
Ahora, guardamos los cambios en el archivo main.py y recargamos la página:
Como podremos observar, ya nuestro POST
Endpoint está listo para usar:

Checkpoint #2
Try out! Ensayar el método POST con features reales:
{"vendor_id":2, "pickup_datetime":"2016-03-14 17:24:55", "passenger_count":1, "pickup_longitude":-73.9821548462, "pickup_latitude":40.7679367065, "dropoff_longitude":-73.964630127, "dropoff_latitude":40.7656021118, "pickup_borough":"Manhattan", "dropoff_borough":"Manhattan"}
Al final debes obtener el codigo de respuesta éxitosa, es decir 200 y la predicción del modelo. Se debe ver más o menos así:

Tercer Endpoint: POST Endpoint
Este último Endpoint será diferente al anterior ya que recibirá como entrada un archivo y devolverá un archivo a su vez. Por esto, debemos importar otro tipo de respuesta de FastAPI en nuestras dependencias, el StreamingResponse.
Por otro lado, nuestro método POST
va a recibir un parámetro de tipo File
, así que también lo debemos importar de la librería fastapi
.
Además, debemos traer la clase BytesIO
de la librería io
, la cual toma un objeto bytes que está en memoria en python e imita el comportamiento de un archivo, de lo que Python considera un objeto tipo archivo, el cual tiene métodos de "read" y "write".
import numpy as np
import pandas as pd
import dill
from fastapi import FastAPI, File
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel
from io import BytesIO
from transformers import TransformerFechas, TransformerDistancia, TransformerVelocidad
StreamingResponse
es muy útil ya que no guarda los archivos en disco sino que en el Browser recibe el archivo. De esta forma ahorramos espacio en el disco, evitamos las operaciones en disco y las hacemos en memoria, lo cual es mucho más rápido.
@app.post("/file", response_class=StreamingResponse)
def post_file(file: bytes = File(...)):
"""Serves predictions given a CSV file with no header and seven columns
specifying each taxi trip's features in the order vendor_id, pickup_datetime,
passenger_count, pickup_longitude,pickup_latitude, dropoff_longitude and
dropoff_latitude, pickup_borough and dropoff_borough
Args:
file (bytes, optional): bytes from a CSV file as described above.
Defaults to File(...), but to receive a file is required.
Returns:
[StreamingResponse]: Returns a streaming response with a new CSV file that contains
a column with the predictions.
"""
# Decode the bytes as text and split the lines:
input_lines = file.decode().splitlines()
# Split each line as a list of the three features:
X = [p.split(",") for p in input_lines]
predictions = []
for x in X:
vendor_id = int(x[0])
pickup_datetime = str(x[1])
passenger_count = float(x[2])
pickup_longitude = float(x[3])
pickup_latitude = float(x[4])
dropoff_longitude = float(x[5])
dropoff_latitude = float(x[6])
pickup_borough = str(x[7])
dropoff_borough = str(x[8])
df = pd.DataFrame(
[
[
vendor_id,
pickup_datetime,
passenger_count,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
pickup_borough,
dropoff_borough,
]
],
columns=[
"vendor_id",
"pickup_datetime",
"passenger_count",
"pickup_longitude",
"pickup_latitude",
"dropoff_longitude",
"dropoff_latitude",
"pickup_borough",
"dropoff_borough",
],
)
# Get predictions for each taxi trip:
prediction = model.predict(preprocessor.transform(df))
predictions.append(prediction)
# Append the prediction to each input line:
output = [line + "," + str(pred[0]) for line, pred in zip(input_lines, predictions)]
# Join the output as a single string:
output = "\n".join(output)
# Encode output as bytes:
output = output.encode()
# The kind is text, the extension is csv
return StreamingResponse(
BytesIO(output),
media_type="text/csv",
headers={"Content-Disposition": 'attachment;filename="prediction.csv"'},
)
Ahora, guardamos los cambios en el archivo main.py y recargamos la página:
Como podremos observar, ya nuestro tercer Endpoint (POST
) está listo para usar:

Este tercer endpoint recibe un archivo .CSV con diferentes ejemplos, te reto a generar las predicciones de los ejemplos en el siguiente archivo:
Al final debes obtener el codigo de respuesta éxitosa, es decir 200 y la predicción del modelo. Se debe ver más o menos así:

Como se puede observar, desde el buscador podemos descargar un archivo que contendrá los features de cada ejemplo y sus respectivas predicciones. Ese archivo se llamará "prediction.csv" y su contenido se debe ver así:

Como se puede observar la predicción fue la misma para ambos ejemplos, esto nos indica que la Regresión Lineal no es un modelo muy robusto. Para este workshop se decidió usar la Regresión Lineal por simplicidad y por ser un modelo liviano, pero lo recomendable es siempre elegir el mejor modelo que se haya encontrada en la etapa de experimentación.
Last updated
Was this helpful?