Heim >Technologie-Peripheriegeräte >KI >Bitcoin -Preisvorhersage unter Verwendung von Mlops
wissen nicht viel über Bitcoin oder seine Preisschwankungen, möchten aber Investitionsentscheidungen treffen, um Gewinne zu erzielen? Dieses maschinelle Lernmodell hat Ihren Rücken. Es kann die Preise viel besser vorhersagen als ein Astrologe. In diesem Artikel werden wir ein ML -Modell für die Prognose und Vorhersage von Bitcoin -Preis unter Verwendung von ZENML und MLFLOW erstellen. Beginnen wir also mit unserer Reise, um zu verstehen, wie jemand ML- und MLOPS -Tools verwenden kann, um die Zukunft vorherzusagen.
Dieser Artikel wurde als Teil des Data Science -Blogathon veröffentlicht.
Bitcoin -Preise sind sehr volatil, und Vorhersagen ist so gut wie unmöglich. In unserem Projekt verwenden wir den Best Practicesto vonMlops ein LSTM -Modell, um Bitcoinprices und Trends zu prognostizieren.
Bevor Sie das Projekt implementieren, schauen wir uns die Projektarchitektur an.
Beginnen wir mit dem Zugriff auf die API.
Warum machen wir das? Sie können historische Bitcoin -Preisdaten aus verschiedenen Datensätzen erhalten, aber mit einer API können wir Zugriff auf Live -Marktdaten haben.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")herstellen
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Dieser Code stellt eine Verbindung zu MongoDB her, holt Bitcoin -Preisdaten über eine API und aktualisiert die Datenbank mit allen neuen Einträgen nach dem neuesten protokollierten Datum.
zenmlis Eine Open-Source-Plattform, die auf maschinelles Lernen zugeschnitten ist und die Erstellung flexibler und produktionsbereiteter Pipelines unterstützt. Darüber hinaus integriert sich ZENML in Multiple maschinelle Lernwerkzeuge Gleiche, Bentoml usw., um nahtlose ML -Pipelines zu erstellen.
⚠️ Wenn Sie ein Windows -Benutzer sind, versuchen Sie, WSL in Ihrem System zu installieren. Zenml unterstützt keine Windows.
In diesem Projekt werden wir eine traditionelle Pipeline implementieren, die ZenML verwendet, und wir werden MLFlow in ZENML für die Experimentverfolgung integrieren.
Voraussetzungen und grundlegende ZENML-Befehle
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
Alle Kern -ZenML -Befehle zusammen mit ihren Funktionen finden Sie unten:
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Wir verwenden MLFlow für die Experimentverfolgung, um unser Modell, Artefakte, Metriken und Hyperparameterwerte zu verfolgen. Wir registrieren MLFlow für Experimentverfolgung und Model -Deploymer hier:
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Hier sehen Sie das Layout des Projekts. Lassen Sie uns nun nacheinander ausführlich besprechen.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
wir nehmen zuerst Daten von API zu MongoDB auf und konvertieren sie in Pandas DataFrame.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Wir fügen @step als Dekorateur zur incest_data () hinzu, um es als Schritt unserer Trainingspipeline zu deklarieren. Auf die gleiche Weise schreiben wir Code für jeden Schritt in der Projektarchitektur und erstellen die Pipeline.
Um anzuzeigen, wie ich den @step Dekorator verwendet habe, lesen Sie den GitHub -Link unten (Schritteordner), um den Code für andere Schritte der Pipeline durchzuführen, d. H. Datenreinigung, Feature Engineering, Datenaufteilung, Modelltraining und Modellbewertung.
In diesem Schritt werden wir verschiedene Strategien zur Reinigung der aufgenommenen Daten erstellen. Wir werden die unerwünschten Spalten und fehlenden Werte in den Daten fallen lassen.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Dieser Schritt enthält die gereinigten Daten aus dem früheren Datenschritt von Data_Cleaning. Wir erstellen neue Funktionen wie Simple Moving Average (SMA), Exponential Moving Average (EMA) sowie verzögerte und rollende Statistiken, um Trends zu erfassen, Rauschen zu reduzieren und zuverlässigere Vorhersagen aus Zeitreihendaten zu machen. Zusätzlich skalieren wir die Funktionen und Zielvariablen mithilfe von Minmax -Skalierung.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
Jetzt teilen wir die verarbeiteten Daten in Schulungs- und Testen von Datensätzen im Verhältnis von 80:20 auf.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
In diesem Schritt trainieren wir das Modell mit frühem Stoppen, um eine Überanpassung zu verhindern, und verwenden die automatisierte Protokollierung von MLFLOW, um unser Modell und unsere Experimente zu verfolgen und das trainierte Modell als lstm_model.keras zu speichern.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Da dies ein Regressionsproblem ist, verwenden wir Bewertungsmetriken wie mittlerer quadratischer Fehler (MSE), Stammmittelwert für quadratische Fehler (MSE), mittlerer absoluter Fehler (MAE) und R-Squared.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Jetzt werden wir alle oben genannten Schritte in eine Pipeline organisieren. Erstellen wir eine neue Dateitraining_Pipeline.py.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Hier wird @pipeline Dekorateur verwendet, um die Funktionsml_Pipeline () als Pipeline in Zenml zu definieren.
Um das Dashboard für die Trainingspipeline anzuzeigen, führen Sie einfach das Skript run_pipeline.py aus. Erstellen wir eine run_pipeline.py -Datei.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
Jetzt haben wir das Erstellen der Pipeline abgeschlossen. Führen Sie den Befehl unten aus, um das Pipeline -Dashboard anzuzeigen.
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]
Nach dem Ausführen des obigen Befehls wird die Tracking -Dashboard -URL zurückgegeben, die so aussieht.
Die Trainingspipeline sieht im Dashboard wie unten angegeben aus:
Bis jetzt haben wir das Modell und die Pipelines gebaut. Lassen Sie uns nun die Pipeline in die Produktion bringen, bei der Benutzer Vorhersagen treffen können.
import joblib import pandas as pd from abc import ABC, abstractmethod from sklearn.preprocessing import MinMaxScaler # Abstract class for Feature Engineering strategy class FeatureEngineeringStrategy(ABC): @abstractmethod def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: pass # Concrete class for calculating SMA, EMA, RSI, and other features class TechnicalIndicators(FeatureEngineeringStrategy): def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: # Calculate SMA, EMA, and RSI df['SMA_20'] = df['CLOSE'].rolling(window=20).mean() df['SMA_50'] = df['CLOSE'].rolling(window=50).mean() df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean() # Price difference features df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE'] df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW'] df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN'] df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW'] # Lagged features df['OPEN_lag1'] = df['OPEN'].shift(1) df['CLOSE_lag1'] = df['CLOSE'].shift(1) df['HIGH_lag1'] = df['HIGH'].shift(1) df['LOW_lag1'] = df['LOW'].shift(1) # Rolling statistics df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean() df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std() # Drop rows with missing values (due to rolling windows, shifts) df.dropna(inplace=True) return df # Abstract class for Scaling strategy class ScalingStrategy(ABC): @abstractmethod def scale(self, df: pd.DataFrame, features: list, target: str): pass # Concrete class for MinMax Scaling class MinMaxScaling(ScalingStrategy): def scale(self, df: pd.DataFrame, features: list, target: str): """ Scales the features and target using MinMaxScaler. Parameters: df: pd.DataFrame The DataFrame containing the features and target. features: list List of feature column names. target: str The target column name. Returns: pd.DataFrame, pd.DataFrame: Scaled features and target """ scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) X_scaled = scaler_X.fit_transform(df[features].values) y_scaled = scaler_y.fit_transform(df[[target]].values) joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl') joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl') return X_scaled, y_scaled, scaler_y # FeatureEngineeringContext: This will use the Strategy Pattern class FeatureEngineering: def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy): self.feature_strategy = feature_strategy self.scaling_strategy = scaling_strategy def process_features(self, df: pd.DataFrame, features: list, target: str): # Generate features using the provided strategy df_with_features = self.feature_strategy.generate_features(df) # Scale features and target using the provided strategy X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target) return df_with_features, X_scaled, y_scaled, scaler_y
Diese Pipeline ist für kontinuierlich bereitgestellte geschulte Modelle verantwortlich. Es wird zunächst die ml_pipeline () aus der Datei tracy_pipeline.py zum Training des Modells ausgeführt und verwendet dann das MLFlow -Modell -Deployments das geschulte Modell mit dem continuous_deployment_pipeline () .
. . . .Wir verwenden eine Inferenzpipeline, um Vorhersagen für die neuen Daten mit dem bereitgestellten Modell zu treffen. Schauen wir uns an, wie wir diese Pipeline in unserem Projekt implementiert haben.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Sehen Sie uns über jede der in der folgenden Inferenzpipeline genannten Funktionen:
dynamic_importer ()
Diese Funktion lädt die neuen Daten, führt die Datenverarbeitung durch und gibt die Daten zurück.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
prediction_service_loader ()
Diese Funktion ist mit @step dekoriert. Wir laden den Bereitstellungsdienst W.R.T Das bereitgestellte Modell basierend auf dem Pipeline_Name und Step_Name, in dem unser bereitgestelltes Modell bereit ist, Vorhersageabfragen für die neuen Daten zu verarbeiten.
Die Zeile vorhanden_services = mlflow_model_deployer_component.find_model_server () sucht nach einem verfügbaren Bereitstellungsdienst basierend auf den angegebenen Parametern wie Pipeline -Namen und Pipeline -Schrittname. Wenn keine Dienste verfügbar sind, zeigt dies an, dass die Einsatzpipeline entweder nicht durchgeführt oder auf ein Problem mit der Bereitstellungspipeline gestoßen wurde, sodass sie einen RunTimeError auswirkt.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
predictor ()
Die Funktion übernimmt das MLFlow-abgelagerte Modell durch den MLFlowDeploymentService und die neuen Daten. Die Daten werden weiter verarbeitet, um das erwartete Format des Modells zu entsprechen, um Echtzeit-Schlussfolgerungen zu ziehen.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Um die kontinuierliche Bereitstellung und Inferenzpipeline zu visualisieren, müssen wir das Skript run_deployment.py ausführen, in dem die Konfigurationen für Bereitstellungs- und Vorhersagekonfigurationen definiert werden. (Bitte überprüfen Sie den Code run_deployment.py im unten angegebenen GitHub).
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Lassen Sie uns nun die Datei run_deployment.py ausführen, um das Dashboard der Pipeline für kontinuierliche Bereitstellung und Inferenzpipeline zu sehen.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Nach dem Ausführen der Datei run_deployment.py können Sie den Link Mlflow Dashboard sehen, der so aussieht.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
Jetzt müssen Sie den obigen MLFlow UI -Link in Ihre Befehlszeile kopieren und einfügen und ausführen.
Hier ist das MLFlow -Dashboard, in dem Sie die Bewertungsmetriken und Modellparameter sehen können:
streamlit ist ein erstaunliches Open-Source-Framework, das zur Erstellung interaktiver UI verwendet wird. Wir können Streamlit verwenden, um Web-Apps schnell zu erstellen, ohne die Backend- oder Frontend-Entwicklung zu kennen. Zunächst müssen wir Stromlit in unserem System installieren.
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]
Sie können den Code auf GitHub für die Streamlit -App finden.
Hier ist der GitHub -Code und die Videoerklärung des Projekts für Ihr besseres Verständnis.
In diesem Artikel haben wir erfolgreich ein End-to-End-Mlops-Projekt mit Bitcoin-Preisvorhersage vorgestellt. Aus dem Erwerb von Daten über eine API und die Vorverarbeitung bis hin zur Modellierung von Schulungen, Bewertung und Bereitstellung unterstreicht unser Projekt die entscheidende Rolle von MLOPS bei der Verbindung von Entwicklung mit Produktion. Wir sind uns einem Schritt näher, um die Zukunft der Vorhersage von Bitcoin -Preisen in Echtzeit zu prägen. APIs bieten einen reibungslosen Zugriff auf externe Daten wie Bitcoin-Preisdaten aus der CCDATA-API, wodurch die Notwendigkeit eines bereits bestehenden Datensatzes erforderlich ist.
a. Ja, Zenml ist ein vollständig offenes MLOPS-Framework, das den Übergang von der lokalen Entwicklung zu Produktionspipelines so einfach wie 1 Codezeile macht.
Q2. Wofür wird MLFlow verwendet?a. MLFLOW erleichtert die Entwicklung des maschinellen Lernens, indem sie Tools zum Verfolgen von Experimenten, Versionungsmodellen und Bereitstellungen anbieten.
Q3. Wie kann der Server -Daemon nicht ausgeführt werden.a. Dies ist ein häufiger Fehler, mit dem Sie im Projekt konfrontiert sind. Führen Sie einfach "Zenml Logout" -Local`, dann "Zenml Clean" und dann "Zenml Login –Local" aus. Führen Sie die Pipeline erneut aus. Es wird gelöst.
Das obige ist der detaillierte Inhalt vonBitcoin -Preisvorhersage unter Verwendung von Mlops. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!