搜尋
首頁科技週邊人工智慧使用MLOPS的比特幣價格預測

>對比特幣或其價格波動了解不多,而是想做出投資決定來獲利嗎?該機器學習模型有您的支持。它可以比占星家更好地預測價格。在本文中,我們將使用ZenML和MLFlow構建一個用於預測和預測比特幣價格的ML模型。因此,讓我們開始我們的旅程,了解任何人如何使用ML和MLOPS工具來預測未來。

學習目標

    學會有效地使用API​​獲取實時數據。
  • >
  • 了解Zenml是什麼,為什麼我們使用MLFLOW以及如何將其與Zenml集成。
  • >探索從想法到生產的機器學習模型的部署過程。
  • >
  • >發現如何為交互式機器學習模型預測創建用戶友好的簡化應用程序。
  • >

>本文是> > data Science Blogathon的一部分。 內容表>

問題語句
  • >項目實現
  • 步驟1:訪問API
    • 步驟2:使用mongodb
  • 5:數據清潔
  • 步驟6:特徵工程
  • 步驟7:數據拆分
  • 步驟8:型號培訓
  • 步驟9:型號評估
  • 步驟10:模型部署 問題
  • 問題語句
比特幣價格高度波動,而做出預測幾乎是不可能的。在我們的項目中,我們正在使用Mlops的最佳實踐構建LSTM模型來預測比特幣和趨勢。
  • > 在實施項目之前,讓我們看一下項目體系結構。
  • >

    >項目實施

    讓我們從訪問API開始。

    我們為什麼要這樣做?您可以從不同的數據集中獲取歷史比特幣價格數據,但是使用API​​,我們可以訪問Live Market Data。 使用MLOPS的比特幣價格預測>步驟1:訪問API

    >

    >註冊API訪問:

    >一旦您在theccdata api頁面上註冊後。您可以從此pagehttps://developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days

    獲得免費的API密鑰

    提取比特幣價格數據:
    • 在以下代碼的情況下,您可以從CCDATA API獲取比特幣價格數據,並將其轉換為PANDAS DataFrame。另外,將API密鑰保留在.env文件中。

    • 步驟2:使用mongodb
    • 連接到數據庫 MongoDB是一個NOSQL數據庫,以其適應性,可擴展性和以JSON式格式存儲非結構化數據的能力。
      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")

      此代碼連接到MongoDB,通過API檢索比特幣價格數據,並在最新記錄日期後使用所有新條目更新數據庫。

      介紹Zenml

      > Zenmlis是一個針對機器學習操作量身定制的開源平台,支持了靈活和生產就緒的管道的創建。此外,Zenml與多個機器學習工具集成了LokeMlFlow,Bentoml等,以創建無縫的ML Pipelines。

      ⚠️如果您是Windows用戶,請嘗試在系統上安裝WSL。 Zenml不支持Windows。

      在此項目中,我們將實施使用Zenml的傳統管道,並將MLFlow與Zenml集成進行實驗跟踪。 >

      >前提條件和基本zenml命令

      >

      python 3.12或更高:

      您可以從這裡獲得:https://www.python.org/downloads/
      • 激活您的虛擬環境:
      • >
      > zenml命令:
      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      1. >所有核心Zenml命令及其功能如下:
      2. 步驟3:MLFLOW與Zenml
      的集成

      我們正在使用MLFlow進行實驗跟踪,以跟踪我們的模型,工件,指標和超參數值。我們正在註冊MLFLOW以進行實驗跟踪和模型部署者:

      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
      > zenml堆棧列表

      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
      項目結構

      >在這裡,您可以看到項目的佈局。現在讓我們詳細討論它。

      > 使用MLOPS的比特幣價格預測

      步驟4:數據攝入

      我們首先將數據從API攝取到MongoDB,然後將其轉換為PANDAS DATAFRAME。

      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list

      我們將作為裝飾器添加到

      > ingest_data()

      函數中,以將其聲明為我們訓練管道的一步。以同樣的方式,我們將在項目體系結構中為每個步驟編寫代碼並創建管道。
      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      >要查看我如何使用

      @Step 裝飾器,請查看下面的github鏈接(步驟文件夾)以瀏覽管道其他步驟的代碼,即數據清潔,功能工程,數據拆分,模型培訓和模型評估。 >>>>>>>>。 步驟5:數據清潔 在此步驟中,我們將創建清潔攝入數據的不同策略。我們將在數據中刪除不需要的列和缺失值。 >

      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")

      >步驟6:功能工程

      >此步驟從較早的data_cleaning步驟中獲取已清潔的數據。我們正在創建新功能,例如簡單的移動平均值(SMA),指數移動平均值(EMA)以及滯後和滾動統計數據,以捕獲趨勢,減少噪聲並從時間序列數據中做出更可靠的預測。此外,我們使用MinMax縮放縮放特徵和目標變量。 >

      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      步驟7:數據拆分

      >現在,我們將處理的數據分為培訓和測試數據集的比例為80:20。

      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
      步驟8:模型培訓

      在此步驟中,我們以早期停止訓練Thelstm模型以防止過度擬合,並使用MLFlow的自動日誌記錄來跟踪我們的模型和實驗,並將受過訓練的模型保存為 >步驟9:模型評估
      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
      這是一個回歸問題,我們正在使用評估指標,例如均方誤差(MSE),均方根誤差(MSE),均值絕對誤差(MAE)和R-squared。

      現在,我們將在管道中組織上述所有步驟。讓我們創建一個新的文件triench_pipeline.py。

      >

      >在這裡,
      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list
      @pipeline

      decorator用於將functionml_pipeline()定義為zenml中的管道。

      要查看訓練管道的儀表板,只需運行run_pipeline.py腳本即可。讓我們創建一個run_pipeline.py文件。
      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      >

      >現在我們已經完成了創建管道。在下面運行命令以查看管道儀表板。

      在運行上述命令後,它將返回跟踪儀表板URL,看起來像這樣。

      import os
      import logging
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from zenml import step
      import pandas as pd
      
      # Load the .env file
      load_dotenv()
      
      # Get MongoDB URI from environment variables
      MONGO_URI = os.getenv("MONGO_URI")
      
      def fetch_data_from_mongodb(collection_name:str, database_name:str):
          """
          Fetches data from MongoDB and converts it into a pandas DataFrame.
      
          collection_name: 
              Name of the MongoDB collection to fetch data.
          database_name: 
              Name of the MongoDB database.
          return: 
              A pandas DataFrame containing the data
          """
          # Connect to the MongoDB client
          client = MongoClient(MONGO_URI)
          db = client[database_name]  # Select the database
          collection = db[collection_name]  # Select the collection
      
          # Fetch all documents from the collection
          try:
              logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
              data = list(collection.find())  # Convert cursor to a list of dictionaries
      
              if not data:
                  logging.info("No data found in the MongoDB collection.")
                  
      
              # Convert the list of dictionaries into a pandas DataFrame
              df = pd.DataFrame(data)
      
              # Drop the MongoDB ObjectId field if it exists (optional)
              if '_id' in df.columns:
                  df = df.drop(columns=['_id'])
      
              logging.info("Data successfully fetched and converted to a DataFrame!")
              return df
      
          except Exception as e:
              logging.error(f"An error occurred while fetching data: {e}")
              raise e  
              
              
      @step(enable_cache=False)
      def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
          
          logging.info("Started data ingestion process from MongoDB.")
      
          try:
              # Use the fetch_data_from_mongodb function to fetch data
              df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)
      
              if df.empty:
                  logging.warning("No data was loaded. Check the collection name or the database content.")
              else:
                  logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")
      
              return df
          
          except Exception as e:
              logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
              raise e  

      class DataPreprocessor:
          def __init__(self, data: pd.DataFrame):
              
              self.data = data
              logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)
      
          def clean_data(self) -> pd.DataFrame:
              """
              Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
              and returning the cleaned DataFrame.
      
              Returns:
                  pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
              """
              logging.info("Starting data cleaning process.")
      
              # Drop unnecessary columns, including '_id' if it exists
              columns_to_drop = [
                  'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
                  'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
                  'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
                  'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
                  'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
                  'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
                  'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
              ]
              logging.info("Dropping columns: %s")
              self.data = self.drop_columns(self.data, columns_to_drop)
      
              # Drop columns where the number of missing values is greater than 0
              logging.info("Dropping columns with missing values.")
              self.data = self.drop_columns_with_missing_values(self.data)
      
              logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
              return self.data
      
          def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
              """
              Drops specified columns from the DataFrame.
      
              Returns:
                  pd.DataFrame: The DataFrame with the specified columns removed.
              """
              logging.info("Dropping columns: %s", columns)
              return data.drop(columns=columns, errors='ignore')
      
          def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
              """
              Drops columns with any missing values from the DataFrame.
      
              Parameters:
                  data: pd.DataFrame
                      The DataFrame from which columns with missing values will be removed.
              
              Returns:
                  pd.DataFrame: The DataFrame with columns containing missing values removed.
              """
              missing_columns = data.columns[data.isnull().sum() > 0]
              if not missing_columns.empty:
                  logging.info("Columns with missing values: %s", missing_columns.tolist())
              else:
                  logging.info("No columns with missing values found.")
              return data.loc[:, data.isnull().sum() == 0]
      培訓管道在儀表板上看起來像這樣,給出了以下內容:

      >

      使用MLOPS的比特幣價格預測

      使用MLOPS的比特幣價格預測

      使用MLOPS的比特幣價格預測

      步驟10:模型部署使用MLOPS的比特幣價格預測 到目前為止,我們已經構建了模型和管道。現在,讓我們將管道推入用戶可以做出預測的生產中。

      >

      連續部署管道使用MLOPS的比特幣價格預測

      該管道負責連續部署訓練有素的模型。它首先從

      > triagn_pipeline.py.py

      文件訓練模型,然後使用

      mlflow模型exployer

      來部署訓練有素的模型,使用推理管道

      >我們使用推理管道使用已部署的模型對新數據進行預測。讓我們看一下我們如何在項目中實施該管道的方式。

      >
      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")

      >讓我們查看下面的推理管道中調用的每個功能:>

      dynamic_importer()

      此功能加載新數據,執行數據處理並返回數據。

      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      prediction_service_loader()

      >此功能以

      @Step

      裝飾。我們加載了基於pipeline_name的部署服務W.R.T和step_name,我們的部署模型可以處理新數據的預測查詢。 > line 現有_services = mlflow_model_deployer_component.find_model_server()

      >

      >基於諸如pipeline名稱和管道名稱的給定參數,搜索可用的部署服務。如果沒有可用的服務,則表明部署管道要么沒有進行或遇到部署管道問題,因此它會拋出RuntimeError。

      preditionor()
      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate

      該函數通過MLFlowDeploymentservice和新數據採用MLFlow部署模型。進一步處理數據以匹配模型的預期格式以進行實時推斷。 為了可視化連續部署和推理管道,我們需要運行run_deployment.py腳本,在此將定義部署和預測配置。 (請在下面給出的github中檢查run_deployment.py代碼)。 >

      現在,讓我們運行run_deployment.py文件以查看連續部署管道和推理管道的儀表板。

      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean

      連續部署管道 - 輸出

      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list

      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      >推理管道 - 輸出

      使用MLOPS的比特幣價格預測

      運行run_deployment.py文件後,您可以看到看起來像這樣的mlflow儀表板鏈接。

      現在,您需要在命令行中復制並粘貼上述MLFLOW UI鏈接並運行它。 使用MLOPS的比特幣價格預測

      這是MLFlow儀表板,您可以在其中看到評估指標和模型參數:

      import os
      import logging
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from zenml import step
      import pandas as pd
      
      # Load the .env file
      load_dotenv()
      
      # Get MongoDB URI from environment variables
      MONGO_URI = os.getenv("MONGO_URI")
      
      def fetch_data_from_mongodb(collection_name:str, database_name:str):
          """
          Fetches data from MongoDB and converts it into a pandas DataFrame.
      
          collection_name: 
              Name of the MongoDB collection to fetch data.
          database_name: 
              Name of the MongoDB database.
          return: 
              A pandas DataFrame containing the data
          """
          # Connect to the MongoDB client
          client = MongoClient(MONGO_URI)
          db = client[database_name]  # Select the database
          collection = db[collection_name]  # Select the collection
      
          # Fetch all documents from the collection
          try:
              logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
              data = list(collection.find())  # Convert cursor to a list of dictionaries
      
              if not data:
                  logging.info("No data found in the MongoDB collection.")
                  
      
              # Convert the list of dictionaries into a pandas DataFrame
              df = pd.DataFrame(data)
      
              # Drop the MongoDB ObjectId field if it exists (optional)
              if '_id' in df.columns:
                  df = df.drop(columns=['_id'])
      
              logging.info("Data successfully fetched and converted to a DataFrame!")
              return df
      
          except Exception as e:
              logging.error(f"An error occurred while fetching data: {e}")
              raise e  
              
              
      @step(enable_cache=False)
      def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
          
          logging.info("Started data ingestion process from MongoDB.")
      
          try:
              # Use the fetch_data_from_mongodb function to fetch data
              df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)
      
              if df.empty:
                  logging.warning("No data was loaded. Check the collection name or the database content.")
              else:
                  logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")
      
              return df
          
          except Exception as e:
              logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
              raise e  

      >步驟11:構建簡易應用

      > Sparlit是一個令人驚嘆的開源,基於Python的框架,用於創建Interactive UI,我們可以使用Sparlit快速構建Web應用程序,而無需知道後端或前端開發。首先,我們需要在系統上安裝精簡。

      使用MLOPS的比特幣價格預測

      再次,您可以在github上找到“簡化應用”的代碼。

      使用MLOPS的比特幣價格預測

      >這是對項目的GitHub代碼和視頻說明,以便您更好地理解。

      結論

      在本文中,我們成功地建立了一個端到端的,可提供生產的比特幣價格預測MLOPS項目。從通過API獲取數據並預處理數據到模型培訓,評估和部署,我們的項目突出了MLOP在將開發與生產聯繫起來的關鍵作用。我們距離實時預測比特幣價格的未來更近了一步。 API提供了對外部數據的平穩訪問,例如CCDATA API的比特幣價格數據,消除了對預先存在的數據集的需求。

      鑰匙要點

      API啟用對外部數據的無縫訪問,例如來自CCDATA API的比特幣價格數據,消除了對預先存在的數據集的需求。
        >
      • > zenml和mlflow是可靠的工具,可促進現實世界應用程序中機器學習模型的開發,跟踪和部署。
      • >我們遵循最佳實踐,通過正確執行數據攝入,清潔,功能工程,模型培訓和評估。
      • >連續部署和推理管道對於確保模型保持有效且在生產環境中可用。
      • 常見問題
      • > Q1。 Zenml可以免費使用嗎?是的,Zenml是一個完全開源的MLOPS框架,它使從本地開發到生產管道的過渡與1行代碼一樣容易。

      Q2。 mlflow用於什麼? MLFlow通過提供用於跟踪實驗,版本控制模型和部署的工具來使機器學習開髮變得更加容易。

      Q3。如何調試服務器守護程序沒有運行錯誤?這是您將在項目中面臨的常見錯誤。只需運行`Zenml註銷–Local`然後`Zenml clean'',然後再運行管道。它將得到解決。

    以上是使用MLOPS的比特幣價格預測的詳細內容。更多資訊請關注PHP中文網其他相關文章!

    陳述
    本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
    META的新AI助手:生產力助推器還是時間下沉?META的新AI助手:生產力助推器還是時間下沉?May 01, 2025 am 11:18 AM

    Meta攜手Nvidia、IBM和Dell等合作夥伴,拓展了Llama Stack的企業級部署整合。在安全方面,Meta推出了Llama Guard 4、LlamaFirewall和CyberSecEval 4等新工具,並啟動了Llama Defenders計劃,以增強AI安全性。此外,Meta還向10個全球機構(包括致力於改善公共服務、醫療保健和教育的初創企業)發放了總額150萬美元的Llama Impact Grants。 由Llama 4驅動的全新Meta AI應用,被設想為Meta AI

    80%的Zers將嫁給AI:研究80%的Zers將嫁給AI:研究May 01, 2025 am 11:17 AM

    公司開創性的人類互動公司Joi AI介紹了“ AI-Iatsionship”一詞來描述這些不斷發展的關係。 Joi AI的關係治療師Jaime Bronstein澄清說,這並不是要取代人類C

    AI使互聯網的機器人問題變得更糟。這家耗資20億美元的創業公司在前線AI使互聯網的機器人問題變得更糟。這家耗資20億美元的創業公司在前線May 01, 2025 am 11:16 AM

    在線欺詐和機器人攻擊對企業構成了重大挑戰。 零售商與機器人ho積產品,銀行戰斗帳戶接管以及社交媒體平台與模仿者鬥爭。 AI的興起加劇了這個問題,Rende

    賣給機器人:將創造或破壞業務的營銷革命賣給機器人:將創造或破壞業務的營銷革命May 01, 2025 am 11:15 AM

    AI代理人有望徹底改變營銷,並可能超過以前技術轉變的影響。 這些代理代表了生成AI的重大進步,不僅是處理諸如chatgpt之類的處理信息,而且還採取了Actio

    計算機視覺技術如何改變NBA季后賽主持人計算機視覺技術如何改變NBA季后賽主持人May 01, 2025 am 11:14 AM

    人工智能對關鍵NBA遊戲4決策的影響 兩場關鍵遊戲4 NBA對決展示了AI在主持儀式中改變遊戲規則的角色。 首先,丹佛的尼古拉·喬基奇(Nikola Jokic)錯過了三分球,導致亞倫·戈登(Aaron Gordon)的最後一秒鐘。 索尼的鷹

    AI如何加速再生醫學的未來AI如何加速再生醫學的未來May 01, 2025 am 11:13 AM

    傳統上,擴大重生醫學專業知識在全球範圍內要求廣泛的旅行,動手培訓和多年指導。 現在,AI正在改變這一景觀,克服地理局限性並通過EN加速進步

    Intel Foundry Direct Connect 2025的關鍵要點Intel Foundry Direct Connect 2025的關鍵要點May 01, 2025 am 11:12 AM

    英特爾正努力使其製造工藝重回領先地位,同時努力吸引無晶圓廠半導體客戶在其晶圓廠製造芯片。為此,英特爾必須在業界建立更多信任,不僅要證明其工藝的競爭力,還要證明合作夥伴能夠以熟悉且成熟的工作流程、一致且高可靠性地製造芯片。今天我聽到的一切都讓我相信英特爾正在朝著這個目標前進。 新任首席執行官譚立柏的主題演講拉開了當天的序幕。譚立柏直率而簡潔。他概述了英特爾代工服務的若干挑戰,以及公司為應對這些挑戰、為英特爾代工服務的未來規劃成功路線而採取的措施。譚立柏談到了英特爾代工服務正在實施的流程,以更以客

    AI出了問題嗎?現在在那里為此保險AI出了問題嗎?現在在那里為此保險May 01, 2025 am 11:11 AM

    全球專業再保險公司Chaucer Group和Armilla AI解決了圍繞AI風險的日益嚴重的問題,已聯手引入了新型的第三方責任(TPL)保險產品。 該政策保護業務不利

    See all articles

    熱AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智慧驅動的應用程序,用於創建逼真的裸體照片

    AI Clothes Remover

    AI Clothes Remover

    用於從照片中去除衣服的線上人工智慧工具。

    Undress AI Tool

    Undress AI Tool

    免費脫衣圖片

    Clothoff.io

    Clothoff.io

    AI脫衣器

    Video Face Swap

    Video Face Swap

    使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

    熱工具

    SecLists

    SecLists

    SecLists是最終安全測試人員的伙伴。它是一個包含各種類型清單的集合,這些清單在安全評估過程中經常使用,而且都在一個地方。 SecLists透過方便地提供安全測試人員可能需要的所有列表,幫助提高安全測試的效率和生產力。清單類型包括使用者名稱、密碼、URL、模糊測試有效載荷、敏感資料模式、Web shell等等。測試人員只需將此儲存庫拉到新的測試機上,他就可以存取所需的每種類型的清單。

    SublimeText3 Mac版

    SublimeText3 Mac版

    神級程式碼編輯軟體(SublimeText3)

    EditPlus 中文破解版

    EditPlus 中文破解版

    體積小,語法高亮,不支援程式碼提示功能

    SublimeText3 Linux新版

    SublimeText3 Linux新版

    SublimeText3 Linux最新版

    禪工作室 13.0.1

    禪工作室 13.0.1

    強大的PHP整合開發環境