Tidak tahu banyak tentang Bitcoin atau turun naik harga tetapi ingin membuat keputusan pelaburan untuk membuat keuntungan? Model pembelajaran mesin ini mempunyai belakang anda. Ia boleh meramalkan harga lebih baik daripada ahli astrologi. Dalam artikel ini, kami akan membina model ML untuk meramalkan dan meramalkan harga bitcoin, menggunakan ZenML dan MLFlow. Oleh itu, mari kita mulakan perjalanan kami untuk memahami bagaimana orang boleh menggunakan alat ML dan MLOPS untuk meramalkan masa depan.
Objektif Pembelajaran
- belajar untuk mengambil data langsung menggunakan API dengan cekap.
- fahami apa zenml, mengapa kita menggunakan mlflow, dan bagaimana anda boleh mengintegrasikannya dengan zenml.
- meneroka proses penempatan untuk model pembelajaran mesin, dari idea ke pengeluaran.
- Ketahui cara membuat aplikasi Streamlit mesra pengguna untuk ramalan model pembelajaran mesin interaktif.
Jadual Kandungan
Langkah 8: Latihan Model
- Pernyataan Masalah
- Harga bitcoin sangat tidak menentu, dan membuat ramalan adalah mustahil. Dalam projek kami, kami menggunakan Praktikesto Terbaik TMLOPS membina model LSTM untuk meramalkan Bitcoinprices dan Trend.
- Sebelum melaksanakan projek, mari kita lihat seni bina projek.
- Pelaksanaan Projek
- mari kita mulakan dengan mengakses API.
- Mengapa kita melakukan ini? Anda boleh mendapatkan data harga bitcoin bersejarah dari dataset yang berbeza, tetapi dengan API, kita boleh mempunyai akses kepada data pasaran hidup.
- Langkah 1: Mengakses API
- Daftar untuk API Access:
- sebaik sahaja anda mendaftar di halaman API TheCCData. Anda boleh mendapatkan kunci API percuma dari pagehttps ini: //developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days
- Ambil data harga bitcoin:
- Dengan kod di bawah, anda boleh mengambil data harga bitcoin dari API CCData dan menukarnya ke dalam data data PANDAS. Juga, simpan kunci API dalam fail .env.
- Langkah 2: Menyambung ke pangkalan data menggunakan mongoDB
import requests
import pandas as pd
from dotenv import load_dotenv
import os
# Load the .env file
load_dotenv()
def fetch_crypto_data(api_uri):
response = requests.get(
api_uri,
params={
"market": "cadli",
"instrument": "BTC-USD",
"limit": 5000,
"aggregate": 1,
"fill": "true",
"apply_mapping": "true",
"response_format": "JSON"
},
headers={"Content-type": "application/json; charset=UTF-8"}
)
if response.status_code == 200:
print('API Connection Successful! \nFetching the data...')
data = response.json()
data_list = data.get('Data', [])
df = pd.DataFrame(data_list)
df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
return df # Return the DataFrame
else:
raise Exception(f"API Error: {response.status_code} - {response.text}")
Kod ini menyambung ke MongoDB, mengambil data harga bitcoin melalui API, dan mengemas kini pangkalan data dengan semua entri baru selepas tarikh log masuk terkini.
memperkenalkan zenml
zenmlis Platform sumber terbuka yang disesuaikan untuk operasi pembelajaran mesin, menyokong penciptaan saluran paip yang fleksibel dan siap pengeluaran. Di samping itu, ZenML mengintegrasikan dengan pelbagai alat pembelajaran mesin likemlflow, bentoml, dan lain -lain, untuk membuat saluran paip ML yang lancar.
⚠️ Jika anda adalah pengguna Windows, cuba pasang WSL pada sistem anda. ZenML tidak menyokong Windows.
Dalam projek ini, kami akan melaksanakan saluran paip tradisional, yang menggunakan ZenML, dan kami akan mengintegrasikan MLFlow dengan ZenML, untuk penjejakan eksperimen.
pra-syarat dan perintah ZENML asas
- python 3.12 atau lebih tinggi: anda boleh mendapatkannya dari sini: https: //www.python.org/downloads/
- Aktifkan persekitaran maya anda:
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
- perintah zenml:
Semua perintah ZenML teras bersama -sama dengan fungsi mereka disediakan di bawah:
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Langkah 3: Integrasi Mlflow dengan Zenml
Kami menggunakan MLFlow untuk penjejakan eksperimen, untuk menjejaki model, artifak, metrik, dan nilai hiperparameter kami. Kami mendaftarkan MLFlow untuk pengesanan eksperimen dan penempatan model di sini:
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Senarai Stack Zenml
di sini, anda dapat melihat susun atur projek. Sekarang mari kita bincangkannya satu demi satu secara terperinci.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --listLangkah 4: Pengingesan Data
Kami mula -mula menelan data dari API ke MongoDB dan mengubahnya menjadi PANDAS DataFrame.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Kami menambah @step sebagai penghias kepada ingest_data () berfungsi untuk mengisytiharkannya sebagai langkah saluran paip latihan kami. Dengan cara yang sama, kami akan menulis kod untuk setiap langkah dalam seni bina projek dan membuat saluran paip.
Untuk melihat bagaimana saya telah menggunakan penghias@step , lihat pautan GitHub di bawah (folder langkah) untuk melalui kod untuk langkah -langkah lain dalam saluran paip iaitu pembersihan data, kejuruteraan ciri, pemisahan data, latihan model, dan penilaian model. Langkah 5: Pembersihan Data
Dalam langkah ini, kami akan membuat strategi yang berbeza untuk membersihkan data yang ditelan. Kami akan menggugurkan lajur yang tidak diingini dan nilai yang hilang dalam data.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Langkah 6: Kejuruteraan Ciri
Langkah ini mengambil data yang dibersihkan dari langkah data_cleaning sebelumnya. Kami mencipta ciri-ciri baru seperti purata bergerak mudah (SMA), purata bergerak eksponen (EMA), dan statistik yang tertinggal dan bergulir untuk menangkap trend, mengurangkan bunyi, dan membuat ramalan yang lebih dipercayai dari data siri masa. Di samping itu, kami skala ciri dan pembolehubah sasaran menggunakan skala Minmax.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
Langkah 7: Pemisahan data
Sekarang, kita membahagikan data yang diproses ke dalam latihan dan ujian dataset dalam nisbah 80:20.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Langkah 8: Latihan Model
Dalam langkah ini, kami melatih model Thelstm dengan berhenti awal untuk mengelakkan terlalu banyak, dan dengan menggunakan pembalakan automatik MLFlow untuk mengesan model dan eksperimen kami dan menyimpan model terlatih sebagai lstm_model.keras .
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml cleanLangkah 9: Penilaian Model
Oleh kerana ini adalah masalah regresi, kami menggunakan metrik penilaian seperti kesilapan kuadrat min (MSE), kesilapan akar kuadrat (MSE), kesilapan mutlak (MAE), dan R-kuadrat.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --listSekarang kita akan mengatur semua langkah di atas ke dalam saluran paip. Mari buat fail baru latihan_pipeline.py.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)di sini,
@pipeline penghias digunakan untuk menentukan fungsiML_PIPELINE () sebagai saluran paip di zenml.
Untuk melihat papan pemuka untuk saluran paip latihan, hanya jalankan skrip run_pipeline.py. Mari buat fail run_pipeline.py.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise eSekarang kami telah menyelesaikan membuat saluran paip. Jalankan arahan di bawah untuk melihat papan pemuka saluran paip.
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]selepas menjalankan perintah di atas, ia akan mengembalikan url papan pemuka pengesanan, yang kelihatan seperti ini.
Paip latihan kelihatan seperti ini di papan pemuka, yang diberikan di bawah:
Langkah 10: Penggunaan Model
Paip Penyebaran Berterusan
Paip ini bertanggungjawab untuk terus menggunakan model terlatih. Ia mula -mula menjalankan
ml_pipeline ()
darilatihan_pipeline.py
fail untuk melatih model, kemudian menggunakanimport joblib import pandas as pd from abc import ABC, abstractmethod from sklearn.preprocessing import MinMaxScaler # Abstract class for Feature Engineering strategy class FeatureEngineeringStrategy(ABC): @abstractmethod def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: pass # Concrete class for calculating SMA, EMA, RSI, and other features class TechnicalIndicators(FeatureEngineeringStrategy): def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: # Calculate SMA, EMA, and RSI df['SMA_20'] = df['CLOSE'].rolling(window=20).mean() df['SMA_50'] = df['CLOSE'].rolling(window=50).mean() df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean() # Price difference features df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE'] df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW'] df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN'] df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW'] # Lagged features df['OPEN_lag1'] = df['OPEN'].shift(1) df['CLOSE_lag1'] = df['CLOSE'].shift(1) df['HIGH_lag1'] = df['HIGH'].shift(1) df['LOW_lag1'] = df['LOW'].shift(1) # Rolling statistics df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean() df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std() # Drop rows with missing values (due to rolling windows, shifts) df.dropna(inplace=True) return df # Abstract class for Scaling strategy class ScalingStrategy(ABC): @abstractmethod def scale(self, df: pd.DataFrame, features: list, target: str): pass # Concrete class for MinMax Scaling class MinMaxScaling(ScalingStrategy): def scale(self, df: pd.DataFrame, features: list, target: str): """ Scales the features and target using MinMaxScaler. Parameters: df: pd.DataFrame The DataFrame containing the features and target. features: list List of feature column names. target: str The target column name. Returns: pd.DataFrame, pd.DataFrame: Scaled features and target """ scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) X_scaled = scaler_X.fit_transform(df[features].values) y_scaled = scaler_y.fit_transform(df[[target]].values) joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl') joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl') return X_scaled, y_scaled, scaler_y # FeatureEngineeringContext: This will use the Strategy Pattern class FeatureEngineering: def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy): self.feature_strategy = feature_strategy self.scaling_strategy = scaling_strategy def process_features(self, df: pd.DataFrame, features: list, target: str): # Generate features using the provided strategy df_with_features = self.feature_strategy.generate_features(df) # Scale features and target using the provided strategy X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target) return df_with_features, X_scaled, y_scaled, scaler_y
Pipeline Inference
Kami menggunakan saluran paip kesimpulan untuk membuat ramalan pada data baru, menggunakan model yang digunakan. Mari kita lihat bagaimana kami melaksanakan saluran paip ini dalam projek kami.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
mari kita lihat tentang setiap fungsi yang dipanggil dalam saluran paip kesimpulan di bawah:
dynamic_importer ()
Fungsi ini memuat data baru, melakukan pemprosesan data, dan mengembalikan data.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
prediction_service_loader ()
Fungsi ini dihiasi dengan @step . Kami memuatkan perkhidmatan penempatan W.R.T Model yang digunakan berdasarkan pipeline_name, dan step_name, di mana model yang digunakan kami bersedia untuk memproses pertanyaan ramalan untuk data baru.
garis sedia ada_services = mlflow_model_deployer_component.find_model_server () Mencari perkhidmatan penyebaran yang tersedia berdasarkan parameter yang diberikan seperti nama saluran paip dan nama langkah paip. Sekiranya tiada perkhidmatan yang tersedia, ia menunjukkan bahawa saluran paip penempatan sama ada tidak dijalankan atau mengalami masalah dengan saluran paip penempatan, jadi ia melemparkan runtimeerror.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Predictor ()
Fungsi mengambil dalam model MLFlow yang dikerahkan melalui MLFlowDeploymentservice dan data baru. Data diproses lebih jauh untuk memadankan format model yang diharapkan untuk membuat kesimpulan masa nyata.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Untuk memvisualisasikan saluran penyebaran dan kesimpulan yang berterusan, kita perlu menjalankan skrip run_deployment.py, di mana konfigurasi penempatan dan ramalan akan ditakrifkan. (Sila periksa kod run_deployment.py dalam github yang diberikan di bawah).
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Sekarang mari kita jalankan fail run_deployment.py untuk melihat papan pemuka saluran paip penempatan berterusan dan saluran paip kesimpulan.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
saluran paip penempatan berterusan - output
Setelah menjalankan fail run_deployment.py, anda dapat melihat pautan papan pemuka mlflow yang kelihatan seperti ini.
Sekarang anda perlu menyalin dan menyisipkan pautan UI MLFlow di atas dalam baris arahan anda dan jalankannya.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
di sini adalah papan pemuka mlflow, di mana anda dapat melihat metrik penilaian dan parameter model:
Langkah 11: Membina aplikasi Streamlit
Sekali lagi, anda boleh mencari kod di GitHub untuk aplikasi Streamlit.
Kesimpulan
Dalam artikel ini, kami telah berjaya membina projek MLOPS Ramalan Bitcoin yang siap sedia, siap sedia. Dari memperoleh data melalui API dan memprosesnya untuk latihan model, penilaian, dan penempatan, projek kami menyoroti peranan kritikal MLOPS dalam menghubungkan pembangunan dengan pengeluaran. Kami satu langkah lebih dekat untuk membentuk masa depan meramalkan harga bitcoin dalam masa nyata. API memberikan akses lancar kepada data luaran, seperti data harga bitcoin dari API CCData, menghapuskan keperluan untuk dataset yang sedia ada.
Takeaways Key
- API membolehkan akses lancar ke data luaran, seperti data harga bitcoin dari CCData API, menghapuskan keperluan untuk dataset yang sedia ada.
- zenml dan mlflow adalah alat yang mantap yang memudahkan pembangunan, penjejakan, dan penggunaan model pembelajaran mesin dalam aplikasi dunia nyata.
- kami telah mengikuti amalan terbaik dengan melakukan pengambilan data, pembersihan, kejuruteraan ciri, latihan model, dan penilaian dengan betul.
- Pipelin penempatan dan kesimpulan yang berterusan adalah penting untuk memastikan model tetap cekap dan boleh didapati dalam persekitaran pengeluaran.
Q1. Adakah zenml percuma untuk digunakan? a. Ya, ZenML adalah rangka kerja MLOPS sumber terbuka yang menjadikan peralihan dari pembangunan tempatan ke saluran paip pengeluaran semudah 1 baris kod.
Q2. Apa yang digunakan oleh MLFlow?
a. Ini adalah kesilapan biasa yang akan anda hadapi dalam projek. Hanya jalankan `zenml logout -local` kemudian` zenml clean`, dan kemudian `zenml login -local`, sekali lagi menjalankan saluran paip. Ia akan diselesaikan.
Atas ialah kandungan terperinci Ramalan Harga Bitcoin Menggunakan MLOPS. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Walaupun ia tidak dapat memberikan sambungan manusia dan intuisi ahli terapi terlatih, penyelidikan telah menunjukkan bahawa ramai orang selesa berkongsi kebimbangan dan kebimbangan mereka dengan bot AI yang agak tidak berwajah dan tanpa nama. Sama ada ini selalu baik saya

Kecerdasan Buatan (AI), satu dekad teknologi dalam pembuatan, merevolusikan industri runcit makanan. Dari keuntungan kecekapan berskala besar dan pengurangan kos kepada proses yang diselaraskan di pelbagai fungsi perniagaan, kesan AI adalah undeniabl

Mari kita bercakap mengenainya. Analisis terobosan AI yang inovatif ini adalah sebahagian daripada liputan lajur Forbes yang berterusan pada AI terkini termasuk mengenal pasti dan menjelaskan pelbagai kerumitan AI yang memberi kesan (lihat pautan di sini). Di samping itu, untuk comp saya

Mengekalkan imej profesional memerlukan kemas kini almari pakaian sekali -sekala. Walaupun membeli-belah dalam talian adalah mudah, ia tidak mempunyai kepastian percubaan secara peribadi. Penyelesaian saya? Peribadi yang berkuasa AI. Saya membayangkan pembantu AI yang mengendalikan pakaian selecti

Google Translate menambah fungsi pembelajaran bahasa Menurut Android Authority, App Expers AssembleDebug telah mendapati bahawa versi terbaru aplikasi Google Translate mengandungi mod ujian "amalan" baru yang direka untuk membantu pengguna meningkatkan kemahiran bahasa mereka melalui aktiviti yang diperibadikan. Ciri ini kini tidak dapat dilihat oleh pengguna, tetapi AssembleDebug dapat mengaktifkannya dan melihat beberapa elemen antara muka pengguna yang baru. Apabila diaktifkan, ciri ini menambah ikon topi tamat pengajian baru di bahagian bawah skrin yang ditandai dengan lencana "beta" yang menunjukkan bahawa ciri "amalan" akan dikeluarkan pada mulanya dalam bentuk eksperimen. Prompt pop timbul yang berkaitan menunjukkan "Amalan aktiviti yang disesuaikan untuk anda!", Yang bermaksud Google akan menjana disesuaikan

Penyelidik MIT sedang membangunkan Nanda, protokol web yang direka untuk agen AI. Pendek untuk ejen rangkaian dan AI yang terdesentralisasi, Nanda membina Protokol Konteks Model Anthropic (MCP) dengan menambahkan keupayaan Internet, membolehkan AI AGEN

Usaha terbaru Meta: Aplikasi AI untuk menyaingi chatgpt Meta, syarikat induk Facebook, Instagram, WhatsApp, dan Threads, melancarkan aplikasi berkuasa AI yang baru. Aplikasi mandiri ini, Meta AI, bertujuan untuk bersaing secara langsung dengan chatgpt Openai. Tuil

Menavigasi serangan AI Cyber yang semakin meningkat Baru-baru ini, Jason Clinton, Ciso untuk Anthropic, menggariskan risiko yang muncul yang terikat kepada identiti bukan manusia-sebagai komunikasi komunikasi ke mesin, melindungi "identiti" ini menjadi


Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

Video Face Swap
Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Artikel Panas

Alat panas

ZendStudio 13.5.1 Mac
Persekitaran pembangunan bersepadu PHP yang berkuasa

mPDF
mPDF ialah perpustakaan PHP yang boleh menjana fail PDF daripada HTML yang dikodkan UTF-8. Pengarang asal, Ian Back, menulis mPDF untuk mengeluarkan fail PDF "dengan cepat" dari tapak webnya dan mengendalikan bahasa yang berbeza. Ia lebih perlahan dan menghasilkan fail yang lebih besar apabila menggunakan fon Unicode daripada skrip asal seperti HTML2FPDF, tetapi menyokong gaya CSS dsb. dan mempunyai banyak peningkatan. Menyokong hampir semua bahasa, termasuk RTL (Arab dan Ibrani) dan CJK (Cina, Jepun dan Korea). Menyokong elemen peringkat blok bersarang (seperti P, DIV),

SecLists
SecLists ialah rakan penguji keselamatan muktamad. Ia ialah koleksi pelbagai jenis senarai yang kerap digunakan semasa penilaian keselamatan, semuanya di satu tempat. SecLists membantu menjadikan ujian keselamatan lebih cekap dan produktif dengan menyediakan semua senarai yang mungkin diperlukan oleh penguji keselamatan dengan mudah. Jenis senarai termasuk nama pengguna, kata laluan, URL, muatan kabur, corak data sensitif, cangkerang web dan banyak lagi. Penguji hanya boleh menarik repositori ini ke mesin ujian baharu dan dia akan mempunyai akses kepada setiap jenis senarai yang dia perlukan.

Penyesuai Pelayan SAP NetWeaver untuk Eclipse
Integrasikan Eclipse dengan pelayan aplikasi SAP NetWeaver.

MinGW - GNU Minimalis untuk Windows
Projek ini dalam proses untuk dipindahkan ke osdn.net/projects/mingw, anda boleh terus mengikuti kami di sana. MinGW: Port Windows asli bagi GNU Compiler Collection (GCC), perpustakaan import yang boleh diedarkan secara bebas dan fail pengepala untuk membina aplikasi Windows asli termasuk sambungan kepada masa jalan MSVC untuk menyokong fungsi C99. Semua perisian MinGW boleh dijalankan pada platform Windows 64-bit.
