検索
ホームページテクノロジー周辺機器AIMLOPを使用したビットコイン価格予測

ビットコインやその価格の変動についてあまり知らないが、利益を上げるために投資決定を下したいですか?この機械学習モデルには背中があります。占星術師よりも価格がはるかに優れていることを予測できます。この記事では、ZENMLとMLFLOWを使用して、ビットコイン価格を予測および予測するためのMLモデルを構築します。それでは、誰もがMLおよびMLOPSツールを使用して将来を予測する方法を理解するための旅を始めましょう。

学習目標

    APIを効率的に使用してライブデータを取得することを学びます
  • ZenMLとは何か、MLFLOWを使用する理由、ZenMLと統合する方法を理解してください。
  • アイデアから生産まで、機械学習モデルの展開プロセスを探索してください。
  • インタラクティブな機械学習モデルの予測用にユーザーフレンドリーな流線アプリを作成する方法を発見してください。
  • この記事は、

データサイエンスブログの一部として公開されました。 目次問題声明

プロジェクト実装

ステップ1:APIへのアクセスステップ2:MongoDB
  • ステップ3を使用してデータベースへの接続5:データのクリーニング
  • ステップ6:フィーチャーエンジニアリング
    • ステップ7:データの分割
    • ステップ8:モデルトレーニング
    • ステップ9:モデル評価
    • ステップ10:モデルの展開
    • ステップ
    • 問題声明
    • ビットコインの価格は非常に不安定であり、予測を行うことは不可能です。私たちのプロジェクトでは、Mlopsのベストプラクティックを使用してLSTMモデルを構築して、ビットコインプリスとトレンドを予測しています。
    • プロジェクトを実装する前に、プロジェクトアーキテクチャを見てみましょう。
    • プロジェクトの実装
    • APIにアクセスすることから始めましょう。
    • なぜ私たちはこれをしているのですか?さまざまなデータセットから履歴ビットコインの価格データを取得できますが、APIを使用すると、ライブ市場データにアクセスできます。
    • ステップ1:APIへのアクセス
  • apiアクセスにサインアップ:

theccdata apiページにサインアップしたら。このpagehttps://developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days

ビットコインの価格データを取得:

以下のコードを使用すると、CCDATA APIからビットコイン価格データを取得し、パンダのデータフレームに変換できます。また、APIキーを.ENVファイルに保持します。MLOPを使用したビットコイン価格予測

ステップ2:mongodb

を使用してデータベースに接続します

mongodbは、その適応性、拡張性、および非構造化データをJSONのような形式で保存する能力で知られているNOSQLデータベースです。
import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

このコードはMongoDBに接続し、APIを介してビットコイン価格データを取得し、最新のログ日付の後にすべての新しいエントリでデータベースを更新します。

Zenml

の紹介

Zenmlis機械学習操作に合わせたオープンソースプラットフォームで、柔軟で生産対応のパイプラインの作成をサポートしています。さらに、ZenMLは複数の機械学習ツールとyikemlflow、Bentomlなどと統合して、シームレスなMLパイプラインを作成します。

⚠️あなたがWindowsユーザーの場合は、システムにWSLをインストールしてみてください。 ZenmlはWindowsをサポートしていません このプロジェクトでは、ZenMLを使用する従来のパイプラインを実装し、実験追跡のためにMLFLOWをZenMLと統合します。

前提条件と基本的なZENMLコマンド

python 3.12以降:

ここから入手できます:https://www.python.org/downloads/

仮想環境をアクティブにします:
  • zenmlコマンド:
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")
すべてのコアZENMLコマンドとその機能を以下に示します。
  1. ステップ3:mlflowとZenMl の統合 実験追跡にMLFLOWを使用して、モデル、アーティファクト、メトリック、およびハイパーパラメーター値を追跡しています。ここでは、実験追跡とモデルの展開のためにMLFLOWを登録しています。
ZenMlスタックリスト

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

プロジェクト構造

ここでは、プロジェクトのレイアウトを見ることができます。それでは、詳細に1つずつ議論しましょう。
#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

ステップ4:データ摂取

MLOPを使用したビットコイン価格予測最初にAPIからMongoDBにデータを摂取し、Pandas DataFrameに変換します。

@step

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list
を追加して、

gest_data()

関数の装飾器として追加して、トレーニングパイプラインのステップとして宣言します。同様に、プロジェクトアーキテクチャの各ステップのコードを作成し、パイプラインを作成します。

デコレーターを使用した方法を表示するには、以下のgithubリンク(ステップフォルダー)をチェックして、パイプラインの他のステップ、つまりデータクリーニング、機能エンジニアリング、データ分割、モデルトレーニング、モデル評価のコードを確認してください。

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)
ステップ5:データのクリーニング

このステップでは、摂取されたデータをクリーニングするためのさまざまな戦略を作成します。データ内の不要な列と欠損値をドロップします。

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

ステップ6:機能エンジニアリング

このステップは、以前のdata_cleaningステップからクリーンデータを取得します。単純な移動平均(SMA)、指数移動平均(EMA)、遅れてローリング統計などの新しい機能を作成して、傾向をキャプチャし、騒音を減らし、時系列データからより信頼性の高い予測を行います。さらに、Minmaxスケーリングを使用して、機能とターゲット変数をスケーリングします。

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")
ステップ7:データの分割

次に、処理されたデータを80:20の比率でトレーニングとテストデータセットに分割します。

ステップ8:モデルトレーニング
#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate
このステップでは、過剰適合を防ぐために早期停止でThelSTMモデルをトレーニングし、MLFLOWの自動ロギングを使用してモデルと実験を追跡し、

LSTM_MODEL.KERAS

ステップ9:モデルの評価 これは回帰の問題であるため、平均四角誤差(MSE)、ルート平均二乗誤差(MSE)、平均絶対誤差(MAE)、R-cquaredなどの評価メトリックを使用しています。

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean
ここで、上記のすべての手順をパイプラインに整理します。新しいファイルTraining_pipeline.pyを作成しましょう

ここで、

@pipeline

デコレーターは、functionml_pipeline()をZenmlのパイプラインとして定義するために使用されます。 トレーニングパイプラインのダッシュボードを表示するには、run_pipeline.pyスクリプトを実行するだけです。 run_pipeline.pyファイルを作成しましょう。
#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

パイプラインの作成を完了しました。以下のコマンドを実行して、パイプラインダッシュボードを表示します。
bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

上記のコマンドを実行した後、トラッキングダッシュボードURLを返します。これは次のようになります。

トレーニングパイプラインはダッシュボードでこのようになります。
import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  

class DataPreprocessor:
    def __init__(self, data: pd.DataFrame):
        
        self.data = data
        logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)

    def clean_data(self) -> pd.DataFrame:
        """
        Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
        and returning the cleaned DataFrame.

        Returns:
            pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
        """
        logging.info("Starting data cleaning process.")

        # Drop unnecessary columns, including '_id' if it exists
        columns_to_drop = [
            'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
            'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
            'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
            'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
            'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
            'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
            'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
        ]
        logging.info("Dropping columns: %s")
        self.data = self.drop_columns(self.data, columns_to_drop)

        # Drop columns where the number of missing values is greater than 0
        logging.info("Dropping columns with missing values.")
        self.data = self.drop_columns_with_missing_values(self.data)

        logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
        return self.data

    def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
        """
        Drops specified columns from the DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the specified columns removed.
        """
        logging.info("Dropping columns: %s", columns)
        return data.drop(columns=columns, errors='ignore')

    def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Drops columns with any missing values from the DataFrame.

        Parameters:
            data: pd.DataFrame
                The DataFrame from which columns with missing values will be removed.
        
        Returns:
            pd.DataFrame: The DataFrame with columns containing missing values removed.
        """
        missing_columns = data.columns[data.isnull().sum() > 0]
        if not missing_columns.empty:
            logging.info("Columns with missing values: %s", missing_columns.tolist())
        else:
            logging.info("No columns with missing values found.")
        return data.loc[:, data.isnull().sum() == 0]

MLOPを使用したビットコイン価格予測

MLOPを使用したビットコイン価格予測ステップ10:モデルの展開

今まで、モデルとパイプラインを構築しています。それでは、ユーザーが予測できるパイプラインを制作に押し込みましょう。 MLOPを使用したビットコイン価格予測継続的な展開パイプライン

MLOPを使用したビットコイン価格予測このパイプラインは、訓練されたモデルを継続的に展開する責任があります。最初に

ml_pipeline()

ファイルから実行してモデルをトレーニングし、次に

mlflowモデルデプロイヤー

を使用して、

continuous_deployment_pipeline()

推論パイプライン

展開モデルを使用して、推論パイプラインを使用して新しいデータを予測します。このプロジェクトでこのパイプラインをどのように実装したかを見てみましょう。

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")
以下の推論パイプラインで呼び出される各関数について見てみましょう:

dynamic_importer()

この関数は新しいデータをロードし、データ処理を実行し、データを返します。

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")
prectiction_service_loader()

この関数は、

@step

で装飾されています。展開サービスw.r.tは、pipeline_nameとstep_nameに基づいて展開モデルをロードします。展開モデルは、新しいデータの予測クエリを処理する準備ができています。 lineexpstion_services = mlflow_model_deployer_component.find_model_server()

パイプライン名やパイプラインステップ名などの特定のパラメーターに基づいて利用可能な展開サービスを検索します。サービスが利用できない場合、展開パイプラインが実行されていないか、展開パイプラインの問題が発生していないことを示しているため、RuntimeErrorがスローされます。

predictor()

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate
この関数は、MLFLOWDEPLOYMENTSERVICEと新しいデータを介してMLFLOWで廃止されたモデルを取り入れます。データは、モデルの予想形式と一致するようにさらに処理され、リアルタイムの推論を行います。

継続的な展開と推論パイプラインを視覚化するには、deployment and Prowictionの構成が定義されるrun_deployment.pyスクリプトを実行する必要があります。 (以下のgithubでrun_deployment.pyコードを確認してください)

run_deployment.pyファイルを実行して、継続的な展開パイプラインと推論パイプラインのダッシュボードを表示しましょう。

継続的な展開パイプライン - 出力
#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

推論パイプライン - 出力

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

run_deployment.pyファイルを実行した後、次のようなmlflowダッシュボードリンクを見ることができます。

MLOPを使用したビットコイン価格予測ここで、コマンドラインの上記のMLFlow UIリンクをコピーして貼り付けて実行する必要があります。

ここにmlflowダッシュボードがあります。ここでは、評価メトリックとモデルパラメーターを確認できます。

MLOPを使用したビットコイン価格予測ステップ11:retrienlitアプリを構築します

Streamlitは、インタラクティブなUIの作成に使用される驚くべきオープンソースのPythonベースのフレームワークです。バックエンドやフロントエンド開発を知らずに、Riremlitを使用してWebアプリをすばやく構築できます。まず、システムにRiremlitをインストールする必要があります

import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  
繰り返しますが、retrylitアプリのgithubでコードを見つけることができます。

MLOPを使用したビットコイン価格予測

これがあなたのより良い理解のためのプロジェクトのgithubコードとビデオの説明です。

結論

この記事では、エンドツーエンドの生産対応のビットコイン価格予測MLOPSプロジェクトの構築に成功しました。データを取得してAPIを介してプリプロシスしてトレーニング、評価、展開をモデル化するために、プロジェクトをモデル化することから、開発と生産との接続におけるMLOPの重要な役割を強調しています。リアルタイムでビットコインの価格を予測する未来を形作ることに一歩近づいています。 APIは、CCDATA APIのビットコイン価格データなど、外部データへのスムーズなアクセスを提供し、既存のデータセットの必要性を排除します。

キーテイクアウト

APIは、CCDATA APIのビットコイン価格データなど、外部データへのシームレスなアクセスを有効にして、既存のデータセットの必要性を排除します。
    ZENMLおよびMLFLOWは、現実世界のアプリケーションでの機械学習モデルの開発、追跡、展開を促進する堅牢なツールです。
  • データの摂取、クリーニング、機能エンジニアリング、モデルトレーニング、評価を適切に実行することで、ベストプラクティスに従いました。
  • 継続的な展開と推論パイプラインは、モデルが効率的で生産環境で利用可能であることを保証するために不可欠です。
  • よくある質問
  • q1。 Zenmlは自由に使用できますか?はい、ZenMLは完全にオープンソースMLOPSフレームワークであり、ローカル開発から生産パイプラインへの移行を1行のコードと同じくらい簡単にします。 MLFLOWは何に使用されていますか? MLFLOWは、実験、バージョンモデルを追跡し、それらを展開するためのツールを提供することにより、機械学習開発を容易にします。サーバーデーモンをデバッグする方法は、エラーを実行していませんか?これは、プロジェクトで直面する一般的なエラーです。 `zenml logout –local` then` zenml clean`を実行してから、 `zenml login –local`を実行して、再びパイプラインを実行します。それは解決されます。

以上がMLOPを使用したビットコイン価格予測の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
AIセラピストがここにいます:あなたが知る必要がある14の画期的なメンタルヘルスツールAIセラピストがここにいます:あなたが知る必要がある14の画期的なメンタルヘルスツールApr 30, 2025 am 11:17 AM

訓練を受けたセラピストの人間のつながりと直観を提供することはできませんが、多くの人々は、比較的顔のない匿名のAIボットと心配や懸念を共有することを快適に共有していることが研究で示されています。 これが常に良いかどうか

食料品の通路にAIを呼びます食料品の通路にAIを呼びますApr 30, 2025 am 11:16 AM

数十年の技術である人工知能(AI)は、食品小売業界に革命をもたらしています。 大規模な効率性の向上とコスト削減から、さまざまなビジネス機能にわたる合理化されたプロセスまで、AIの影響はUndeniablです

あなたの精神を持ち上げるために生成的なAIからPEPの話をするあなたの精神を持ち上げるために生成的なAIからPEPの話をするApr 30, 2025 am 11:15 AM

それについて話しましょう。 革新的なAIブレークスルーのこの分析は、さまざまなインパクトのあるAIの複雑さを特定して説明するなど、最新のAIで進行中のForbes列のカバレッジの一部です(こちらのリンクを参照)。さらに、私のコンプのために

AI駆動のハイパーパーソナリゼーションがすべてのビジネスにとって必須である理由AI駆動のハイパーパーソナリゼーションがすべてのビジネスにとって必須である理由Apr 30, 2025 am 11:14 AM

プロの画像を維持するには、時折ワードローブの更新が必要です。 オンラインショッピングは便利ですが、対面の試練の確実性がありません。 私の解決策? AI駆動のパーソナライズ。 衣類の選択をキュレーションするAIアシスタントが想像しています

Duolingoを忘れてください:Google Translateの新しいAI機能は言語を教えていますDuolingoを忘れてください:Google Translateの新しいAI機能は言語を教えていますApr 30, 2025 am 11:13 AM

Google Translateは言語学習機能を追加します Android Authorityによると、App Expert AssemberBugは、Google Translateアプリの最新バージョンには、パーソナライズされたアクティビティを通じてユーザーが言語スキルを向上させるように設計された新しい「実践」モードのテストコードが含まれていることを発見しました。この機能は現在、ユーザーには見えませんが、AssembleDebugはそれを部分的にアクティブにして、新しいユーザーインターフェイス要素の一部を表示できます。 アクティブ化すると、この機能は、「ベータ」バッジでマークされた画面の下部に新しい卒業キャップアイコンを追加し、「実践」機能が最初に実験形式でリリースされることを示します。 関連するポップアッププロンプトは、「あなたのために調整されたアクティビティを練習してください!」を示しています。つまり、Googleがカスタマイズされたことを意味します

彼らはAIのためにTCP/IPを作成しており、Nandaと呼ばれています彼らはAIのためにTCP/IPを作成しており、Nandaと呼ばれていますApr 30, 2025 am 11:12 AM

MITの研究者は、AIエージェント向けに設計された画期的なWebプロトコルであるNandaを開発しています。 ネットワークエージェントと分散型AIの略であるNandaは、インターネット機能を追加することにより、人類のモデルコンテキストプロトコル(MCP)に基づいて構築され、AI Agenを可能にします

プロンプト:Deepfake Detectionは活況を呈しているビジネスですプロンプト:Deepfake Detectionは活況を呈しているビジネスですApr 30, 2025 am 11:11 AM

メタの最新のベンチャー:chatgptに匹敵するAIアプリ Facebook、Instagram、WhatsApp、およびThreadsの親会社であるMetaは、新しいAIを搭載したアプリケーションを立ち上げています。 このスタンドアロンアプリであるMeta AIは、OpenaiのChatGptと直接競争することを目指しています。 レバー

ビジネスリーダーのためのAIサイバーセキュリティでの次の2年間ビジネスリーダーのためのAIサイバーセキュリティでの次の2年間Apr 30, 2025 am 11:10 AM

AIサイバー攻撃の上昇する潮をナビゲートします 最近、人類のためのCISOであるジェイソン・クリントンは、機械間通信が増殖すると、これらの「アイデンティティ」を保護するために、非人間のアイデンティティに結びついた新たなリスクを強調しました。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

SublimeText3 Linux 新バージョン

SublimeText3 Linux 新バージョン

SublimeText3 Linux 最新バージョン

VSCode Windows 64 ビットのダウンロード

VSCode Windows 64 ビットのダウンロード

Microsoft によって発売された無料で強力な IDE エディター

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

Dreamweaver Mac版

Dreamweaver Mac版

ビジュアル Web 開発ツール

WebStorm Mac版

WebStorm Mac版

便利なJavaScript開発ツール