cari
RumahPeranti teknologiAIRamalan Harga Bitcoin Menggunakan MLOPS

Tidak tahu banyak tentang Bitcoin atau turun naik harga tetapi ingin membuat keputusan pelaburan untuk membuat keuntungan? Model pembelajaran mesin ini mempunyai belakang anda. Ia boleh meramalkan harga lebih baik daripada ahli astrologi. Dalam artikel ini, kami akan membina model ML untuk meramalkan dan meramalkan harga bitcoin, menggunakan ZenML dan MLFlow. Oleh itu, mari kita mulakan perjalanan kami untuk memahami bagaimana orang boleh menggunakan alat ML dan MLOPS untuk meramalkan masa depan.

Objektif Pembelajaran

  • belajar untuk mengambil data langsung menggunakan API dengan cekap.
  • fahami apa zenml, mengapa kita menggunakan mlflow, dan bagaimana anda boleh mengintegrasikannya dengan zenml.
  • meneroka proses penempatan untuk model pembelajaran mesin, dari idea ke pengeluaran.
  • Ketahui cara membuat aplikasi Streamlit mesra pengguna untuk ramalan model pembelajaran mesin interaktif.
Artikel ini diterbitkan sebagai sebahagian daripada Blogathon Sains Data

Jadual Kandungan 5: Pembersihan Data Langkah 6: Kejuruteraan Ciri Langkah 7: Pemisahan data

Langkah 8: Latihan Model

    Pernyataan Masalah
  • Harga bitcoin sangat tidak menentu, dan membuat ramalan adalah mustahil. Dalam projek kami, kami menggunakan Praktikesto Terbaik TMLOPS membina model LSTM untuk meramalkan Bitcoinprices dan Trend.
  • Sebelum melaksanakan projek, mari kita lihat seni bina projek.
    • Pelaksanaan Projek
    • mari kita mulakan dengan mengakses API.
    • Mengapa kita melakukan ini? Anda boleh mendapatkan data harga bitcoin bersejarah dari dataset yang berbeza, tetapi dengan API, kita boleh mempunyai akses kepada data pasaran hidup.
    • Langkah 1: Mengakses API
    • Daftar untuk API Access:
    • sebaik sahaja anda mendaftar di halaman API TheCCData. Anda boleh mendapatkan kunci API percuma dari pagehttps ini: //developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days
    • Ambil data harga bitcoin:
    • Dengan kod di bawah, anda boleh mengambil data harga bitcoin dari API CCData dan menukarnya ke dalam data data PANDAS. Juga, simpan kunci API dalam fail .env.
  • Langkah 2: Menyambung ke pangkalan data menggunakan mongoDB
MongoDB adalah pangkalan data NoSQL yang terkenal dengan kebolehsuaian, pengembangan, dan keupayaan untuk menyimpan data yang tidak berstruktur dalam format seperti JSON.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

Kod ini menyambung ke MongoDB, mengambil data harga bitcoin melalui API, dan mengemas kini pangkalan data dengan semua entri baru selepas tarikh log masuk terkini.

memperkenalkan zenml

zenmlis Platform sumber terbuka yang disesuaikan untuk operasi pembelajaran mesin, menyokong penciptaan saluran paip yang fleksibel dan siap pengeluaran. Di samping itu, ZenML mengintegrasikan dengan pelbagai alat pembelajaran mesin likemlflow, bentoml, dan lain -lain, untuk membuat saluran paip ML yang lancar.

⚠️ Jika anda adalah pengguna Windows, cuba pasang WSL pada sistem anda. ZenML tidak menyokong Windows.

Dalam projek ini, kami akan melaksanakan saluran paip tradisional, yang menggunakan ZenML, dan kami akan mengintegrasikan MLFlow dengan ZenML, untuk penjejakan eksperimen.

pra-syarat dan perintah ZENML asas

  • python 3.12 atau lebih tinggi: anda boleh mendapatkannya dari sini: https: //www.python.org/downloads/
  • Aktifkan persekitaran maya anda:
import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")
  1. perintah zenml:

Semua perintah ZenML teras bersama -sama dengan fungsi mereka disediakan di bawah:

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

Langkah 3: Integrasi Mlflow dengan Zenml

Kami menggunakan MLFlow untuk penjejakan eksperimen, untuk menjejaki model, artifak, metrik, dan nilai hiperparameter kami. Kami mendaftarkan MLFlow untuk pengesanan eksperimen dan penempatan model di sini:

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

Senarai Stack Zenml

Ramalan Harga Bitcoin Menggunakan MLOPS

Struktur Projek

di sini, anda dapat melihat susun atur projek. Sekarang mari kita bincangkannya satu demi satu secara terperinci.

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list
Langkah 4: Pengingesan Data

Kami mula -mula menelan data dari API ke MongoDB dan mengubahnya menjadi PANDAS DataFrame.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

Kami menambah @step sebagai penghias kepada ingest_data () berfungsi untuk mengisytiharkannya sebagai langkah saluran paip latihan kami. Dengan cara yang sama, kami akan menulis kod untuk setiap langkah dalam seni bina projek dan membuat saluran paip.

Untuk melihat bagaimana saya telah menggunakan penghias

@step , lihat pautan GitHub di bawah (folder langkah) untuk melalui kod untuk langkah -langkah lain dalam saluran paip iaitu pembersihan data, kejuruteraan ciri, pemisahan data, latihan model, dan penilaian model. Langkah 5: Pembersihan Data

Dalam langkah ini, kami akan membuat strategi yang berbeza untuk membersihkan data yang ditelan. Kami akan menggugurkan lajur yang tidak diingini dan nilai yang hilang dalam data.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

Langkah 6: Kejuruteraan Ciri

Langkah ini mengambil data yang dibersihkan dari langkah data_cleaning sebelumnya. Kami mencipta ciri-ciri baru seperti purata bergerak mudah (SMA), purata bergerak eksponen (EMA), dan statistik yang tertinggal dan bergulir untuk menangkap trend, mengurangkan bunyi, dan membuat ramalan yang lebih dipercayai dari data siri masa. Di samping itu, kami skala ciri dan pembolehubah sasaran menggunakan skala Minmax.

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")

Langkah 7: Pemisahan data

Sekarang, kita membahagikan data yang diproses ke dalam latihan dan ujian dataset dalam nisbah 80:20.

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

Langkah 8: Latihan Model

Dalam langkah ini, kami melatih model Thelstm dengan berhenti awal untuk mengelakkan terlalu banyak, dan dengan menggunakan pembalakan automatik MLFlow untuk mengesan model dan eksperimen kami dan menyimpan model terlatih sebagai lstm_model.keras .

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean
Langkah 9: Penilaian Model

Oleh kerana ini adalah masalah regresi, kami menggunakan metrik penilaian seperti kesilapan kuadrat min (MSE), kesilapan akar kuadrat (MSE), kesilapan mutlak (MAE), dan R-kuadrat.

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list
Sekarang kita akan mengatur semua langkah di atas ke dalam saluran paip. Mari buat fail baru latihan_pipeline.py.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)
di sini,

@pipeline penghias digunakan untuk menentukan fungsiML_PIPELINE () sebagai saluran paip di zenml.

Untuk melihat papan pemuka untuk saluran paip latihan, hanya jalankan skrip run_pipeline.py. Mari buat fail run_pipeline.py.

import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  
Sekarang kami telah menyelesaikan membuat saluran paip. Jalankan arahan di bawah untuk melihat papan pemuka saluran paip.

class DataPreprocessor:
    def __init__(self, data: pd.DataFrame):
        
        self.data = data
        logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)

    def clean_data(self) -> pd.DataFrame:
        """
        Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
        and returning the cleaned DataFrame.

        Returns:
            pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
        """
        logging.info("Starting data cleaning process.")

        # Drop unnecessary columns, including '_id' if it exists
        columns_to_drop = [
            'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
            'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
            'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
            'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
            'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
            'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
            'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
        ]
        logging.info("Dropping columns: %s")
        self.data = self.drop_columns(self.data, columns_to_drop)

        # Drop columns where the number of missing values is greater than 0
        logging.info("Dropping columns with missing values.")
        self.data = self.drop_columns_with_missing_values(self.data)

        logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
        return self.data

    def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
        """
        Drops specified columns from the DataFrame.

        Returns:
            pd.DataFrame: The DataFrame with the specified columns removed.
        """
        logging.info("Dropping columns: %s", columns)
        return data.drop(columns=columns, errors='ignore')

    def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Drops columns with any missing values from the DataFrame.

        Parameters:
            data: pd.DataFrame
                The DataFrame from which columns with missing values will be removed.
        
        Returns:
            pd.DataFrame: The DataFrame with columns containing missing values removed.
        """
        missing_columns = data.columns[data.isnull().sum() > 0]
        if not missing_columns.empty:
            logging.info("Columns with missing values: %s", missing_columns.tolist())
        else:
            logging.info("No columns with missing values found.")
        return data.loc[:, data.isnull().sum() == 0]
selepas menjalankan perintah di atas, ia akan mengembalikan url papan pemuka pengesanan, yang kelihatan seperti ini.

Ramalan Harga Bitcoin Menggunakan MLOPS Paip latihan kelihatan seperti ini di papan pemuka, yang diberikan di bawah:

Ramalan Harga Bitcoin Menggunakan MLOPS

Ramalan Harga Bitcoin Menggunakan MLOPS

Langkah 10: Penggunaan Model Ramalan Harga Bitcoin Menggunakan MLOPS

sehingga kini kami telah membina model dan saluran paip. Sekarang mari kita menolak saluran paip ke dalam pengeluaran di mana pengguna boleh membuat ramalan.

Ramalan Harga Bitcoin Menggunakan MLOPS Paip Penyebaran Berterusan

Paip ini bertanggungjawab untuk terus menggunakan model terlatih. Ia mula -mula menjalankan

ml_pipeline ()

dari

latihan_pipeline.py

fail untuk melatih model, kemudian menggunakan
import joblib
import pandas as pd
from abc import ABC, abstractmethod
from sklearn.preprocessing import MinMaxScaler


# Abstract class for Feature Engineering strategy
class FeatureEngineeringStrategy(ABC):
    @abstractmethod
    def generate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        pass


# Concrete class for calculating SMA, EMA, RSI, and other features
class TechnicalIndicators(FeatureEngineeringStrategy):
    def generate_features(self, df: pd.DataFrame) -> pd.DataFrame:
        
        # Calculate SMA, EMA, and RSI
        df['SMA_20'] = df['CLOSE'].rolling(window=20).mean()
        df['SMA_50'] = df['CLOSE'].rolling(window=50).mean()
        df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean()
        
        # Price difference features
        df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE']
        df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW']
        df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN']
        df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW']

        # Lagged features
        df['OPEN_lag1'] = df['OPEN'].shift(1)
        df['CLOSE_lag1'] = df['CLOSE'].shift(1)
        df['HIGH_lag1'] = df['HIGH'].shift(1)
        df['LOW_lag1'] = df['LOW'].shift(1)

        # Rolling statistics
        df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean()
        df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std()

        # Drop rows with missing values (due to rolling windows, shifts)
        df.dropna(inplace=True)

        return df
        
# Abstract class for Scaling strategy
class ScalingStrategy(ABC):
    @abstractmethod
    def scale(self, df: pd.DataFrame, features: list, target: str):
        pass

# Concrete class for MinMax Scaling
class MinMaxScaling(ScalingStrategy):
    def scale(self, df: pd.DataFrame, features: list, target: str):
        """
        Scales the features and target using MinMaxScaler.

        Parameters:
            df: pd.DataFrame
                The DataFrame containing the features and target.
            features: list
                List of feature column names.
            target: str
                The target column name.

        Returns:
            pd.DataFrame, pd.DataFrame: Scaled features and target
        """
        scaler_X = MinMaxScaler(feature_range=(0, 1))
        scaler_y = MinMaxScaler(feature_range=(0, 1))

        X_scaled = scaler_X.fit_transform(df[features].values)
        y_scaled = scaler_y.fit_transform(df[[target]].values)

        joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl')
        joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl')

        return X_scaled, y_scaled, scaler_y


# FeatureEngineeringContext: This will use the Strategy Pattern
class FeatureEngineering:
    def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy):
        self.feature_strategy = feature_strategy
        self.scaling_strategy = scaling_strategy

    def process_features(self, df: pd.DataFrame, features: list, target: str):
        # Generate features using the provided strategy
        df_with_features = self.feature_strategy.generate_features(df)

        # Scale features and target using the provided strategy
        X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target)

        return df_with_features, X_scaled, y_scaled, scaler_y

Pipeline Inference

Kami menggunakan saluran paip kesimpulan untuk membuat ramalan pada data baru, menggunakan model yang digunakan. Mari kita lihat bagaimana kami melaksanakan saluran paip ini dalam projek kami.

import requests
import pandas as pd
from dotenv import load_dotenv
import os

# Load the .env file
load_dotenv()

def fetch_crypto_data(api_uri):
    response = requests.get(
        api_uri,
        params={
            "market": "cadli",
            "instrument": "BTC-USD",
            "limit": 5000,
            "aggregate": 1,
            "fill": "true",
            "apply_mapping": "true",
            "response_format": "JSON"
        },
        headers={"Content-type": "application/json; charset=UTF-8"}
    )

    if response.status_code == 200:
        print('API Connection Successful! \nFetching the data...')

        data = response.json()
        data_list = data.get('Data', [])

        df = pd.DataFrame(data_list)

        df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')

        return df  # Return the DataFrame
    else:
        raise Exception(f"API Error: {response.status_code} - {response.text}")

mari kita lihat tentang setiap fungsi yang dipanggil dalam saluran paip kesimpulan di bawah:

dynamic_importer ()

Fungsi ini memuat data baru, melakukan pemprosesan data, dan mengembalikan data.

import os
from pymongo import MongoClient
from dotenv import load_dotenv
from data.management.api import fetch_crypto_data  # Import the API function
import pandas as pd

load_dotenv()

MONGO_URI = os.getenv("MONGO_URI")
API_URI = os.getenv("API_URI")

client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
db = client['crypto_data']
collection = db['historical_data']

try:
    latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
    if latest_entry:
        last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
    else:
        last_date = '2011-03-27'  # Default start date if MongoDB is empty

    print(f"Fetching data starting from {last_date}...")
    new_data_df = fetch_crypto_data(API_URI)

    if latest_entry:
        new_data_df = new_data_df[new_data_df['DATE'] > last_date]

    if not new_data_df.empty:
        data_to_insert = new_data_df.to_dict(orient='records')
        result = collection.insert_many(data_to_insert)
        print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
    else:
        print("No new data to insert.")
except Exception as e:
    print(f"An error occurred: {e}")

prediction_service_loader ()

Fungsi ini dihiasi dengan @step . Kami memuatkan perkhidmatan penempatan W.R.T Model yang digunakan berdasarkan pipeline_name, dan step_name, di mana model yang digunakan kami bersedia untuk memproses pertanyaan ramalan untuk data baru.

garis sedia ada_services = mlflow_model_deployer_component.find_model_server () Mencari perkhidmatan penyebaran yang tersedia berdasarkan parameter yang diberikan seperti nama saluran paip dan nama langkah paip. Sekiranya tiada perkhidmatan yang tersedia, ia menunjukkan bahawa saluran paip penempatan sama ada tidak dijalankan atau mengalami masalah dengan saluran paip penempatan, jadi ia melemparkan runtimeerror.

#create a virtual environment
python3 -m venv venv

#Activate your virtual environmnent in your project folder
source venv/bin/activate

Predictor ()

Fungsi mengambil dalam model MLFlow yang dikerahkan melalui MLFlowDeploymentservice dan data baru. Data diproses lebih jauh untuk memadankan format model yang diharapkan untuk membuat kesimpulan masa nyata.

#Install zenml
pip install zenml

#To Launch zenml server and dashboard locally
pip install "zenml[server]"

#To check the zenml Version:
zenml version

#To initiate a new repository
zenml init

#To run the dashboard locally:
zenml login --local

#To know the status of our zenml Pipelines
zenml show

#To shutdown the zenml server
zenml clean

Untuk memvisualisasikan saluran penyebaran dan kesimpulan yang berterusan, kita perlu menjalankan skrip run_deployment.py, di mana konfigurasi penempatan dan ramalan akan ditakrifkan. (Sila periksa kod run_deployment.py dalam github yang diberikan di bawah).

#Integrating mlflow with ZenML
zenml integration install mlflow -y

#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker --flavor=mlflow

#Registering the model deployer
zenml model-deployer register mlflow --flavor=mlflow

#Registering the stack
zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set

#To view the stack list
zenml stack --list

Sekarang mari kita jalankan fail run_deployment.py untuk melihat papan pemuka saluran paip penempatan berterusan dan saluran paip kesimpulan.

bitcoin_price_prediction_mlops/        # Project directory
├── data/                             
│   └── management/                   
│       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
│       └── api.py                     # API-related utility functions
│
├── pipelines/                         
│   ├── deployment_pipeline.py         # Deployment pipeline
│   └── training_pipeline.py           # Training pipeline
│
├── saved_models/                      # Directory for storing trained models
├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
│
├── src/                               # Source code
│   ├── data_cleaning.py               # Data cleaning and preprocessing
│   ├── data_ingestion.py              # Data ingestion 
│   ├── data_splitter.py               # Data splitting 
│   ├── feature_engineering.py         # Feature engineering 
│   ├── model_evaluation.py            # Model evaluation
│   └── model_training.py              # Model training
│
├── steps/                             # ZenML steps
│   ├── clean_data.py                  # ZenML step for cleaning data
│   ├── data_splitter.py               # ZenML step for data splitting
│   ├── dynamic_importer.py            # ZenML step for importing dynamic data
│   ├── feature_engineering.py         # ZenML step for feature engineering
│   ├── ingest_data.py                 # ZenML step for data ingestion
│   ├── model_evaluation.py            # ZenML step for model evaluation
│   ├── model_training.py              # ZenML step for training the model
│   ├── prediction_service_loader.py   # ZenML step for loading prediction services
│   ├── predictor.py                   # ZenML step for prediction
│   └── utils.py                       # Utility functions for steps
│
├── .env                               # Environment variables file
├── .gitignore                         # Git ignore file
│
├── app.py                             # Streamlit user interface app
│
├── README.md                          # Project documentation
├── requirements.txt                   # List of required packages
├── run_deployment.py                  # Code for running deployment and prediction pipeline
├── run_pipeline.py                    # Code for running training pipeline
└── .zen/                              # ZenML directory (created automatically after ZenML initialization)

saluran paip penempatan berterusan - output

Ramalan Harga Bitcoin Menggunakan MLOPS

Pipeline Inference - Output

Ramalan Harga Bitcoin Menggunakan MLOPS Setelah menjalankan fail run_deployment.py, anda dapat melihat pautan papan pemuka mlflow yang kelihatan seperti ini.

Sekarang anda perlu menyalin dan menyisipkan pautan UI MLFlow di atas dalam baris arahan anda dan jalankannya.
import os
import logging
from pymongo import MongoClient
from dotenv import load_dotenv
from zenml import step
import pandas as pd

# Load the .env file
load_dotenv()

# Get MongoDB URI from environment variables
MONGO_URI = os.getenv("MONGO_URI")

def fetch_data_from_mongodb(collection_name:str, database_name:str):
    """
    Fetches data from MongoDB and converts it into a pandas DataFrame.

    collection_name: 
        Name of the MongoDB collection to fetch data.
    database_name: 
        Name of the MongoDB database.
    return: 
        A pandas DataFrame containing the data
    """
    # Connect to the MongoDB client
    client = MongoClient(MONGO_URI)
    db = client[database_name]  # Select the database
    collection = db[collection_name]  # Select the collection

    # Fetch all documents from the collection
    try:
        logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
        data = list(collection.find())  # Convert cursor to a list of dictionaries

        if not data:
            logging.info("No data found in the MongoDB collection.")
            

        # Convert the list of dictionaries into a pandas DataFrame
        df = pd.DataFrame(data)

        # Drop the MongoDB ObjectId field if it exists (optional)
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])

        logging.info("Data successfully fetched and converted to a DataFrame!")
        return df

    except Exception as e:
        logging.error(f"An error occurred while fetching data: {e}")
        raise e  
        
        
@step(enable_cache=False)
def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
    
    logging.info("Started data ingestion process from MongoDB.")

    try:
        # Use the fetch_data_from_mongodb function to fetch data
        df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)

        if df.empty:
            logging.warning("No data was loaded. Check the collection name or the database content.")
        else:
            logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")

        return df
    
    except Exception as e:
        logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
        raise e  

di sini adalah papan pemuka mlflow, di mana anda dapat melihat metrik penilaian dan parameter model:

Langkah 11: Membina aplikasi Streamlit Ramalan Harga Bitcoin Menggunakan MLOPS

StreamLit adalah sumber terbuka sumber terbuka, Rangka Kerja Berasaskan Python, yang digunakan untuk membuat UI interaktif, kita boleh menggunakan Streamlit untuk membina aplikasi web dengan cepat, tanpa mengetahui pembangunan backend atau frontend. Pertama, kita perlu memasang Streamlit pada sistem kami.

Sekali lagi, anda boleh mencari kod di GitHub untuk aplikasi Streamlit.

Ramalan Harga Bitcoin Menggunakan MLOPS

Berikut adalah kod github dan penjelasan video projek untuk pemahaman anda yang lebih baik.

Kesimpulan

Dalam artikel ini, kami telah berjaya membina projek MLOPS Ramalan Bitcoin yang siap sedia, siap sedia. Dari memperoleh data melalui API dan memprosesnya untuk latihan model, penilaian, dan penempatan, projek kami menyoroti peranan kritikal MLOPS dalam menghubungkan pembangunan dengan pengeluaran. Kami satu langkah lebih dekat untuk membentuk masa depan meramalkan harga bitcoin dalam masa nyata. API memberikan akses lancar kepada data luaran, seperti data harga bitcoin dari API CCData, menghapuskan keperluan untuk dataset yang sedia ada.

Takeaways Key

    API membolehkan akses lancar ke data luaran, seperti data harga bitcoin dari CCData API, menghapuskan keperluan untuk dataset yang sedia ada.
  • zenml dan mlflow adalah alat yang mantap yang memudahkan pembangunan, penjejakan, dan penggunaan model pembelajaran mesin dalam aplikasi dunia nyata.
  • kami telah mengikuti amalan terbaik dengan melakukan pengambilan data, pembersihan, kejuruteraan ciri, latihan model, dan penilaian dengan betul.
  • Pipelin penempatan dan kesimpulan yang berterusan adalah penting untuk memastikan model tetap cekap dan boleh didapati dalam persekitaran pengeluaran.
Soalan Lazim

Q1. Adakah zenml percuma untuk digunakan? a. Ya, ZenML adalah rangka kerja MLOPS sumber terbuka yang menjadikan peralihan dari pembangunan tempatan ke saluran paip pengeluaran semudah 1 baris kod.

Q2. Apa yang digunakan oleh MLFlow? a. MLFlow menjadikan pembangunan pembelajaran mesin lebih mudah dengan menawarkan alat untuk mengesan eksperimen, model versi, dan menggunakannya. Q3. Bagaimana cara debug daemon pelayan tidak menjalankan ralat?

a. Ini adalah kesilapan biasa yang akan anda hadapi dalam projek. Hanya jalankan `zenml logout -local` kemudian` zenml clean`, dan kemudian `zenml login -local`, sekali lagi menjalankan saluran paip. Ia akan diselesaikan.

Atas ialah kandungan terperinci Ramalan Harga Bitcoin Menggunakan MLOPS. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Ahli terapi AI ada di sini: 14 alat kesihatan mental yang perlu anda ketahuiAhli terapi AI ada di sini: 14 alat kesihatan mental yang perlu anda ketahuiApr 30, 2025 am 11:17 AM

Walaupun ia tidak dapat memberikan sambungan manusia dan intuisi ahli terapi terlatih, penyelidikan telah menunjukkan bahawa ramai orang selesa berkongsi kebimbangan dan kebimbangan mereka dengan bot AI yang agak tidak berwajah dan tanpa nama. Sama ada ini selalu baik saya

Memanggil AI ke lorong runcitMemanggil AI ke lorong runcitApr 30, 2025 am 11:16 AM

Kecerdasan Buatan (AI), satu dekad teknologi dalam pembuatan, merevolusikan industri runcit makanan. Dari keuntungan kecekapan berskala besar dan pengurangan kos kepada proses yang diselaraskan di pelbagai fungsi perniagaan, kesan AI adalah undeniabl

Mendapatkan ceramah pep dari ai generatif untuk mengangkat semangat andaMendapatkan ceramah pep dari ai generatif untuk mengangkat semangat andaApr 30, 2025 am 11:15 AM

Mari kita bercakap mengenainya. Analisis terobosan AI yang inovatif ini adalah sebahagian daripada liputan lajur Forbes yang berterusan pada AI terkini termasuk mengenal pasti dan menjelaskan pelbagai kerumitan AI yang memberi kesan (lihat pautan di sini). Di samping itu, untuk comp saya

Mengapa Hyper-Personalization berkuasa AI adalah satu kemestian untuk semua perniagaanMengapa Hyper-Personalization berkuasa AI adalah satu kemestian untuk semua perniagaanApr 30, 2025 am 11:14 AM

Mengekalkan imej profesional memerlukan kemas kini almari pakaian sekali -sekala. Walaupun membeli-belah dalam talian adalah mudah, ia tidak mempunyai kepastian percubaan secara peribadi. Penyelesaian saya? Peribadi yang berkuasa AI. Saya membayangkan pembantu AI yang mengendalikan pakaian selecti

Lupakan Duolingo: Ciri AI Baru Google Translate Mengajar BahasaLupakan Duolingo: Ciri AI Baru Google Translate Mengajar BahasaApr 30, 2025 am 11:13 AM

Google Translate menambah fungsi pembelajaran bahasa Menurut Android Authority, App Expers AssembleDebug telah mendapati bahawa versi terbaru aplikasi Google Translate mengandungi mod ujian "amalan" baru yang direka untuk membantu pengguna meningkatkan kemahiran bahasa mereka melalui aktiviti yang diperibadikan. Ciri ini kini tidak dapat dilihat oleh pengguna, tetapi AssembleDebug dapat mengaktifkannya dan melihat beberapa elemen antara muka pengguna yang baru. Apabila diaktifkan, ciri ini menambah ikon topi tamat pengajian baru di bahagian bawah skrin yang ditandai dengan lencana "beta" yang menunjukkan bahawa ciri "amalan" akan dikeluarkan pada mulanya dalam bentuk eksperimen. Prompt pop timbul yang berkaitan menunjukkan "Amalan aktiviti yang disesuaikan untuk anda!", Yang bermaksud Google akan menjana disesuaikan

Mereka membuat TCP/IP untuk AI, dan ia dipanggil NandaMereka membuat TCP/IP untuk AI, dan ia dipanggil NandaApr 30, 2025 am 11:12 AM

Penyelidik MIT sedang membangunkan Nanda, protokol web yang direka untuk agen AI. Pendek untuk ejen rangkaian dan AI yang terdesentralisasi, Nanda membina Protokol Konteks Model Anthropic (MCP) dengan menambahkan keupayaan Internet, membolehkan AI AGEN

The Prompt: Deepfake Detection adalah perniagaan yang berkembang pesatThe Prompt: Deepfake Detection adalah perniagaan yang berkembang pesatApr 30, 2025 am 11:11 AM

Usaha terbaru Meta: Aplikasi AI untuk menyaingi chatgpt Meta, syarikat induk Facebook, Instagram, WhatsApp, dan Threads, melancarkan aplikasi berkuasa AI yang baru. Aplikasi mandiri ini, Meta AI, bertujuan untuk bersaing secara langsung dengan chatgpt Openai. Tuil

Dua tahun akan datang dalam keselamatan siber AI untuk pemimpin perniagaanDua tahun akan datang dalam keselamatan siber AI untuk pemimpin perniagaanApr 30, 2025 am 11:10 AM

Menavigasi serangan AI Cyber ​​yang semakin meningkat Baru-baru ini, Jason Clinton, Ciso untuk Anthropic, menggariskan risiko yang muncul yang terikat kepada identiti bukan manusia-sebagai komunikasi komunikasi ke mesin, melindungi "identiti" ini menjadi

See all articles

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

Video Face Swap

Video Face Swap

Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Alat panas

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

Persekitaran pembangunan bersepadu PHP yang berkuasa

mPDF

mPDF

mPDF ialah perpustakaan PHP yang boleh menjana fail PDF daripada HTML yang dikodkan UTF-8. Pengarang asal, Ian Back, menulis mPDF untuk mengeluarkan fail PDF "dengan cepat" dari tapak webnya dan mengendalikan bahasa yang berbeza. Ia lebih perlahan dan menghasilkan fail yang lebih besar apabila menggunakan fon Unicode daripada skrip asal seperti HTML2FPDF, tetapi menyokong gaya CSS dsb. dan mempunyai banyak peningkatan. Menyokong hampir semua bahasa, termasuk RTL (Arab dan Ibrani) dan CJK (Cina, Jepun dan Korea). Menyokong elemen peringkat blok bersarang (seperti P, DIV),

SecLists

SecLists

SecLists ialah rakan penguji keselamatan muktamad. Ia ialah koleksi pelbagai jenis senarai yang kerap digunakan semasa penilaian keselamatan, semuanya di satu tempat. SecLists membantu menjadikan ujian keselamatan lebih cekap dan produktif dengan menyediakan semua senarai yang mungkin diperlukan oleh penguji keselamatan dengan mudah. Jenis senarai termasuk nama pengguna, kata laluan, URL, muatan kabur, corak data sensitif, cangkerang web dan banyak lagi. Penguji hanya boleh menarik repositori ini ke mesin ujian baharu dan dia akan mempunyai akses kepada setiap jenis senarai yang dia perlukan.

Penyesuai Pelayan SAP NetWeaver untuk Eclipse

Penyesuai Pelayan SAP NetWeaver untuk Eclipse

Integrasikan Eclipse dengan pelayan aplikasi SAP NetWeaver.

MinGW - GNU Minimalis untuk Windows

MinGW - GNU Minimalis untuk Windows

Projek ini dalam proses untuk dipindahkan ke osdn.net/projects/mingw, anda boleh terus mengikuti kami di sana. MinGW: Port Windows asli bagi GNU Compiler Collection (GCC), perpustakaan import yang boleh diedarkan secara bebas dan fail pengepala untuk membina aplikasi Windows asli termasuk sambungan kepada masa jalan MSVC untuk menyokong fungsi C99. Semua perisian MinGW boleh dijalankan pada platform Windows 64-bit.