首頁  >  文章  >  後端開發  >  為什麼現實世界的機器學習需要分散式運算

為什麼現實世界的機器學習需要分散式運算

WBOY
WBOY原創
2024-09-10 06:49:32750瀏覽

Why You Need Distributed Computing for Real-World Machine Learning

PySpark 如何幫助您像專業人士一樣處理龐大的資料集

PyTorch 和 TensorFlow 等機器學習框架非常適合建立模型。但現實是,當涉及現實世界的專案時(處理巨大的資料集),您需要的不僅僅是一個好的模型。您需要一種有效處理和管理所有資料的方法。這就是像 PySpark 這樣的分散式計算可以拯救世界的地方。

讓我們來分析為什麼在現實世界的機器學習中處理大數據意味著超越 PyTorch 和 TensorFlow,以及 PySpark 如何幫助您實現這一目標。
真正的問題:大數據
您在網路上看到的大多數機器學習範例都使用小型、易於管理的資料集。您可以將整個事情放入記憶體中,進行嘗試,並在幾分鐘內訓練模型。但在現實場景中(例如信用卡詐欺偵測、推薦系統或財務預測),您要處理數百萬甚至數十億行。突然間,您的筆記型電腦或伺服器無法處理它。

如果您嘗試將所有資料一次載入到 PyTorch 或 TensorFlow 中,事情就會崩潰。這些框架是為模型訓練而設計的,而不是為了有效處理巨大的資料集。這就是分散式運算變得至關重要的地方。
為什麼 PyTorch 和 TensorFlow 還不夠
PyTorch 和 TensorFlow 非常適合建立和最佳化模型,但在處理大規模資料任務時卻表現不佳。兩個主要問題:

  • 記憶體過載:他們在訓練之前將整個資料集載入到記憶體中。這適用於小型資料集,但當您擁有 TB 級的資料時,遊戲就結束了。
  • 無分散式資料處理:PyTorch 和 TensorFlow 並不是為處理分散式資料處理而建構的。如果您有大量資料分佈在多台機器上,那麼它們並沒有真正的幫助。

這就是 PySpark 的閃光點。它旨在處理分散式數據,在多台機器上高效處理數據,同時處理大量數據集,而不會導致系統崩潰。

真實範例:使用 PySpark 偵測信用卡詐欺
讓我們深入研究一個例子。假設您正在開發使用信用卡交易資料的詐欺偵測系統。在本例中,我們將使用 Kaggle 的流行資料集。它包含超過 284,000 筆交易,其中不到 1% 是詐騙交易。

第 1 步:在 Google Colab 中設定 PySpark
為此,我們將使用 Google Colab,因為它允許我們以最少的設定運行 PySpark。

!pip install pyspark

接下來,導入必要的函式庫並啟動 Spark 會話。

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, udf
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.types import FloatType

啟動 pyspark 會話

spark = SparkSession.builder \
    .appName("FraudDetectionImproved") \
    .master("local[*]") \
    .config("spark.executorEnv.PYTHONHASHSEED", "0") \
    .getOrCreate()

第 2 步:載入與準備資料

data = spark.read.csv('creditcard.csv', header=True, inferSchema=True)
data = data.orderBy("Time")  # Ensure data is sorted by time
data.show(5)
data.describe().show()
# Check for missing values in each column
data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

# Prepare the feature columns
feature_columns = data.columns
feature_columns.remove("Class")  # Removing "Class" column as it is our label

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data.select("features", "Class").show(5)

# Split data into train (60%), test (20%), and unseen (20%)
train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42)
test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42)

# Print class distribution in each dataset
print("Train Data:")
train_data.groupBy("Class").count().show()

print("Test and parameter optimisation Data:")
test_data.groupBy("Class").count().show()

print("Unseen Data:")
unseen_data.groupBy("Class").count().show()

第 3 步:初始化模型

# Initialize RandomForestClassifier
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability")

# Create ParamGrid for Cross Validation
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20 ]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Create 5-fold CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"),
                          numFolds=5)

第 4 步:擬合、運行交叉驗證,並選擇最佳參數集

# Run cross-validation, and choose the best set of parameters
rf_model = crossval.fit(train_data)

# Make predictions on test data
predictions_rf = rf_model.transform(test_data)

# Evaluate Random Forest Model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

auc_rf = binary_evaluator.evaluate(predictions_rf)
auprc_rf = pr_evaluator.evaluate(predictions_rf)
print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}")

# UDF to extract positive probability from probability vector
extract_prob = udf(lambda prob: float(prob[1]), FloatType())
predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))

第 5 步計算精確率、回想率和 F1 分數的函數

# Function to calculate precision, recall, and F1-score
def calculate_metrics(predictions):
    tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count()
    fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count()
    fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count()

    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return precision, recall, f1_score

第 6 步:找出模型的最佳閾值

# Find the best threshold for the model
best_threshold = 0.5
best_f1 = 0
for threshold in np.arange(0.1, 0.9, 0.1):
    thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double"))
    precision, recall, f1 = calculate_metrics(thresholded_predictions)

    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold

print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")

第七步:評估未見過的數據

# Evaluate on unseen data
predictions_unseen = rf_model.transform(unseen_data)
auc_unseen = binary_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {auc_unseen:.4f}")

precision, recall, f1 = calculate_metrics(predictions_unseen)
print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

area_under_roc = binary_evaluator.evaluate(predictions_unseen)
area_under_pr = pr_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")

結果

Best threshold: 0.30000000000000004, Best F1-score: 0.9062
Unseen Data - AUC: 0.9384
Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485
Unseen Data - AUC: 0.9423, AUPRC: 0.8618

然後您可以保存此模型(幾KB)並在 pyspark 管道中的任何地方使用它

rf_model.save()

這就是 PySpark 在處理現實機器學習任務中的大型資料集時產生巨大差異的原因:

輕鬆擴展:PySpark 可以跨叢集分配任務,讓您能夠處理 TB 級的數據,而不會耗盡記憶體。
動態資料處理:PySpark 不需要將整個資料集載入記憶體。它根據需要處理數據,這使得它更有效率。
更快的模型訓練:借助分散式計算,您可以透過在多台機器上分配計算工作負載來更快地訓練模型。
最後的想法
PyTorch 和 TensorFlow 是建立機器學習模型的絕佳工具,但對於現實世界的大規模任務,您需要更多工具。使用 PySpark 進行分散式運算可讓您有效率地處理龐大資料集、即時處理資料並擴展機器學習管道。

因此,下次您處理大量資料時(無論是詐欺偵測、推薦系統還是財務分析),請考慮使用 PySpark 將您的專案提升到一個新的水平。

有關完整的程式碼和結果,請查看此筆記本。 :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23

__

私は Swapnil です。お気軽にコメントを残してください。結果やアイデアについて、またはデータ、ソフトウェア開発の仕事や仕事について swapnil@nooffice.no にメッセージを送ってください

以上是為什麼現實世界的機器學習需要分散式運算的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn