[Stepic] MLOps. Начало [RUS, 2023]
Materials on Stepik
https://stepik.org/course/181476/promo
On YouTube:
https://www.youtube.com/watch?v=skTh3tGksIQ&list=PLmA-1xX7IuzAixCe10sFhyTcyunSc5Zdi
Slowly working through the material. Suggestions for improvement are welcome!
Software used:
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description: Ubuntu 22.04.4 LTS
Release: 22.04
Codename: jammy
$ python --version
Python 3.10.12
Postgres
Airflow
MinIO
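The notes don't record how Postgres itself was started. A minimal sketch, assuming it runs as a local Docker container with the credentials used later in this section (admin1 / pA55w0rd123 / postgresdb) and that the hostname postgres resolves to it (e.g. via a shared Docker network or an /etc/hosts entry):
$ docker run -d --name postgres \
    -e POSTGRES_USER=admin1 -e POSTGRES_PASSWORD=pA55w0rd123 -e POSTGRES_DB=postgresdb \
    -p 5432:5432 postgres:15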
4. Airflow
Working on:
2024.07.03
01. DAG Hello World
$ vi ${AIRFLOW_HOME}/dags/mlops_dag_1.py
from datetime import timedelta
from typing import NoReturn
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
DEFAULT_ARGS = {
    "owner": "Marley",
    "email": "[email protected]",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG(
    dag_id="mlops_dag_1",
    schedule_interval="0 1 * * *",
    start_date=days_ago(2),
    catchup=False,
    tags=["mlops"],
    default_args=DEFAULT_ARGS
)

def init() -> NoReturn:
    print("Hello, World")

task_init = PythonOperator(task_id="init", python_callable=init, dag=dag)

task_init
$ airflow dags test mlops_dag_1
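A single task can also be run without the scheduler for a quick smoke test (the date 2024-07-03 is just an example logical date):
$ airflow tasks test mlops_dag_1 init 2024-07-03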
02. Loading data into a Postgres table
// pandas 2.1.0
// sqlalchemy==1.4.36
$ pip install numpy==1.26.4 pandas==2.1.4 scikit-learn==1.5.0 sqlalchemy==1.4.36 psycopg2-binary==2.9.9
$ cd /home/marley/projects/dev/mlops
$ vi load-data.py
import numpy as np
import pandas as pd
from sklearn.datasets import fetch_california_housing
from sqlalchemy import create_engine

# Fetch the California housing dataset
data = fetch_california_housing()

# Concatenate the features and the target into a single np.array
dataset = np.concatenate([data['data'], data['target'].reshape([data['target'].shape[0], 1])], axis=1)

# Convert to a DataFrame
dataset = pd.DataFrame(dataset, columns=data['feature_names'] + data['target_names'])

# Create a connection to the Postgres database. Replace yourpass with your own password
# engine = create_engine('postgresql://postgres:yourpass@localhost:5432/postgres')
engine = create_engine('postgresql://admin1:pA55w0rd123@postgres:5432/postgresdb')

# Save the dataset to the database
dataset.to_sql('california_housing', engine)

# To verify, read the table back:
print(pd.read_sql_query("SELECT * FROM california_housing", engine))
$ python load-data.py
// OK!
$ PGPASSWORD=pA55w0rd123 psql --host=postgres --username=admin1 --port=5432 --dbname=postgresdb -c 'SELECT * FROM california_housing'
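Re-running load-data.py as written fails because the table already exists (to_sql defaults to if_exists='fail'). A hedged tweak, not from the course, that makes the script rerun-safe and drops the pandas index column:
dataset.to_sql('california_housing', engine, if_exists='replace', index=False)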
03. Train DAG
$ pip install apache-airflow-providers-postgres
$ pip install apache-airflow-providers-amazon
// minioadmin / minioadmin
https://play.min.io:9443/login
-> Create Access Keys
Access Key: lHT9EscF8VtvAU2KmEsW
Secret Key: NYBHX97Y0V1tpUJk574oYxgkUyyM3YmiKrGaocDB
https://www.youtube.com/watch?v=sVNvAtIZWdQ
Airflow -> Admin -> Connections
Connection id: s3_connection
Connection Type: Amazon Web Services
Extra: {"AWS_ACCESS_KEY_ID":"lHT9EscF8VtvAU2KmEsW", "AWS_SECRET_ACCESS_KEY":"NYBHX97Y0V1tpUJk574oYxgkUyyM3YmiKrGaocDB", "ENDPOINT_URL":"https://play.min.io:9000"}
The comments say:
The test connection only works for connecting to AWS S3. Connect to Minio doesn't require a token.
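Before running the train DAG below, the bucket it writes to (mlops-bucket-marley, used in train_dag.py) has to exist. A sketch that checks/creates it through the same Airflow connection, run from a Python shell where Airflow is configured:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_hook = S3Hook("s3_connection")
if not s3_hook.check_for_bucket("mlops-bucket-marley"):
    s3_hook.create_bucket(bucket_name="mlops-bucket-marley")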
Airflow -> Admin -> Connections
+
Connection id: pg_connection
Connection Type: Postgres
Host: postgres
Database: postgresdb
Login: admin1
Password: pA55w0rd123
Port: 5432
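A quick way to confirm pg_connection works outside the DAG (a sketch; assumes the california_housing table from step 02 is already loaded):
from airflow.providers.postgres.hooks.postgres import PostgresHook
pg_hook = PostgresHook("pg_connection")
print(pg_hook.get_first("SELECT COUNT(*) FROM california_housing"))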
$ vi ${AIRFLOW_HOME}/dags/train_dag.py
Needs reworking.
import io
import json
import logging
import numpy as np
import pandas as pd
import pickle
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, median_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
DEFAULT_ARGS = {
    "owner": "Marley",
    "email": "[email protected]",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=1)
}
dag = DAG(
    dag_id="mlops_train",
    schedule_interval="0 1 * * *",
    start_date=days_ago(2),
    catchup=False,
    tags=["mlops"],
    default_args=DEFAULT_ARGS
)
_LOG = logging.getLogger()
_LOG.addHandler(logging.StreamHandler())
BUCKET = "mlops-bucket-marley"
DATA_PATH = "datasets/california_housing.pkl"
FEATURES = ["MedInc", "HouseAge", "AveRooms", "AveBedrms",
"Population", "AveOccup", "Latitude", "Longitude"]
TARGET = "MedHouseVal"
def init() -> None:
    _LOG.info("[LOG] Train pipeline started!")
def get_data_from_postgres() -> None:
    # Use the PG connection created earlier
    pg_hook = PostgresHook("pg_connection")
    conn = pg_hook.get_conn()
    # Read all data from the california_housing table
    data = pd.read_sql_query("SELECT * FROM california_housing", conn)
    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")
    # Save the data to S3 as a pkl file
    pickle_byte_obj = pickle.dumps(data)
    resource.Object(BUCKET, DATA_PATH).put(Body=pickle_byte_obj)
    _LOG.info("[LOG] Data download finished!")
def prepare_data() -> None:
    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")
    # Preprocessing
    file = s3_hook.download_file(key=DATA_PATH, bucket_name=BUCKET)
    data = pd.read_pickle(file)
    # Split into features and target
    X, y = data[FEATURES], data[TARGET]
    # Split the data into train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    # Fit the scaler on train
    scaler = StandardScaler()
    X_train_fitted = scaler.fit_transform(X_train)
    X_test_fitted = scaler.transform(X_test)
    # Save the prepared data to S3
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")
    for name, dataset in zip(["X_train", "X_test", "y_train", "y_test"],
                             [X_train_fitted, X_test_fitted, y_train, y_test]):
        pickle_byte_obj = pickle.dumps(dataset)
        resource.Object(BUCKET, f"dataset/{name}.pkl").put(Body=pickle_byte_obj)
    _LOG.info("[LOG] Data preparation finished!")
def train_model() -> None:
    # Use the S3 connection created earlier
    s3_hook = S3Hook("s3_connection")
    # Load the prepared data from S3
    data = {}
    for name in ["X_train", "X_test", "y_train", "y_test"]:
        file = s3_hook.download_file(key=f"dataset/{name}.pkl", bucket_name=BUCKET)
        data[name] = pd.read_pickle(file)
    # Train the model
    model = RandomForestRegressor()
    model.fit(data["X_train"], data["y_train"])
    prediction = model.predict(data["X_test"])
    # Compute metrics
    result = {}
    result["r2_score"] = r2_score(data["y_test"], prediction)
    result["rmse"] = mean_squared_error(data["y_test"], prediction) ** 0.5
    result["mae"] = median_absolute_error(data["y_test"], prediction)
    # Save the result to S3
    date = datetime.now().strftime("%Y_%m_%d_%H")
    session = s3_hook.get_session("ru-central")
    resource = session.resource("s3")
    json_byte_object = json.dumps(result)
    resource.Object(BUCKET, f"results/{date}.json").put(Body=json_byte_object)
    _LOG.info("[LOG] Model training finished!")
def save_results() -> None:
    _LOG.info("[LOG] Success!")
task_init = PythonOperator(task_id="init", python_callable=init, dag=dag)
task_get_data = PythonOperator(task_id="get_data", python_callable=get_data_from_postgres, dag=dag)
task_prepare_data = PythonOperator(task_id="prepare_data", python_callable=prepare_data, dag=dag)
task_train_model = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
task_save_results = PythonOperator(task_id="save_results", python_callable=save_results, dag=dag)
task_init >> task_get_data >> task_prepare_data >> task_train_model >> task_save_results
$ airflow dags list
$ airflow scheduler
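New DAGs are paused by default; once the scheduler is up, the standard Airflow CLI can unpause and trigger the training DAG:
$ airflow dags unpause mlops_train
$ airflow dags trigger mlops_train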