ホームページ  >  記事  >  バックエンド開発  >  データ処理のパフォーマンスを向上させる方法 - 同時実行性を導入しますが、同期を回避します

データ処理のパフォーマンスを向上させる方法 - 同時実行性を導入しますが、同期を回避します

WBOY
WBOYオリジナル
2016-06-20 12:35:59826ブラウズ

バックグラウンド

データベースが存在する限り、データテーブルのバックアップ、定期的なクリーニング、データの置換、データのバックグラウンドバッチ処理が必要になります。バッチ処理の場合、多くの場合、大量のクエリ、フィルタリング、分類、集計計算が必要となるため、バッチ スクリプトでデータベースに直接クエリを実行すると、パフォーマンスが低下し、データベース ロック テーブルでオンライン事故が発生する可能性もあります。したがって、通常は最初にファイルにエクスポートし、そのファイルで計算してからインポートします。例:

1. mysql -e "select * from table" > を使用します。 ; Output.txt は SQL を実行し、結果をファイルにエクスポートします。

2. ファイルの場合は、さまざまな方法を使用して集計、フィルタリング、置換などの計算を実行し、最終的に必要な形式に出力します。 use;

3. 出力ファイルを公開するか、load を使用します。

これはデータベースの 1 回限りのバッチ クエリであるため、ファイルにエクスポートされ、そのファイルに対して計算が実行されるため、毎回データベースにクエリを実行する代わりに、大量のネットワーク IO 消費が節約され、処理速度が向上します。

ただし、エクスポートされたファイルを取得した後、ファイルが大きすぎる場合、または CPU を大量に消費する定期的なマッチングや集計計算などの計算ロジックが複雑な場合、シングルスレッド処理では同時処理を導入すると、マシンの CPU、メモリ、IO、ネットワーク、その他のリソースを最大限に活用でき、処理時間を大幅に短縮できます。

マルチスレッドを導入し、入力ファイルを複数のファイルに分割し、小さなファイルごとに処理スレッドを開始します。

HADOOP の MAP-REDUCE メソッドは、最初にファイルを小さな断片化されたファイルに分割します。その後、各フラグメントの計算を実行し、最後に各フラグメントの結果をまとめます。ただし、HADOOP のスケジューリングやクラスターの安定性などのさまざまな理由により、MB サイズのファイルの処理速度はさらに遅くなる場合があります。単一マシンのシングルスレッドの処理速度よりも、単一マシンのシングルスレッドをマルチスレッドに変更すると、パフォーマンスが驚くほど向上することがよくあります。

直感的なアプローチは、メインスレッドを使用して入力された単一の大きなファイルを読み取り、その読み取り結果を処理のためにサブスレッドに割り当て、その後メインスレッドが統合を行うことです。この方法は複数のファイルを統合するためです。スレッドは単一ファイルを共有します。 IO の場合、ファイルの同期メカニズムを追加する必要があります。最終的に、パフォーマンスのボトルネックはこの単一ファイルの読み取り同期にあります。

大きなファイルは小さなファイルに断片化され、各ファイルは独立した処理のために単一のスレッドに割り当てられ、各スレッドは個別の CPU コア、メモリ ユニット、およびファイル ハンドルを利用します。処理速度は最速に達します。

この方法を使用すると、次の手順を実行できます。

1. SHELL を使用して、入力ファイルを所定の数のスレッドに分割し、ディレクトリに保存します。 >2. プログラミング言語 JAVA/PYTHON は入力ファイルのディレクトリ パスをパラメータとして読み取り、各ファイルの処理スレッドを開始します。

3.ディレクトリ内のすべてのファイルを出力し、cat file* > Output_file を使用して最終計算結果を取得します

シェル

入力ファイルを分割します複数の小さなファイルに分割し、マルチスレッドで処理を開始し、結果ファイルを出力します。

function run multitask(){ # 複数の非同期スレッドを開始します SPLITS COUNT=20 # 入力ファイルの総数 sourcefile namescount=

# 分割ファイルの数を計算します Split filelines count=$(( $sourcefile Linescount / $SPLITS COUNT )) # ファイルを分割します -l $splitfile Linescount -a 3 -d ${入力ファイル} ${input

dir}/inputFile_
cat ${input_file} | wc -l

}
# 执行JAVA程序$JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"# 合并文件cat ${output_dir}/* > ${output_file}

マルチタスクの実行

## 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件#function run_multi_task(){ # 开启多个异步线程 SPLITS_COUNT=20 # 输入文件总数 source_file_lines_count=`cat ${input_file} | wc -l` # 计算出拆分的文件个数 split_file_lines_count=$(( $source_file_lines_count / $SPLITS_COUNT )) # 进行文件拆分 split -l $split_file_lines_count -a 3 -d ${input_file} ${input_dir}/inputFile_  # 执行JAVA程序 $JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"  # 合并文件 cat ${output_dir}/* > ${output_file}} run_multi_task
ここで、ファイルを分割するときは、サイズに応じて分割するには、split を使用します。

に対応する JAVA プログラムは、フォルダー内のファイル リストを読み取ることでファイルごとに別のスレッドを開始します。 🎜>

Java

このようにして、大きなファイルを小さなファイルに分割し、複数のスレッドを開始し、各スレッドが小さなファイルを処理し、最終的に各小さなファイルの結果を集約して最終出力を取得しますが、パフォーマンスは大幅に向上します。
public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                File inputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (File inputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterface backRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            Thread singleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Thread thread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
public class BackTaskMain {    public static void main(String[] args) {        String inputDataDir = args[1];        String outputDataDir = args[2];        String errDataDir = args[3];                FileinputDir = new File(inputDataDir);        File[] inputFiles = inputDir.listFiles();                // 记录开启的线程        List<Thread> threads = new ArrayList<Thread>();        for (FileinputFile : inputFiles) {            if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) {                continue;            }                        // 针对每个inputFile,生成对应的outputFile和errFile            String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out";            String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err";                        // 创建Runnable            BackRzInterfacebackRzInterface = new BackRzInterface();            backRzInterface.setInputFilePath(inputFile.getAbsolutePath());            backRzInterface.setOutputFilePath(outputSrcLiceFpath);            backRzInterface.setErrorOutputFpath(errorOutputFpath);                        // 创建Thread,启动线程            ThreadsingleRunThread = new Thread(backRzInterface);            threads.add(singleRunThread);            singleRunThread.start();        }                for (Threadthread : threads) {            try {                // 使用thread.join(),等待所有的线程执行完毕                thread.join();                System.out.println(thread.getName() + " has over");            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println("proccess all over");    }}
依存リソースがある場合は、依存リソースがパフォーマンスのボトルネックになるのを防ぐために、最初にスレッドごとにコピー、分割、クローンを作成できます。

上記のコードでは、それぞれのリソースが存在する場合に BackRzInterface が必要です。使用される Runnable オブジェクトについては、毎回 new で作成されることがわかります:

// Create RunnableBackRzInterface backRzInterface = new BackRzInterface();

このようにして、BackRzInterface が作成されます。各処理スレッドは独立しているため、この Runnable コードを使用しても同期の問題は発生しません。

マルチスレッド処理で外部リソースを使用する必要がある場合は、各スレッドが独自の排他的リソースを使用できるようにする方法を見つけるのが最善です。それらの間に競合がない場合、最大の同時処理を実現できます。 。

その他の例:

  • 多线程用到了字典文件,那么方法是首先将字典文件复制多份,每个线程使用自己独占的字典,避免并发同步访问字典;
  • 多线程若需要统一ID发号,可以提前计算出每个输入文件的行数,然后依次生成第一个线程需要的ID范围、第二个线程需要的ID范围,这些不同的ID范围也可以分别生成不同的文件,这样每个线程会使用各自独立的ID资源,避免了多个线程单时刻访问单个ID发号服务,使得发号成为性能瓶颈的可能;
  • 多线程如果依赖相同的Service,如果可以每次new对象就每次new,如果Bean都是在Spring中管理,则将Service加上@Scope(“prototype”),或者将对象每次clone一下得到一个新对象,保证最终每个线程使用自己独占的对象。
  • 尽量使用函数式编程的思想,每个函数都不要产生副作用,不要修改入参,结果只能通过return返回,避免增加代码同步冲突的可能;

通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。

避免同步的终极方法:使用多进程进行实现资源隔离

如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?

相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。

对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:

Shell

要拆分的文件数,也就是要启动的多进程数

SPLITS_COUNT=20

input splitsdir="${input dir}splits" output splitsdir="${output dir}splits"

输入文件行数

source filelines_count=

cat ${input_file} | wc -l

每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)

split filelines count=$(( $sourcefile linescount / ${SPLITS_COUNT} ))

执行拆分,注意这里使用-l进行行级别拆分更好

split -l $split filelines count -a 3 -d ${inputfile} ${input splitsdir}/inputfile_

process idx=1 for fname in $(ls ${inputsplits dir}); do inputfpath=${input splitsdir}/$fname ouput fpath=${outputsplits dir}/$fname # 后台执行所有进程 php "/php/main.php" "${inputfpath}" "${ouput fpath}" & (( processidx++ )) done

等待所有后台进程执行结束

wait

合并文件

cat $output splitsdir/* > ${output_file}

# 要拆分的文件数,也就是要启动的多进程数SPLITS_COUNT=20 input_splits_dir="${input_dir}_splits"output_splits_dir="${output_dir}_splits"# 输入文件行数source_file_lines_count=`cat ${input_file} | wc -l`# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)split_file_lines_count=$(( $source_file_lines_count / ${SPLITS_COUNT} ))# 执行拆分,注意这里使用-l进行行级别拆分更好split -l $split_file_lines_count -a 3 -d ${input_file} ${input_splits_dir}/inputfile_ process_idx=1for fname in $(ls ${input_splits_dir}); do input_fpath=${input_splits_dir}/$fname ouput_fpath=${output_splits_dir}/$fname # 后台执行所有进程 php "/php/main.php" "${input_fpath}" "${ouput_fpath}" & (( process_idx++ )) done # 等待所有后台进程执行结束wait # 合并文件cat $output_splits_dir/* > ${output_file}

上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。

总结

对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。