wissen nicht viel über Bitcoin oder seine Preisschwankungen, möchten aber Investitionsentscheidungen treffen, um Gewinne zu erzielen? Dieses maschinelle Lernmodell hat Ihren Rücken. Es kann die Preise viel besser vorhersagen als ein Astrologe. In diesem Artikel werden wir ein ML -Modell für die Prognose und Vorhersage von Bitcoin -Preis unter Verwendung von ZENML und MLFLOW erstellen. Beginnen wir also mit unserer Reise, um zu verstehen, wie jemand ML- und MLOPS -Tools verwenden kann, um die Zukunft vorherzusagen.
Lernziele
- Lernen Sie, Live -Daten mithilfe von API effizient abzurufen.
- Verstehen Sie, was Zenml ist, warum wir MLFlow verwenden und wie Sie es in Zenml integrieren können.
- Erforschen Sie den Bereitstellungsprozess für maschinelle Lernmodelle von der Idee bis zur Produktion.
- Erstellen Sie, wie Sie eine benutzerfreundliche Stromlit-App für interaktive Modellvorhersagen für maschinelles Lernen erstellen.
Dieser Artikel wurde als Teil des Data Science -Blogathon veröffentlicht.
Table of Contents
- Problem Statement
- Project Implementation
- Step 1: Accessing the API
- Step 2: Connecting to Database Using MongoDB
- Step 3: Integration of MLflow with ZenML
- Step 4: Data Ingestion
- Step 5: Data Cleaning
- Step 6: Feature Engineering
- Step 7: Data Splitting
- Step 8: Model Training
- Step 9: Model Evaluation
- Step 10: Model Deployment
- Step 11: Building the Streamlit App
- Frequently Asked Fragen
Problemanweisung
Bitcoin -Preise sind sehr volatil, und Vorhersagen ist so gut wie unmöglich. In unserem Projekt verwenden wir den Best Practicesto vonMlops ein LSTM -Modell, um Bitcoinprices und Trends zu prognostizieren.
Bevor Sie das Projekt implementieren, schauen wir uns die Projektarchitektur an.
Projektimplementierung
Beginnen wir mit dem Zugriff auf die API.
Warum machen wir das? Sie können historische Bitcoin -Preisdaten aus verschiedenen Datensätzen erhalten, aber mit einer API können wir Zugriff auf Live -Marktdaten haben.
Schritt 1: Zugriff auf die API
- Melden Sie sich für den API -Zugriff an:
Sobald Sie sich auf der API -Seite der Kkdata angemeldet haben. Sie können den kostenlosen API-Schlüssel von dieser Seite erhalten.
Bitcoin -Preisdaten abrufen: - Mit dem folgenden Code können Sie Bitcoin -Preisdaten von der CCDATA -API abrufen und in einen Pandas -Datenfream umwandeln. Halten Sie auch den API -Schlüssel in der Datei .env.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")herstellen
mongoDB ist eine NoSQL-Datenbank, die für ihre Anpassungsfähigkeit, Erweiterbarkeit und Fähigkeit, unstrukturierte Daten in einem JSON-ähnlichen Format zu speichern, bekannt ist.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Dieser Code stellt eine Verbindung zu MongoDB her, holt Bitcoin -Preisdaten über eine API und aktualisiert die Datenbank mit allen neuen Einträgen nach dem neuesten protokollierten Datum.
Einführung von Zenml
zenmlis Eine Open-Source-Plattform, die auf maschinelles Lernen zugeschnitten ist und die Erstellung flexibler und produktionsbereiteter Pipelines unterstützt. Darüber hinaus integriert sich ZENML in Multiple maschinelle Lernwerkzeuge Gleiche, Bentoml usw., um nahtlose ML -Pipelines zu erstellen.
⚠️ Wenn Sie ein Windows -Benutzer sind, versuchen Sie, WSL in Ihrem System zu installieren. Zenml unterstützt keine Windows.
In diesem Projekt werden wir eine traditionelle Pipeline implementieren, die ZenML verwendet, und wir werden MLFlow in ZENML für die Experimentverfolgung integrieren.
Voraussetzungen und grundlegende ZENML-Befehle
- Python 3.12 oder höher: Sie können es von hier aus erhalten: https: //www.python.org/downloads/
- Aktivieren Sie Ihre virtuelle Umgebung:
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
- ZENML -Befehle:
Alle Kern -ZenML -Befehle zusammen mit ihren Funktionen finden Sie unten:
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Schritt 3: Integration von MLFlow mit Zenml
Wir verwenden MLFlow für die Experimentverfolgung, um unser Modell, Artefakte, Metriken und Hyperparameterwerte zu verfolgen. Wir registrieren MLFlow für Experimentverfolgung und Model -Deploymer hier:
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Zenml Stacklist
Projektstruktur
Hier sehen Sie das Layout des Projekts. Lassen Sie uns nun nacheinander ausführlich besprechen.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Schritt 4: Datenaufnahme
wir nehmen zuerst Daten von API zu MongoDB auf und konvertieren sie in Pandas DataFrame.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Wir fügen @step als Dekorateur zur incest_data () hinzu, um es als Schritt unserer Trainingspipeline zu deklarieren. Auf die gleiche Weise schreiben wir Code für jeden Schritt in der Projektarchitektur und erstellen die Pipeline.
Um anzuzeigen, wie ich den @step Dekorator verwendet habe, lesen Sie den GitHub -Link unten (Schritteordner), um den Code für andere Schritte der Pipeline durchzuführen, d. H. Datenreinigung, Feature Engineering, Datenaufteilung, Modelltraining und Modellbewertung.
Schritt 5: Datenreinigung
In diesem Schritt werden wir verschiedene Strategien zur Reinigung der aufgenommenen Daten erstellen. Wir werden die unerwünschten Spalten und fehlenden Werte in den Daten fallen lassen.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Schritt 6: Feature Engineering
Dieser Schritt enthält die gereinigten Daten aus dem früheren Datenschritt von Data_Cleaning. Wir erstellen neue Funktionen wie Simple Moving Average (SMA), Exponential Moving Average (EMA) sowie verzögerte und rollende Statistiken, um Trends zu erfassen, Rauschen zu reduzieren und zuverlässigere Vorhersagen aus Zeitreihendaten zu machen. Zusätzlich skalieren wir die Funktionen und Zielvariablen mithilfe von Minmax -Skalierung.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
Schritt 7: Datenaufteilung
Jetzt teilen wir die verarbeiteten Daten in Schulungs- und Testen von Datensätzen im Verhältnis von 80:20 auf.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Schritt 8: Modelltraining
In diesem Schritt trainieren wir das Modell mit frühem Stoppen, um eine Überanpassung zu verhindern, und verwenden die automatisierte Protokollierung von MLFLOW, um unser Modell und unsere Experimente zu verfolgen und das trainierte Modell als lstm_model.keras zu speichern.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Schritt 9: Modellbewertung
Da dies ein Regressionsproblem ist, verwenden wir Bewertungsmetriken wie mittlerer quadratischer Fehler (MSE), Stammmittelwert für quadratische Fehler (MSE), mittlerer absoluter Fehler (MAE) und R-Squared.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Jetzt werden wir alle oben genannten Schritte in eine Pipeline organisieren. Erstellen wir eine neue Dateitraining_Pipeline.py.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Hier wird @pipeline Dekorateur verwendet, um die Funktionsml_Pipeline () als Pipeline in Zenml zu definieren.
Um das Dashboard für die Trainingspipeline anzuzeigen, führen Sie einfach das Skript run_pipeline.py aus. Erstellen wir eine run_pipeline.py -Datei.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
Jetzt haben wir das Erstellen der Pipeline abgeschlossen. Führen Sie den Befehl unten aus, um das Pipeline -Dashboard anzuzeigen.
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]
Nach dem Ausführen des obigen Befehls wird die Tracking -Dashboard -URL zurückgegeben, die so aussieht.
Die Trainingspipeline sieht im Dashboard wie unten angegeben aus:
Schritt 10: Modellbereitstellung
Bis jetzt haben wir das Modell und die Pipelines gebaut. Lassen Sie uns nun die Pipeline in die Produktion bringen, bei der Benutzer Vorhersagen treffen können.
Pipeline für kontinuierliche Bereitstellung
import joblib import pandas as pd from abc import ABC, abstractmethod from sklearn.preprocessing import MinMaxScaler # Abstract class for Feature Engineering strategy class FeatureEngineeringStrategy(ABC): @abstractmethod def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: pass # Concrete class for calculating SMA, EMA, RSI, and other features class TechnicalIndicators(FeatureEngineeringStrategy): def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: # Calculate SMA, EMA, and RSI df['SMA_20'] = df['CLOSE'].rolling(window=20).mean() df['SMA_50'] = df['CLOSE'].rolling(window=50).mean() df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean() # Price difference features df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE'] df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW'] df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN'] df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW'] # Lagged features df['OPEN_lag1'] = df['OPEN'].shift(1) df['CLOSE_lag1'] = df['CLOSE'].shift(1) df['HIGH_lag1'] = df['HIGH'].shift(1) df['LOW_lag1'] = df['LOW'].shift(1) # Rolling statistics df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean() df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std() # Drop rows with missing values (due to rolling windows, shifts) df.dropna(inplace=True) return df # Abstract class for Scaling strategy class ScalingStrategy(ABC): @abstractmethod def scale(self, df: pd.DataFrame, features: list, target: str): pass # Concrete class for MinMax Scaling class MinMaxScaling(ScalingStrategy): def scale(self, df: pd.DataFrame, features: list, target: str): """ Scales the features and target using MinMaxScaler. Parameters: df: pd.DataFrame The DataFrame containing the features and target. features: list List of feature column names. target: str The target column name. Returns: pd.DataFrame, pd.DataFrame: Scaled features and target """ scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) X_scaled = scaler_X.fit_transform(df[features].values) y_scaled = scaler_y.fit_transform(df[[target]].values) joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl') joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl') return X_scaled, y_scaled, scaler_y # FeatureEngineeringContext: This will use the Strategy Pattern class FeatureEngineering: def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy): self.feature_strategy = feature_strategy self.scaling_strategy = scaling_strategy def process_features(self, df: pd.DataFrame, features: list, target: str): # Generate features using the provided strategy df_with_features = self.feature_strategy.generate_features(df) # Scale features and target using the provided strategy X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target) return df_with_features, X_scaled, y_scaled, scaler_y
Diese Pipeline ist für kontinuierlich bereitgestellte geschulte Modelle verantwortlich. Es wird zunächst die ml_pipeline () aus der Datei tracy_pipeline.py zum Training des Modells ausgeführt und verwendet dann das MLFlow -Modell -Deployments das geschulte Modell mit dem continuous_deployment_pipeline () .
. . . .Inferenzpipeline
Wir verwenden eine Inferenzpipeline, um Vorhersagen für die neuen Daten mit dem bereitgestellten Modell zu treffen. Schauen wir uns an, wie wir diese Pipeline in unserem Projekt implementiert haben.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Sehen Sie uns über jede der in der folgenden Inferenzpipeline genannten Funktionen:
dynamic_importer ()
Diese Funktion lädt die neuen Daten, führt die Datenverarbeitung durch und gibt die Daten zurück.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
prediction_service_loader ()
Diese Funktion ist mit @step dekoriert. Wir laden den Bereitstellungsdienst W.R.T Das bereitgestellte Modell basierend auf dem Pipeline_Name und Step_Name, in dem unser bereitgestelltes Modell bereit ist, Vorhersageabfragen für die neuen Daten zu verarbeiten.
Die Zeile vorhanden_services = mlflow_model_deployer_component.find_model_server () sucht nach einem verfügbaren Bereitstellungsdienst basierend auf den angegebenen Parametern wie Pipeline -Namen und Pipeline -Schrittname. Wenn keine Dienste verfügbar sind, zeigt dies an, dass die Einsatzpipeline entweder nicht durchgeführt oder auf ein Problem mit der Bereitstellungspipeline gestoßen wurde, sodass sie einen RunTimeError auswirkt.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
predictor ()
Die Funktion übernimmt das MLFlow-abgelagerte Modell durch den MLFlowDeploymentService und die neuen Daten. Die Daten werden weiter verarbeitet, um das erwartete Format des Modells zu entsprechen, um Echtzeit-Schlussfolgerungen zu ziehen.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Um die kontinuierliche Bereitstellung und Inferenzpipeline zu visualisieren, müssen wir das Skript run_deployment.py ausführen, in dem die Konfigurationen für Bereitstellungs- und Vorhersagekonfigurationen definiert werden. (Bitte überprüfen Sie den Code run_deployment.py im unten angegebenen GitHub).
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Lassen Sie uns nun die Datei run_deployment.py ausführen, um das Dashboard der Pipeline für kontinuierliche Bereitstellung und Inferenzpipeline zu sehen.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
kontinuierliche Bereitstellungspipeline - Ausgabe
Inferenzpipeline - Ausgabe
Nach dem Ausführen der Datei run_deployment.py können Sie den Link Mlflow Dashboard sehen, der so aussieht.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
Jetzt müssen Sie den obigen MLFlow UI -Link in Ihre Befehlszeile kopieren und einfügen und ausführen.
Hier ist das MLFlow -Dashboard, in dem Sie die Bewertungsmetriken und Modellparameter sehen können:
Schritt 11: Erstellen Sie die Stromlit -App
streamlit ist ein erstaunliches Open-Source-Framework, das zur Erstellung interaktiver UI verwendet wird. Wir können Streamlit verwenden, um Web-Apps schnell zu erstellen, ohne die Backend- oder Frontend-Entwicklung zu kennen. Zunächst müssen wir Stromlit in unserem System installieren.
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]
Sie können den Code auf GitHub für die Streamlit -App finden.
Hier ist der GitHub -Code und die Videoerklärung des Projekts für Ihr besseres Verständnis.
Schlussfolgerung
In diesem Artikel haben wir erfolgreich ein End-to-End-Mlops-Projekt mit Bitcoin-Preisvorhersage vorgestellt. Aus dem Erwerb von Daten über eine API und die Vorverarbeitung bis hin zur Modellierung von Schulungen, Bewertung und Bereitstellung unterstreicht unser Projekt die entscheidende Rolle von MLOPS bei der Verbindung von Entwicklung mit Produktion. Wir sind uns einem Schritt näher, um die Zukunft der Vorhersage von Bitcoin -Preisen in Echtzeit zu prägen. APIs bieten einen reibungslosen Zugriff auf externe Daten wie Bitcoin-Preisdaten aus der CCDATA-API, wodurch die Notwendigkeit eines bereits bestehenden Datensatzes erforderlich ist.
Key Takeaways
- APIs ermöglichen einen nahtlosen Zugriff auf externe Daten wie Bitcoin-Preisdaten aus der CCDATA-API, wodurch die Notwendigkeit eines bereits bestehenden Datensatzes erforderlich ist.
- Zenml und MLFlow sind robuste Tools, die die Entwicklung, Verfolgung und Bereitstellung von maschinellen Lernmodellen in realen Anwendungen erleichtern.
- Wir haben Best Practices befolgt, indem wir die Aufnahme, Reinigung, Merkmalen von Daten, Modelltraining und Bewertung ordnungsgemäß durchführen.
- kontinuierliche Einsatz- und Inferenzpipelines sind wichtig, um sicherzustellen, dass Modelle in Produktionsumgebungen effizient bleiben und verfügbar sind.
häufig gestellte Fragen
Q1. Ist Zenml frei zu verwenden?a. Ja, Zenml ist ein vollständig offenes MLOPS-Framework, das den Übergang von der lokalen Entwicklung zu Produktionspipelines so einfach wie 1 Codezeile macht.
Q2. Wofür wird MLFlow verwendet?a. MLFLOW erleichtert die Entwicklung des maschinellen Lernens, indem sie Tools zum Verfolgen von Experimenten, Versionungsmodellen und Bereitstellungen anbieten.
Q3. Wie kann der Server -Daemon nicht ausgeführt werden.a. Dies ist ein häufiger Fehler, mit dem Sie im Projekt konfrontiert sind. Führen Sie einfach "Zenml Logout" -Local`, dann "Zenml Clean" und dann "Zenml Login –Local" aus. Führen Sie die Pipeline erneut aus. Es wird gelöst.
Das obige ist der detaillierte Inhalt vonBitcoin -Preisvorhersage unter Verwendung von Mlops. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Obwohl es nicht die menschliche Verbindung und Intuition eines ausgebildeten Therapeuten herstellen kann, hat die Forschung gezeigt, dass viele Menschen sich wohl fühlen, wenn sie ihre Sorgen und Bedenken mit relativ gesichtslosen und anonymen AI -Bots teilen. Ob dies immer ein gutes Ich ist

Künstliche Intelligenz (KI), eine Technologie -Jahrzehnte in der Herstellung, revolutioniert die Lebensmitteleinzelhandel. Von groß angelegten Effizienzgewinnen und Kostensenkungen bis hin zu optimierten Prozessen über verschiedene Geschäftsfunktionen hinweg sind die Auswirkungen von AI unzählig

Reden wir darüber. Diese Analyse eines innovativen KI -Durchbruchs ist Teil meiner laufenden Forbes -Säulenberichterstattung über die neueste in der KI, einschließlich der Identifizierung und Erklärung verschiedener wirksamer KI -Komplexitäten (siehe Link hier). Außerdem für meinen Comp comp

Die Aufrechterhaltung eines professionellen Images erfordert gelegentliche Kleiderschrank -Updates. Während Online-Shopping bequem ist, fehlt es die Gewissheit von persönlichen Try-Ons. Meine Lösung? KI-betriebene Personalisierung. Ich stelle mir einen KI -Assistenten vor

Google Translate fügt die Funktion des Sprachlernens hinzu Laut Android Authority hat App Expert AssembleDeBug festgestellt, dass die neueste Version der Google Translate App eine neue "Praxis" -Modus des Testcode enthält, mit denen Benutzer ihre Sprachkenntnisse durch personalisierte Aktivitäten verbessern können. Diese Funktion ist derzeit für Benutzer unsichtbar, aber AssembleDeBug kann sie teilweise aktivieren und einige seiner neuen Elemente der Benutzeroberfläche anzeigen. Bei der Aktivierung fügt die Funktion am unteren Rand des Bildschirms ein neues Abschlusskapellymbol hinzu, das mit einem "Beta" -Anzeichen markiert wird, das anfällt, dass die Funktion "Praxis" anfänglich in experimenteller Form veröffentlicht wird. Die zugehörige Popup-Eingabeaufforderung zeigt "Üben Sie die für Sie zugeschnittenen Aktivitäten!", Dies bedeutet, dass Google individuell generiert wird

MIT -Forscher entwickeln Nanda, ein bahnbrechendes Webprotokoll für KI -Agenten. Nanda, kurz für vernetzte Agenten und dezentrale KI

METAs neuestes Unternehmen: Eine KI -App zum Konkurrenz von Chatgpt Meta, die Muttergesellschaft von Facebook, Instagram, WhatsApp und Threads, startet eine neue AI-betriebene Anwendung. Diese eigenständige App, Meta AI, zielt darauf ab, direkt mit Openai's Chatgpt zu konkurrieren. Hebel

Navigation der steigenden Flut von AI -Cyber -Angriffen In jüngster Zeit unterstrich Jason Clinton, Ciso für anthropische, die aufkommenden Risiken, die mit nichtmenschlichen Identitäten gebunden sind-als Kommunikation mit Maschine zu Maschinen, die diese "Identitäten" schützen, werden werden


Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

Video Face Swap
Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

Heißer Artikel

Heiße Werkzeuge

ZendStudio 13.5.1 Mac
Leistungsstarke integrierte PHP-Entwicklungsumgebung

mPDF
mPDF ist eine PHP-Bibliothek, die PDF-Dateien aus UTF-8-codiertem HTML generieren kann. Der ursprüngliche Autor, Ian Back, hat mPDF geschrieben, um PDF-Dateien „on the fly“ von seiner Website auszugeben und verschiedene Sprachen zu verarbeiten. Es ist langsamer und erzeugt bei der Verwendung von Unicode-Schriftarten größere Dateien als Originalskripte wie HTML2FPDF, unterstützt aber CSS-Stile usw. und verfügt über viele Verbesserungen. Unterstützt fast alle Sprachen, einschließlich RTL (Arabisch und Hebräisch) und CJK (Chinesisch, Japanisch und Koreanisch). Unterstützt verschachtelte Elemente auf Blockebene (wie P, DIV),

SecLists
SecLists ist der ultimative Begleiter für Sicherheitstester. Dabei handelt es sich um eine Sammlung verschiedener Arten von Listen, die häufig bei Sicherheitsbewertungen verwendet werden, an einem Ort. SecLists trägt dazu bei, Sicherheitstests effizienter und produktiver zu gestalten, indem es bequem alle Listen bereitstellt, die ein Sicherheitstester benötigen könnte. Zu den Listentypen gehören Benutzernamen, Passwörter, URLs, Fuzzing-Payloads, Muster für vertrauliche Daten, Web-Shells und mehr. Der Tester kann dieses Repository einfach auf einen neuen Testcomputer übertragen und hat dann Zugriff auf alle Arten von Listen, die er benötigt.

SAP NetWeaver Server-Adapter für Eclipse
Integrieren Sie Eclipse mit dem SAP NetWeaver-Anwendungsserver.

MinGW – Minimalistisches GNU für Windows
Dieses Projekt wird derzeit auf osdn.net/projects/mingw migriert. Sie können uns dort weiterhin folgen. MinGW: Eine native Windows-Portierung der GNU Compiler Collection (GCC), frei verteilbare Importbibliotheken und Header-Dateien zum Erstellen nativer Windows-Anwendungen, einschließlich Erweiterungen der MSVC-Laufzeit zur Unterstützung der C99-Funktionalität. Die gesamte MinGW-Software kann auf 64-Bit-Windows-Plattformen ausgeführt werden.
