ホームページ >バックエンド開発 >Python チュートリアル >現実世界の機械学習に分散コンピューティングが必要な理由

現実世界の機械学習に分散コンピューティングが必要な理由

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBオリジナル
2024-09-10 06:49:32836ブラウズ

Why You Need Distributed Computing for Real-World Machine Learning

そして、PySpark がプロのように巨大なデータセットを処理するのにどのように役立つのか

PyTorch や TensorFlow などの機械学習フレームワークは、モデルの構築に最適です。しかし現実には、巨大なデータセットを扱う現実世界のプロジェクトでは、単なる優れたモデル以上のものが必要です。すべてのデータを効率的に処理および管理する方法が必要です。そこで窮地を救うために、PySpark のような分散コンピューティングが登場します。

現実世界の機械学習でビッグデータを扱うことが、PyTorch や TensorFlow を超えることを意味する理由と、そこに到達するのに PySpark がどのように役立つのかを詳しく見てみましょう。
本当の問題: ビッグデータ
オンラインで見られるほとんどの ML サンプルでは、​​小規模で管理しやすいデータセットが使用されています。すべてをメモリに格納し、試して、モデルを数分でトレーニングできます。しかし、クレジット カード不正行為の検出、推奨システム、財務予測などの現実のシナリオでは、数百万、場合によっては数十億の行を処理することになります。突然、ラップトップまたはサーバーが処理できなくなります。

すべてのデータを一度に PyTorch または TensorFlow にロードしようとすると、問題が発生します。これらのフレームワークは、巨大なデータセットを効率的に処理するためではなく、モデルのトレーニングのために設計されています。ここで分散コンピューティングが重要になります。
PyTorch と TensorFlow だけでは不十分な理由
PyTorch と TensorFlow はモデルの構築と最適化には最適ですが、大規模なデータ タスクを扱う場合には不十分です。 2 つの大きな問題:

  • メモリ過負荷: トレーニング前にデータセット全体をメモリにロードします。これは小さなデータセットでは機能しますが、テラバイト規模のデータになるとゲームオーバーです。
  • 分散データ処理なし: PyTorch と TensorFlow は分散データ処理を処理するように構築されていません。大量のデータが複数のマシンに分散している場合、あまり役に立ちません。

ここで PySpark が威力を発揮します。分散データを操作するように設計されており、システムをクラッシュさせることなく大量のデータセットを処理しながら、複数のマシン間でデータを効率的に処理します。

実際の例: PySpark を使用したクレジット カード詐欺の検出
例を見てみましょう。クレジット カード取引データを使用した不正検出システムに取り組んでいるとします。この場合、Kaggle の人気のあるデータセットを使用します。 284,000 件を超えるトランザクションが含まれており、そのうち不正なものは 1% 未満です。

ステップ 1: Google Colab で PySpark をセットアップする
最小限のセットアップで PySpark を実行できるため、これには Google Colab を使用します。

!pip install pyspark

次に、必要なライブラリをインポートし、Spark セッションを開始します。

import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, udf
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.types import FloatType

pyspark セッションを開始します

spark = SparkSession.builder \
    .appName("FraudDetectionImproved") \
    .master("local[*]") \
    .config("spark.executorEnv.PYTHONHASHSEED", "0") \
    .getOrCreate()

ステップ 2: データのロードと準備

data = spark.read.csv('creditcard.csv', header=True, inferSchema=True)
data = data.orderBy("Time")  # Ensure data is sorted by time
data.show(5)
data.describe().show()
# Check for missing values in each column
data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

# Prepare the feature columns
feature_columns = data.columns
feature_columns.remove("Class")  # Removing "Class" column as it is our label

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data.select("features", "Class").show(5)

# Split data into train (60%), test (20%), and unseen (20%)
train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42)
test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42)

# Print class distribution in each dataset
print("Train Data:")
train_data.groupBy("Class").count().show()

print("Test and parameter optimisation Data:")
test_data.groupBy("Class").count().show()

print("Unseen Data:")
unseen_data.groupBy("Class").count().show()

ステップ 3: モデルの初期化

# Initialize RandomForestClassifier
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability")

# Create ParamGrid for Cross Validation
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20 ]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Create 5-fold CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"),
                          numFolds=5)

ステップ 4: 適合、相互検証を実行し、最適なパラメーターのセットを選択します

# Run cross-validation, and choose the best set of parameters
rf_model = crossval.fit(train_data)

# Make predictions on test data
predictions_rf = rf_model.transform(test_data)

# Evaluate Random Forest Model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

auc_rf = binary_evaluator.evaluate(predictions_rf)
auprc_rf = pr_evaluator.evaluate(predictions_rf)
print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}")

# UDF to extract positive probability from probability vector
extract_prob = udf(lambda prob: float(prob[1]), FloatType())
predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))

ステップ 5 精度、再現率、F1 スコアを計算する関数

# Function to calculate precision, recall, and F1-score
def calculate_metrics(predictions):
    tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count()
    fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count()
    fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count()

    precision = tp / (tp + fp) if (tp + fp) != 0 else 0
    recall = tp / (tp + fn) if (tp + fn) != 0 else 0
    f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0

    return precision, recall, f1_score

ステップ 6: モデルの最適なしきい値を見つける

# Find the best threshold for the model
best_threshold = 0.5
best_f1 = 0
for threshold in np.arange(0.1, 0.9, 0.1):
    thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double"))
    precision, recall, f1 = calculate_metrics(thresholded_predictions)

    if f1 > best_f1:
        best_f1 = f1
        best_threshold = threshold

print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")

ステップ 7: 目に見えないデータを評価する

# Evaluate on unseen data
predictions_unseen = rf_model.transform(unseen_data)
auc_unseen = binary_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {auc_unseen:.4f}")

precision, recall, f1 = calculate_metrics(predictions_unseen)
print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")

area_under_roc = binary_evaluator.evaluate(predictions_unseen)
area_under_pr = pr_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")

結果

Best threshold: 0.30000000000000004, Best F1-score: 0.9062
Unseen Data - AUC: 0.9384
Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485
Unseen Data - AUC: 0.9423, AUPRC: 0.8618

このモデル (数 KB) を保存し、pyspark パイプラインのどこでも使用できます

rf_model.save()

現実世界の機械学習タスクで大規模なデータセットを扱うときに PySpark が大きな違いを生む理由は次のとおりです:

拡張が簡単: PySpark はクラスター全体にタスクを分散できるため、メモリ不足になることなくテラバイトのデータを処理できます。
オンザフライのデータ処理: PySpark はデータセット全体をメモリにロードする必要はありません。必要に応じてデータを処理するため、効率が大幅に向上します。
モデル トレーニングの高速化: 分散コンピューティングを使用すると、計算ワークロードを複数のマシンに分散することで、モデルをより速くトレーニングできます。
最終的な感想
PyTorch と TensorFlow は機械学習モデルを構築するための素晴らしいツールですが、現実世界の大規模なタスクにはさらに多くのツールが必要です。 PySpark を使用した分散コンピューティングにより、巨大なデータセットを効率的に処理し、リアルタイムでデータを処理し、機械学習パイプラインを拡張することができます。

そのため、次回不正検出、推奨システム、財務分析などの大規模なデータを扱う場合は、PySpark を使用してプロジェクトを次のレベルに引き上げることを検討してください。

完全なコードと結果については、このノートブックを参照してください。 :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23

__

I am Swapnil, feel free leave your comments Results and ideas, or ping me - swapnil@nooffice.no for Data, Software dev gigs and jobs

以上が現実世界の機械学習に分散コンピューティングが必要な理由の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。