Rumah  >  Artikel  >  Java  >  Cara menggunakan Java thread pool Executor

Cara menggunakan Java thread pool Executor

王林
王林ke hadapan
2023-04-28 10:01:061841semak imbas

    Rajah Kelas Kolam Benang

    Cara menggunakan Java thread pool Executor

    Pelaksanaan Pelaksana kami yang paling biasa digunakan untuk mencipta kumpulan benang dan menggunakan benang adalah terutamanya untuk gunakan di atas Kelas-kelas yang disediakan dalam rajah kelas. Gambar rajah kelas di atas termasuk rangka kerja Pelaksana, iaitu rangka kerja yang menjadualkan, melaksanakan dan mengawal tugas tak segerak berdasarkan set panggilan strategi pelaksanaan Tujuannya adalah untuk menyediakan mekanisme yang memisahkan penyerahan tugas daripada cara tugasan dijalankan. Ia mengandungi tiga antara muka pelaksana:

    • Pelaksana: antara muka mudah untuk menjalankan tugas baharu

    • Perkhidmatan Pelaksana: memanjangkan Pelaksana dan menambah Kaedah untuk mengurus hayat pelaksana kitaran dan kitaran hayat tugasan

    • ScheduleExcutorService: melanjutkan ExecutorService untuk menyokong Masa Depan dan tugasan yang dilaksanakan secara kerap

    Faedah Thread pool

    - Ia boleh mengawal bilangan maksimum urutan serentak, meningkatkan penggunaan sumber sistem dan mengelakkan persaingan dan penyekatan sumber yang berlebihan. Apabila tugasan tiba, tugasan itu boleh dilaksanakan serta-merta tanpa menunggu penciptaan utas
  • Tingkatkan kebolehurusan utas
  • -Sediakan pelaksanaan berjadual, pelaksanaan berkala, utas tunggal, Concurrency kawalan dan fungsi lain.
  • Kelemahan Thread baharu

  • Setiap kali Thread baharu mencipta objek baharu, prestasinya kurang baik

  • Thread Kekurangan pengurusan bersatu boleh mencipta rangkaian baharu tanpa had, bersaing antara satu sama lain dan mungkin menduduki terlalu banyak sumber sistem, yang membawa kepada ranap atau OOM (kehabisan memori memori Punca masalah ini bukan semata-mata a Thread baru, tetapi mungkin disebabkan oleh Ia disebabkan oleh pepijat program atau kecacatan reka bentuk yang menyebabkan Thread baharu berterusan.

      Tiada lebih banyak ciri seperti lebih banyak pelaksanaan, pelaksanaan berkala, gangguan rangkaian.
    • Kelas teras kolam benang-ThreadPoolExecutor
    • Perihalan parameter: ThreadPoolExecutor mempunyai sejumlah tujuh parameter ini berfungsi bersama-sama untuk membentuk fungsi berkuasa kumpulan benang.

    • corePoolSize

      : Bilangan utas teras

    maximumPoolSize

    : Bilangan maksimum thread

    workQueue

    : menyekat Baris Gilir, yang menyimpan tugasan yang menunggu untuk dilaksanakan, adalah sangat penting dan akan memberi kesan yang ketara pada proses menjalankan kumpulan benangApabila kami menyerahkan tugasan baharu kepada kumpulan benang, kumpulan benang akan menjadi berdasarkan bilangan utas yang berjalan dalam kumpulan semasa. Tentukan cara tugasan akan dikendalikan. Terdapat tiga kaedah pemprosesan:

    1 Pensuisan langsung (SynchronusQueue) 2 Bilangan maksimum utas yang boleh dibuat oleh baris gilir tanpa had (LinkedBlockingQueue) ialah corePoolSize. maximumPoolSize tidak akan berfungsi. Apabila semua utas teras dalam kumpulan utas sedang berjalan, penyerahan tugas baharu akan diletakkan dalam baris gilir menunggu.

    3. Saiz kumpulan maksimum baris gilir terhad (ArrayBlockingQueue) boleh mengurangkan penggunaan sumber, tetapi kaedah ini menjadikan penjadualan kumpulan benang lebih sukar. Kerana kumpulan benang dan kapasiti baris gilir adalah terhad. Oleh itu, jika kami mahu kadar daya pemprosesan kumpulan benang dan tugasan pemprosesan mencapai julat yang munasabah, dan jika kami ingin menjadikan penjadualan benang kami agak mudah dan mengurangkan penggunaan sumber sebanyak mungkin, kami perlu mengehadkan kedua-dua teknik peruntukan kuantiti ini dengan munasabah. : [ Jika anda ingin mengurangkan penggunaan sumber, termasuk mengurangkan penggunaan CPU, penggunaan sumber sistem pengendalian, overhed penukaran konteks, dsb., anda boleh menetapkan kapasiti baris gilir yang lebih besar dan kapasiti kumpulan benang yang lebih kecil, yang akan mengurangkan daya pemprosesan kumpulan benang . Jika tugasan yang kami serahkan sering disekat, kami boleh melaraskan maksimumPoolSize. Jika kapasiti baris gilir kami kecil, kami perlu menetapkan saiz kolam benang lebih besar, supaya penggunaan CPU akan lebih tinggi. Walau bagaimanapun, jika kapasiti kumpulan benang ditetapkan terlalu besar dan bilangan tugasan ditambah terlalu banyak, jumlah konkurensi akan meningkat, jadi penjadualan antara utas adalah isu yang perlu dipertimbangkan. Ini sebaliknya boleh mengurangkan daya pemprosesan tugasan pemprosesan. ]

    keepAliveTime

    : Masa maksimum untuk urutan disimpan sehingga ia ditamatkan apabila tiada pelaksanaan tugas (apabila bilangan utas dalam utas lebih besar daripada corePoolSize, jika tiada yang baharu tugasan diserahkan kepada utas di luar utas teras pada masa ini tidak akan dimusnahkan serta-merta, tetapi tunggu sehingga keepAliveTime melebihi)

    unit

    : unit masa keepAliveTime

    threadFactory

    : thread factory, used Create a thread, terdapat kilang lalai untuk mencipta thread, supaya thread yang baru dibuat mempunyai keutamaan yang sama, adalah thread non-daemon, dan mempunyai nama yang ditetapkan)

    rejectHandler

    : apabila Strategi pemprosesan penolakan semasa tugasan (baris gilir menyekat penuh) (Dasar lalai AbortPolicy membuang pengecualian secara langsung, CallerRunsPolicy menggunakan urutan pemanggil untuk melaksanakan tugasan, DiscardOldestPolicy membuang tugas teratas dalam baris gilir dan melaksanakan tugas semasa, DiscardPolicy terus membuang tugas semasa)

    Cara menggunakan Java thread pool Executor

    Hubungan antara corePoolSize, maximumPoolSize dan workQueue: Jika bilangan utas berjalan kurang daripada corePoolSize, buat urutan baharu secara langsung untuk mengendalikan tugas. Walaupun benang lain dalam kumpulan benang terbiar. Jika bilangan utas berjalan lebih besar daripada corePoolSize dan kurang daripada maximumPoolSize, maka utas baharu akan dibuat untuk memproses tugas hanya apabila workQueue penuh. Jika corePoolSize dan maximumPoolSize adalah sama, saiz kumpulan benang yang dibuat adalah tetap. Pada masa ini, tugasan baharu diserahkan dan apabila workQueue tidak penuh, permintaan diletakkan dalam workQueue. Tunggu benang kosong untuk mengalih keluar tugas daripada workQueue. Jika workQueue juga penuh pada masa ini, maka gunakan parameter dasar penolakan tambahan untuk melaksanakan dasar penolakan.

    Kaedah permulaan: terdiri daripada tujuh parameter kepada empat kaedah permulaan

    Cara menggunakan Java thread pool Executor

    Kaedah lain:

    execute();	//提交任务,交给线程池执行	
    submit();//提交任务,能够返回执行结果 execute+Future
    shutdown();//关闭线程池,等待任务都执行完
    shutdownNow();//关闭线程池,不等待任务执行完
    getTaskCount();//线程池已执行和未执行的任务总数
    getCompleteTaskCount();//已完成的任务数量
    getPoolSize();//线程池当前的线程数量
    getActiveCount();//当前线程池中正在执行任务的线程数量

    Kitaran hayat kumpulan benang:

    Cara menggunakan Java thread pool Executor

    • berjalan: boleh menerima tugasan yang baru diserahkan dan juga boleh memproses tugasan dalam baris gilir menyekat

    • tutup : Tidak dapat memproses tugasan baharu, tetapi boleh terus memproses tugasan dalam baris gilir yang disekat

    • berhenti: Tidak boleh menerima tugas baharu dan tidak boleh memproses tugas dalam baris gilir

    • kemasan: Jika semua tugas telah ditamatkan, bilangan utas berkesan ialah 0

    • ditamatkan: keadaan akhir

    Gunakan Pelaksana untuk mencipta utas pools

    Menggunakan Executors, anda boleh mencipta empat pool pool: sepadan dengan kaedah permulaan empat thread pool yang disebutkan di atas

    Executors.newCachedThreadPool

    newCachedThreadPool ialah Buat kumpulan thread untuk utas baharu mengikut keperluan apabila tugasan diserahkan, corePoolSize ialah 0 dan tiada utas teras dibuat ialah baris gilir yang tidak menyimpan elemen dibuat. Benang bukan teras akan dikitar semula apabila melahu selama 60 saat. Oleh kerana Integer.MAX_VALUE adalah sangat besar, ia boleh dianggap bahawa urutan boleh dibuat tanpa had, yang boleh menyebabkan pengecualian OOM dengan mudah apabila sumber terhad.

    //创建newCachedThreadPool线程池源码
    public static ExecutorService newCachedThreadPool() {
    		/**
            *corePoolSize: 0,核心线程池的数量为0
    		*maximumPoolSize:  Integer.MAX_VALUE,可以认为最大线程数是无限的
    		*keepAliveTime: 60L
    		*unit: 秒
    		*workQueue: SynchronousQueue
            **/
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }

    Kes penggunaan:

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    log.info("task:{}",index);
                }
            });
        }
    }

    Perlu diperhatikan bahawa nilai pulangan newCachedThreadPool ialah jenis ExecutorService, yang hanya mengandungi kaedah kumpulan benang asas, tetapi tidak termasuk kaedah berkaitan pemantauan benang , keadaan tertentu mesti dipertimbangkan semasa membuat utas baharu menggunakan jenis kumpulan benang dengan nilai pulangan ExecutorService.

    Cara menggunakan Java thread pool Executor

    Executors.newSingleThreadExecutor

    newSingleThreadExecutor ialah kumpulan benang satu benang dengan hanya satu utas teras dan satu utas kongsi untuk melaksanakan tugasan bagi memastikan semua tugasan dilaksanakan seperti yang ditentukan Pelaksanaan berurutan (FIFO, keutamaan...)

    //newSingleThreadExecutor创建线程池源码
    public static ExecutorService newSingleThreadExecutor() {
        /**
          *  corePoolSize : 1,核心线程池的数量为1
    
          *  maximumPoolSize : 1,只可以创建一个非核心线程
    
          *  keepAliveTime : 0L
    
          *  unit => 秒
    
          *  workQueue => LinkedBlockingQueue
          **/
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }

    Apabila tugasan diserahkan, utas teras akan dibuat untuk melaksanakan tugasan tersebut Jika bilangan utas teras melebihi, ia akan dimasukkan ke dalam baris gilir kerana LinkedBlockingQueue ialah baris gilir dengan panjang Integer.MAX_VALUE boleh dianggap sebagai baris gilir tidak terhad, jadi bilangan tugas yang tidak terhingga boleh dimasukkan ke dalam baris gilir, yang boleh menyebabkan pengecualian OOM dengan mudah apabila sumber terhad pada masa yang sama, kerana baris gilir tidak terhad, parameter maximumPoolSize dan keepAliveTime akan menjadi tidak sah dan tidak akan berfungsi sama sekali.

    Executors.newFixedThreadPool

    Kumpulan benang panjang tetap, bilangan utas teras dan bilangan maksimum utas dihantar oleh pengguna Anda boleh menetapkan bilangan utas serentak maksimum bilangan melebihi, tunggu dalam baris gilir

    //newFixedThreadPool创建线程池源码
    public static ExecutorService newFixedThreadPool(int nThreads) {
        	/**
              *  corePoolSize : 核心线程的数量为自定义输入nThreads
    
              *  maximumPoolSize : 最大线程的数量为自定义输入nThreads
    
              *  keepAliveTime : 0L
    
              *  unit : 秒
    
              *  workQueue : LinkedBlockingQueue
              **/
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }

    newFixedThreadPool adalah serupa dengan SingleThreadExecutor Satu-satunya perbezaan ialah bilangan utas teras adalah berbeza dan kerana LinkedBlockingQueue digunakan, ia mudah menyebabkan pengecualian OOM apabila sumber terhad. .

    Executors.newScheduledThreadPool

    Kolam benang panjang tetap, bilangan utas teras dihantar masuk oleh pengguna, menyokong pelaksanaan tugas berjadual dan berkala

    //newScheduledThreadPool创建线程池源码
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        /**
          *  corePoolSize : 核心线程的数量为自定义输入corePoolSize
    
          *  maximumPoolSize : 最大线程的数量为Integer.MAX_VALUE
    
          *  keepAliveTime : 0L
    
          *  unit : 纳秒
    
          *  workQueue : DelayedWorkQueue
          **/
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    Apabila tugasan diserahkan , corePoolSize adalah sendiri- Tentukan input, utas teras dicipta dahulu, selepas utas teras penuh, jadi akhirnya utas bukan teras dicipta untuk melaksanakan tugas. Benang bukan teras akan dikitar semula selepas digunakan. Oleh kerana Integer.MAX_VALUE adalah sangat besar, ia boleh dianggap bahawa urutan boleh dibuat tanpa had, yang boleh menyebabkan pengecualian OOM dengan mudah apabila sumber terhad. Kerana DelayedWorkQueue yang digunakan boleh melaksanakan tugas berjadual dan berkala. ScheduledExecutorService menyediakan tiga kaedah yang boleh digunakan:

    Cara menggunakan Java thread pool Executor

    jadual: Laksanakan tugas selepas jadual kelewatanAtFixedRate: Laksanakan tugas pada jadual kadar yang ditentukanWithFixedDelay: Laksanakan tugas dengan kelewatan yang ditentukan . Kes penggunaan:

        public static void main(String[] args) {
    
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    
    //        executorService.schedule(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("schedule run");
    //            }
    //         //延迟3秒后执行
    //        }, 3, TimeUnit.SECONDS);
            //        executorService.shutdown();
    
    //        executorService.scheduleWithFixedDelay(new Runnable() {
    //            @Override
    //            public void run() {
    //                log.warn("scheduleWithFixedDelay run");
    //            }
    //            //延迟一秒后每隔3秒执行
    //        }, 1, 3, TimeUnit.SECONDS);
            
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    log.warn("schedule run");
                }
                //延迟一秒后每隔3秒执行
            }, 1, 3, TimeUnit.SECONDS);
    
            /**
             * 定时器调度,不推荐使用,推荐ScheduledExecutorService调度
             */
    //        Timer timer = new Timer();
    //        timer.schedule(new TimerTask() {
    //            @Override
    //            public void run() {
    //                log.warn("timer run");
    //            }
    //        //从当前时间每隔5秒执行
    //        }, new Date(), 5 * 1000);
        }

    Ringkasan

    • Panjang gilir permintaan yang dibenarkan bagi FixedThreadPool dan SingleThreadExecutor ialah Integer.MAX_VALUE, yang mungkin mengumpul sejumlah besar permintaan, menyebabkan pengecualian OOM

    • Bilangan urutan yang dibenarkan untuk dibuat oleh CachedThreadPool dan newScheduledThreadPool ialah Integer.MAX_VALUE, yang mungkin mencipta sejumlah besar urutan, menyebabkan pengecualian OOM

    Inilah sebabnya mengapa penggunaannya dilarang Sebab Pelaksana mencipta kumpulan benang dan bukannya mengesyorkan membuat sendiri ThreadPoolExecutor

    Cara menentukan parameter kumpulan benang

    Intensif CPU: Saiz kumpulan benang disyorkan sebagai bilangan CPU + 1, bilangan CPU boleh diperolehi mengikut kaedah Runtime.availableProcessors IO-intensif: Bilangan CPU * Penggunaan CPU * (1 + masa menunggu benang / masa CPU benang) Bercampur: akan Tugas dibahagikan kepada intensif CPU dan intensif IO, dan kemudian kumpulan benang berbeza digunakan untuk memprosesnya, supaya setiap kumpulan benang boleh dilaraskan mengikut kepada beban kerjanya sendiri Baris gilir menyekat: Adalah disyorkan untuk menggunakan baris gilir terhad membantu mengelakkan keletihan sumber Dasar penolakan: Dasar lalai ialah AbortPolicy, yang membuang RejectedExecutionException secara langsung dalam program [kerana ia adalah pengecualian runtime, jadi tiada tangkapan diperlukan]. Strategi berikut disyorkan untuk mengendalikan penolakan:

    • Tangkap pengecualian RejectedExecutionException dalam program dan proses tugasan dalam pengecualian yang ditangkap. Gunakan dasar penolakan CallerRunsPolicy untuk dasar penolakan lalai

    • Dasar ini akan menyerahkan tugas kepada urutan yang memanggil laksanakan [biasanya utas utama]. tidak akan dapat melaksanakan untuk tempoh masa Serahkan apa-apa tugas, menyebabkan urutan pekerja mengendalikan tugas pelaksanaan. Urutan yang diserahkan pada masa ini akan disimpan dalam baris gilir TCP Barisan gilir penuh TCP akan menjejaskan pelanggan, yang merupakan penurunan prestasi yang lembut

    • Dasar penolakan tersuai, hanya perlu dilaksanakan. Antara muka RejectedExecutionHandler boleh digunakan

    • Jika tugasan tidak begitu penting, anda juga boleh menggunakan dasar penolakan DiscardPolicy dan DiscardOldestPolicy untuk membuang tugasan

    Jika menggunakan Executors Kaedah statik mencipta objek ThreadPoolExecutor Anda boleh menggunakan Semaphore untuk mengehadkan pelaksanaan tugas dan mengelakkan pengecualian OOM

    Atas ialah kandungan terperinci Cara menggunakan Java thread pool Executor. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

    Kenyataan:
    Artikel ini dikembalikan pada:yisu.com. Jika ada pelanggaran, sila hubungi admin@php.cn Padam