Heim >Technologie-Peripheriegeräte >KI >Bitcoin -Preisvorhersage unter Verwendung von Mlops

Bitcoin -Preisvorhersage unter Verwendung von Mlops

William Shakespeare
William ShakespeareOriginal
2025-03-09 11:53:09431Durchsuche

wissen nicht viel über Bitcoin oder seine Preisschwankungen, möchten aber Investitionsentscheidungen treffen, um Gewinne zu erzielen? Dieses maschinelle Lernmodell hat Ihren Rücken. Es kann die Preise viel besser vorhersagen als ein Astrologe. In diesem Artikel werden wir ein ML -Modell für die Prognose und Vorhersage von Bitcoin -Preis unter Verwendung von ZENML und MLFLOW erstellen. Beginnen wir also mit unserer Reise, um zu verstehen, wie jemand ML- und MLOPS -Tools verwenden kann, um die Zukunft vorherzusagen.

Lernziele

  • Lernen Sie, Live -Daten mithilfe von API effizient abzurufen.
  • Verstehen Sie, was Zenml ist, warum wir MLFlow verwenden und wie Sie es in Zenml integrieren können.
  • Erforschen Sie den Bereitstellungsprozess für maschinelle Lernmodelle von der Idee bis zur Produktion.
  • Erstellen Sie, wie Sie eine benutzerfreundliche Stromlit-App für interaktive Modellvorhersagen für maschinelles Lernen erstellen.

Dieser Artikel wurde als Teil des Data Science -Blogathon veröffentlicht.

Table of Contents

  • Problem Statement
  • Project Implementation
    • Step 1: Accessing the API
    • Step 2: Connecting to Database Using MongoDB
    • Step 3: Integration of MLflow with ZenML
    • Step 4: Data Ingestion
    • Step 5: Data Cleaning
    • Step 6: Feature Engineering
    • Step 7: Data Splitting
    • Step 8: Model Training
    • Step 9: Model Evaluation
    • Step 10: Model Deployment
    • Step 11: Building the Streamlit App
  • Frequently Asked Fragen

Problemanweisung

Bitcoin -Preise sind sehr volatil, und Vorhersagen ist so gut wie unmöglich. In unserem Projekt verwenden wir den Best Practicesto vonMlops ein LSTM -Modell, um Bitcoinprices und Trends zu prognostizieren.

Bevor Sie das Projekt implementieren, schauen wir uns die Projektarchitektur an.

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Projektimplementierung

Beginnen wir mit dem Zugriff auf die API.

Warum machen wir das? Sie können historische Bitcoin -Preisdaten aus verschiedenen Datensätzen erhalten, aber mit einer API können wir Zugriff auf Live -Marktdaten haben.

Schritt 1: Zugriff auf die API

  • Melden Sie sich für den API -Zugriff an:
    Sobald Sie sich auf der API -Seite der Kkdata angemeldet haben. Sie können den kostenlosen API-Schlüssel von dieser Seite erhalten.
  • Bitcoin -Preisdaten abrufen:
  • Mit dem folgenden Code können Sie Bitcoin -Preisdaten von der CCDATA -API abrufen und in einen Pandas -Datenfream umwandeln. Halten Sie auch den API -Schlüssel in der Datei .env.
Schritt 2: Verbindung zur Datenbank mit MongoDB
import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")
herstellen

mongoDB ist eine NoSQL-Datenbank, die für ihre Anpassungsfähigkeit, Erweiterbarkeit und Fähigkeit, unstrukturierte Daten in einem JSON-ähnlichen Format zu speichern, bekannt ist.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

Dieser Code stellt eine Verbindung zu MongoDB her, holt Bitcoin -Preisdaten über eine API und aktualisiert die Datenbank mit allen neuen Einträgen nach dem neuesten protokollierten Datum.

Einführung von Zenml

zenmlis Eine Open-Source-Plattform, die auf maschinelles Lernen zugeschnitten ist und die Erstellung flexibler und produktionsbereiteter Pipelines unterstützt. Darüber hinaus integriert sich ZENML in Multiple maschinelle Lernwerkzeuge Gleiche, Bentoml usw., um nahtlose ML -Pipelines zu erstellen.

⚠️ Wenn Sie ein Windows -Benutzer sind, versuchen Sie, WSL in Ihrem System zu installieren. Zenml unterstützt keine Windows.

In diesem Projekt werden wir eine traditionelle Pipeline implementieren, die ZenML verwendet, und wir werden MLFlow in ZENML für die Experimentverfolgung integrieren.

Voraussetzungen und grundlegende ZENML-Befehle

  • Python 3.12 oder höher: Sie können es von hier aus erhalten: https: //www.python.org/downloads/
  • Aktivieren Sie Ihre virtuelle Umgebung:
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")
  1. ZENML -Befehle:

Alle Kern -ZenML -Befehle zusammen mit ihren Funktionen finden Sie unten:

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

Schritt 3: Integration von MLFlow mit Zenml

Wir verwenden MLFlow für die Experimentverfolgung, um unser Modell, Artefakte, Metriken und Hyperparameterwerte zu verfolgen. Wir registrieren MLFlow für Experimentverfolgung und Model -Deploymer hier:

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

Zenml Stacklist

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Projektstruktur

Hier sehen Sie das Layout des Projekts. Lassen Sie uns nun nacheinander ausführlich besprechen.

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

Schritt 4: Datenaufnahme

wir nehmen zuerst Daten von API zu MongoDB auf und konvertieren sie in Pandas DataFrame.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

Wir fügen @step als Dekorateur zur incest_data () hinzu, um es als Schritt unserer Trainingspipeline zu deklarieren. Auf die gleiche Weise schreiben wir Code für jeden Schritt in der Projektarchitektur und erstellen die Pipeline.

Um anzuzeigen, wie ich den @step Dekorator verwendet habe, lesen Sie den GitHub -Link unten (Schritteordner), um den Code für andere Schritte der Pipeline durchzuführen, d. H. Datenreinigung, Feature Engineering, Datenaufteilung, Modelltraining und Modellbewertung.

Schritt 5: Datenreinigung

In diesem Schritt werden wir verschiedene Strategien zur Reinigung der aufgenommenen Daten erstellen. Wir werden die unerwünschten Spalten und fehlenden Werte in den Daten fallen lassen.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

Schritt 6: Feature Engineering

Dieser Schritt enthält die gereinigten Daten aus dem früheren Datenschritt von Data_Cleaning. Wir erstellen neue Funktionen wie Simple Moving Average (SMA), Exponential Moving Average (EMA) sowie verzögerte und rollende Statistiken, um Trends zu erfassen, Rauschen zu reduzieren und zuverlässigere Vorhersagen aus Zeitreihendaten zu machen. Zusätzlich skalieren wir die Funktionen und Zielvariablen mithilfe von Minmax -Skalierung.

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")

Schritt 7: Datenaufteilung

Jetzt teilen wir die verarbeiteten Daten in Schulungs- und Testen von Datensätzen im Verhältnis von 80:20 auf.

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

Schritt 8: Modelltraining

In diesem Schritt trainieren wir das Modell mit frühem Stoppen, um eine Überanpassung zu verhindern, und verwenden die automatisierte Protokollierung von MLFLOW, um unser Modell und unsere Experimente zu verfolgen und das trainierte Modell als lstm_model.keras zu speichern.

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

Schritt 9: Modellbewertung

Da dies ein Regressionsproblem ist, verwenden wir Bewertungsmetriken wie mittlerer quadratischer Fehler (MSE), Stammmittelwert für quadratische Fehler (MSE), mittlerer absoluter Fehler (MAE) und R-Squared.

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

Jetzt werden wir alle oben genannten Schritte in eine Pipeline organisieren. Erstellen wir eine neue Dateitraining_Pipeline.py.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

Hier wird @pipeline Dekorateur verwendet, um die Funktionsml_Pipeline () als Pipeline in Zenml zu definieren.

Um das Dashboard für die Trainingspipeline anzuzeigen, führen Sie einfach das Skript run_pipeline.py aus. Erstellen wir eine run_pipeline.py -Datei.

import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  

Jetzt haben wir das Erstellen der Pipeline abgeschlossen. Führen Sie den Befehl unten aus, um das Pipeline -Dashboard anzuzeigen.

class DataPreprocessor:
    def __init__(self, data: pd.DataFrame):
        
        self.data = data
        logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)

    def clean_data(self) -> pd.DataFrame:
        """
        Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
        and returning the cleaned DataFrame.

        Returns:
            pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
        """
        logging.info("Starting data cleaning process.")

        # Drop unnecessary columns, including '_id' if it exists
        columns_to_drop = [
            'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
            'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
            'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
            'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
            'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
            'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
            'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
        ]
        logging.info("Dropping columns: %s")
        self.data = self.drop_columns(self.data, columns_to_drop)

        # Drop columns where the number of missing values is greater than 0
        logging.info("Dropping columns with missing values.")
        self.data = self.drop_columns_with_missing_values(self.data)

        logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
        return self.data

    def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
        """
        Drops specified columns from the DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the specified columns removed.
        """
        logging.info("Dropping columns: %s", columns)
        return data.drop(columns=columns, errors='ignore')

    def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Drops columns with any missing values from the DataFrame.

        Parameters:
            data: pd.DataFrame
                The DataFrame from which columns with missing values will be removed.
        
        Returns:
            pd.DataFrame: The DataFrame with columns containing missing values removed.
        """
        missing_columns = data.columns[data.isnull().sum() > 0]
        if not missing_columns.empty:
            logging.info("Columns with missing values: %s", missing_columns.tolist())
        else:
            logging.info("No columns with missing values found.")
        return data.loc[:, data.isnull().sum() == 0]

Nach dem Ausführen des obigen Befehls wird die Tracking -Dashboard -URL zurückgegeben, die so aussieht.

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Die Trainingspipeline sieht im Dashboard wie unten angegeben aus:

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Schritt 10: Modellbereitstellung

Bis jetzt haben wir das Modell und die Pipelines gebaut. Lassen Sie uns nun die Pipeline in die Produktion bringen, bei der Benutzer Vorhersagen treffen können.

Pipeline für kontinuierliche Bereitstellung

import joblib
import pandas as pd
from abc import ABC, abstractmethod
from sklearn.preprocessing import MinMaxScaler


# Abstract class for Feature Engineering strategy
class FeatureEngineeringStrategy(ABC):
    @abstractmethod
    def generate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        pass


# Concrete class for calculating SMA, EMA, RSI, and other features
class TechnicalIndicators(FeatureEngineeringStrategy):
    def generate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        
        # Calculate SMA, EMA, and RSI
        df['SMA_20'] = df['CLOSE'].rolling(window=20).mean()
        df['SMA_50'] = df['CLOSE'].rolling(window=50).mean()
        df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean()
        
        # Price difference features
        df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE']
        df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW']
        df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN']
        df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW']

        # Lagged features
        df['OPEN_lag1'] = df['OPEN'].shift(1)
        df['CLOSE_lag1'] = df['CLOSE'].shift(1)
        df['HIGH_lag1'] = df['HIGH'].shift(1)
        df['LOW_lag1'] = df['LOW'].shift(1)

        # Rolling statistics
        df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean()
        df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std()

        # Drop rows with missing values (due to rolling windows, shifts)
        df.dropna(inplace=True)

        return df
        
# Abstract class for Scaling strategy
class ScalingStrategy(ABC):
    @abstractmethod
    def scale(self, df: pd.DataFrame, features: list, target: str):
        pass

# Concrete class for MinMax Scaling
class MinMaxScaling(ScalingStrategy):
    def scale(self, df: pd.DataFrame, features: list, target: str):
        """
        Scales the features and target using MinMaxScaler.

        Parameters:
            df: pd.DataFrame
                The DataFrame containing the features and target.
            features: list
                List of feature column names.
            target: str
                The target column name.

        Returns:
            pd.DataFrame, pd.DataFrame: Scaled features and target
        """
        scaler_X = MinMaxScaler(feature_range=(0, 1))
        scaler_y = MinMaxScaler(feature_range=(0, 1))

        X_scaled = scaler_X.fit_transform(df[features].values)
        y_scaled = scaler_y.fit_transform(df[[target]].values)

        joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl')
        joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl')

        return X_scaled, y_scaled, scaler_y


# FeatureEngineeringContext: This will use the Strategy Pattern
class FeatureEngineering:
    def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy):
        self.feature_strategy = feature_strategy
        self.scaling_strategy = scaling_strategy

    def process_features(self, df: pd.DataFrame, features: list, target: str):
        # Generate features using the provided strategy
        df_with_features = self.feature_strategy.generate_features(df)

        # Scale features and target using the provided strategy
        X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target)

        return df_with_features, X_scaled, y_scaled, scaler_y

Diese Pipeline ist für kontinuierlich bereitgestellte geschulte Modelle verantwortlich. Es wird zunächst die ml_pipeline () aus der Datei tracy_pipeline.py zum Training des Modells ausgeführt und verwendet dann das MLFlow -Modell -Deployments das geschulte Modell mit dem continuous_deployment_pipeline () .

. . . .

Inferenzpipeline

Wir verwenden eine Inferenzpipeline, um Vorhersagen für die neuen Daten mit dem bereitgestellten Modell zu treffen. Schauen wir uns an, wie wir diese Pipeline in unserem Projekt implementiert haben.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

Sehen Sie uns über jede der in der folgenden Inferenzpipeline genannten Funktionen:

dynamic_importer ()

Diese Funktion lädt die neuen Daten, führt die Datenverarbeitung durch und gibt die Daten zurück.

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")

prediction_service_loader ()

Diese Funktion ist mit @step dekoriert. Wir laden den Bereitstellungsdienst W.R.T Das bereitgestellte Modell basierend auf dem Pipeline_Name und Step_Name, in dem unser bereitgestelltes Modell bereit ist, Vorhersageabfragen für die neuen Daten zu verarbeiten.

Die Zeile vorhanden_services = mlflow_model_deployer_component.find_model_server () sucht nach einem verfügbaren Bereitstellungsdienst basierend auf den angegebenen Parametern wie Pipeline -Namen und Pipeline -Schrittname. Wenn keine Dienste verfügbar sind, zeigt dies an, dass die Einsatzpipeline entweder nicht durchgeführt oder auf ein Problem mit der Bereitstellungspipeline gestoßen wurde, sodass sie einen RunTimeError auswirkt.

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

predictor ()

Die Funktion übernimmt das MLFlow-abgelagerte Modell durch den MLFlowDeploymentService und die neuen Daten. Die Daten werden weiter verarbeitet, um das erwartete Format des Modells zu entsprechen, um Echtzeit-Schlussfolgerungen zu ziehen.

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

Um die kontinuierliche Bereitstellung und Inferenzpipeline zu visualisieren, müssen wir das Skript run_deployment.py ausführen, in dem die Konfigurationen für Bereitstellungs- und Vorhersagekonfigurationen definiert werden. (Bitte überprüfen Sie den Code run_deployment.py im unten angegebenen GitHub).

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

Lassen Sie uns nun die Datei run_deployment.py ausführen, um das Dashboard der Pipeline für kontinuierliche Bereitstellung und Inferenzpipeline zu sehen.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

kontinuierliche Bereitstellungspipeline - Ausgabe

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Inferenzpipeline - Ausgabe

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Nach dem Ausführen der Datei run_deployment.py können Sie den Link Mlflow Dashboard sehen, der so aussieht.

import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  

Jetzt müssen Sie den obigen MLFlow UI -Link in Ihre Befehlszeile kopieren und einfügen und ausführen.

Hier ist das MLFlow -Dashboard, in dem Sie die Bewertungsmetriken und Modellparameter sehen können:

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Schritt 11: Erstellen Sie die Stromlit -App

streamlit ist ein erstaunliches Open-Source-Framework, das zur Erstellung interaktiver UI verwendet wird. Wir können Streamlit verwenden, um Web-Apps schnell zu erstellen, ohne die Backend- oder Frontend-Entwicklung zu kennen. Zunächst müssen wir Stromlit in unserem System installieren.

class DataPreprocessor:
    def __init__(self, data: pd.DataFrame):
        
        self.data = data
        logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)

    def clean_data(self) -> pd.DataFrame:
        """
        Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
        and returning the cleaned DataFrame.

        Returns:
            pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
        """
        logging.info("Starting data cleaning process.")

        # Drop unnecessary columns, including '_id' if it exists
        columns_to_drop = [
            'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
            'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
            'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
            'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
            'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
            'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
            'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
        ]
        logging.info("Dropping columns: %s")
        self.data = self.drop_columns(self.data, columns_to_drop)

        # Drop columns where the number of missing values is greater than 0
        logging.info("Dropping columns with missing values.")
        self.data = self.drop_columns_with_missing_values(self.data)

        logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
        return self.data

    def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
        """
        Drops specified columns from the DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the specified columns removed.
        """
        logging.info("Dropping columns: %s", columns)
        return data.drop(columns=columns, errors='ignore')

    def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Drops columns with any missing values from the DataFrame.

        Parameters:
            data: pd.DataFrame
                The DataFrame from which columns with missing values will be removed.
        
        Returns:
            pd.DataFrame: The DataFrame with columns containing missing values removed.
        """
        missing_columns = data.columns[data.isnull().sum() > 0]
        if not missing_columns.empty:
            logging.info("Columns with missing values: %s", missing_columns.tolist())
        else:
            logging.info("No columns with missing values found.")
        return data.loc[:, data.isnull().sum() == 0]

Sie können den Code auf GitHub für die Streamlit -App finden.

Bitcoin -Preisvorhersage unter Verwendung von Mlops

Hier ist der GitHub -Code und die Videoerklärung des Projekts für Ihr besseres Verständnis.

Schlussfolgerung

In diesem Artikel haben wir erfolgreich ein End-to-End-Mlops-Projekt mit Bitcoin-Preisvorhersage vorgestellt. Aus dem Erwerb von Daten über eine API und die Vorverarbeitung bis hin zur Modellierung von Schulungen, Bewertung und Bereitstellung unterstreicht unser Projekt die entscheidende Rolle von MLOPS bei der Verbindung von Entwicklung mit Produktion. Wir sind uns einem Schritt näher, um die Zukunft der Vorhersage von Bitcoin -Preisen in Echtzeit zu prägen. APIs bieten einen reibungslosen Zugriff auf externe Daten wie Bitcoin-Preisdaten aus der CCDATA-API, wodurch die Notwendigkeit eines bereits bestehenden Datensatzes erforderlich ist.

Key Takeaways

  • APIs ermöglichen einen nahtlosen Zugriff auf externe Daten wie Bitcoin-Preisdaten aus der CCDATA-API, wodurch die Notwendigkeit eines bereits bestehenden Datensatzes erforderlich ist.
  • Zenml und MLFlow sind robuste Tools, die die Entwicklung, Verfolgung und Bereitstellung von maschinellen Lernmodellen in realen Anwendungen erleichtern.
  • Wir haben Best Practices befolgt, indem wir die Aufnahme, Reinigung, Merkmalen von Daten, Modelltraining und Bewertung ordnungsgemäß durchführen.
  • kontinuierliche Einsatz- und Inferenzpipelines sind wichtig, um sicherzustellen, dass Modelle in Produktionsumgebungen effizient bleiben und verfügbar sind.

häufig gestellte Fragen

Q1. Ist Zenml frei zu verwenden?

a. Ja, Zenml ist ein vollständig offenes MLOPS-Framework, das den Übergang von der lokalen Entwicklung zu Produktionspipelines so einfach wie 1 Codezeile macht.

Q2. Wofür wird MLFlow verwendet?

a. MLFLOW erleichtert die Entwicklung des maschinellen Lernens, indem sie Tools zum Verfolgen von Experimenten, Versionungsmodellen und Bereitstellungen anbieten.

Q3. Wie kann der Server -Daemon nicht ausgeführt werden.

a. Dies ist ein häufiger Fehler, mit dem Sie im Projekt konfrontiert sind. Führen Sie einfach "Zenml Logout" -Local`, dann "Zenml Clean" und dann "Zenml Login –Local" aus. Führen Sie die Pipeline erneut aus. Es wird gelöst.

Das obige ist der detaillierte Inhalt vonBitcoin -Preisvorhersage unter Verwendung von Mlops. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn