>  기사  >  Java  >  jdk1.5 스레드 향상에 대한 Java 특별 주제

jdk1.5 스레드 향상에 대한 Java 특별 주제

Y2J
Y2J원래의
2017-04-24 11:55:581667검색

1. 스레드 풀의 개념과 Executors 클래스의 적용

jdk1.5 이후에는 Executors 클래스의 정적 메소드를 사용하여 스레드 풀 객체를 생성할 수 있습니다
/ /이 메소드는 스레드 풀 개체를 생성하고 지정된 매개변수를 통해 스레드 풀에 몇 개의 스레드 개체가 있는지 확인하는 데 사용됩니다. 개체는 ExecutorService 인터페이스를 구현합니다
1.

public static ExecutorService newFixedThreadPool(int nThreads)

//이 메소드는 동적 작업을 기반으로 스레드 개체 수를 생성하는 스레드 풀을 생성합니다. 스레드 풀 개체는 ExecutorService 인터페이스
를 구현합니다.

public static ExecutorService newCachedThreadPool()

//이 메소드는 스레드 풀 객체를 생성합니다. 스레드 풀 객체는 ExecutorService 인터페이스를 구현합니다
3.

public static ExecutorService newSingleThreadExecutor()

4. 또한 Executor의
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 메소드를 통해 ScheduledExecutorService에 대해 설명합니다. ScheduledExecutorService 인터페이스를 구현하는 객체를 얻습니다.
이 객체의 주요 기능은 동일한 작업을 여러 번 실행할 수 있다는 것입니다.
5 ScheduledExecutorService와 ExecutorService의 관계 Executor는
Executor
|
ExecutorService
|
ScheduledExecutorService
입니다. 구체적인 코드 예시를 살펴보겠습니다

예제 1

package com.xiaogao.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class NewThreadPool {
	public static void main(String[] args) {
		//创建一个线程池,该线程池中含有3个线程对象
		//ExecutorService es = Executors.newFixedThreadPool(3);
		//创建一个具有缓冲线程池的线程
		//ExecutorService es = Executors.newCachedThreadPool();
		//创建一个具有单线程的线程池
		ExecutorService es = Executors.newSingleThreadExecutor();
		for(int i=1; i<11; i++) {
			final int task = i;
			//循环十次,给线程池分配十次任务,并执行
			es.execute(new Runnable() {
				public void run() {
					//执行十次循环,这是每个任务需要做的事,打印信息
					for(int j=1; j<11; j++) {
						try {
							//睡眠20毫秒
							Thread.sleep(20);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						System.out.println("第" + task + "号任务执行,第" + j + "次循环");
					}
				}
			});
		}
		System.out.println("十次任务由三个线程执行完毕,此时java虚拟机还没对退出,必须调用shutdown方法");
		es.shutdown();
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		//这个方法非常危险,会杀死当前正在执行任务的线程,暂停正在处理的等待中的任务,正在执行任务的线程被杀死后,会抛出异常
		//es.shutdownNow();
		//第一次执行,隔六秒,之后每一次执行隔2秒
		Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
				new Runnable(){
					@Override
				public void run() {
					System.out.println("bombing!");	
				}},
				6,
				2,
				TimeUnit.SECONDS);
	}
}

2. java1.5 이후 잠금 사용
java1.5 이전에는 멀티스레딩에서 다중 스레드가 공유 데이터를 조작할 때 오류가 발생하지 않도록 하려면 동기화로 수정된 메소드 또는 코드 블록에 해당 코드를 캡슐화합니다.
이를 구현하기 위해 Lock 인터페이스를 사용하는 것을 고려할 수 있습니다. 여섯 가지 주요 메소드는 다음과 같습니다. lock() 및 Unlock() 두 메소드는 잠금을 획득하고 해제하는 데 사용됩니다. ReentrantLock 클래스는 Lock 인터페이스를 구현하며 주로
잠금 객체 생성
인스턴스 2

package com.xiaogao.test;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock {
	public static void main(String[] args) {
		ShareaDate sd = new ShareaDate();
		//创建并启动4个线程
		new Thread(sd).start();
		new Thread(sd).start();
		new Thread(sd).start();
		new Thread(sd).start();
	}
	//定义一个内部类,该类中封装了共享数据
	static class ShareaDate implements Runnable {
		//创建共享字段
		//重设共享数据
		private int count = 10000;
		//拿到锁对象,用于同步代码
		Lock lock = new ReentrantLock();
		//为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock
		public /* synchronized */ int getCount() {
					
			//为了保险起见,万一try代码块中发生了异常,锁无法得到释放,应该将释放锁的代码放在finally中
			try {
				//上锁
				lock.lock();
				count = count - 1;
				return count;
			} finally {
				//释放锁
				lock.unlock();
			}
		}
		@Override
		public void run() {
			while(count > 0 ) {
				System.out.println(getCount());
			}
		}
	}
}

3. 잠금에 대한 심층 연구

ReadWriteLock은 한 쌍의 관련 잠금을 유지 관리합니다. 하나는 읽기 전용 작업용이고 다른 하나는 쓰기 작업용입니다. 작성기가 없는 한 읽기 잠금은 여러 판독기 스레드에 의해 동시에 유지될 수 있습니다. 쓰기 잠금은 배타적입니다.
이 클래스는 두 개의 추상 메소드를 캡슐화합니다.
Lock readLock()
Lock writeLock()
읽기 잠금 및 쓰기 잠금을 얻는 데 사용됩니다.
ReentrantReadWriteLock 이 클래스는 ReadWriteLock 인터페이스를 구현하지만 재정의하지 않았습니다. readLock() 메서드 및 writeLick() 메서드를 살펴보면 이 클래스가 ReentrantReadWriteLock.ReadLock 및 ReentrantReadWriteLock.WriteLock이라는 두 개의 정적 내부 클래스를 캡슐화하고 있음을 알 수 있습니다. of writeLock()
읽기 잠금과 쓰기 잠금의 기능은 무엇이며 일반 잠금과의 차이점은 무엇입니까? 멀티 스레드 프로그램에서는 공유 데이터를 얻어야 하고 때로는 공유 데이터를 수정해야 할 수도 있습니다. 공유된 데이터를 얻을 때 스레드의 침입은 아무런 해를 끼치지 않을 수 있지만, 데이터를 수정하는 경우에는 큰 해를 끼치게 됩니다. 이때 효율성을 높이기 위해 읽기 잠금(read lock)과 쓰기 잠금(write lock)이라는 개념이 구체적으로 등장했습니다. ?
1. 읽기 잠금은 상호 배타적이지 않습니다. 스레드가 읽기 잠금에 들어가고 해당 코드의 실행이 완료되지 않은 경우 다른 스레드가 잠겨 있지 않은 것처럼 실행에 들어갈 수 있습니다.
2. 쓰기 잠금은 상호 배타적입니다.
3. 쓰기 잠금은 상호 배타적입니다. 한 스레드가 들어가면 다른 스레드는 들어갈 수 없습니다.
일반 잠금에 비해 장점은 효과적으로 개선될 수 있다는 것입니다. 성능이 일반적인 잠금인 경우 어떤 작업을 수행하더라도 다른 스레드의 진입을 허용하지 않습니다. 다음 코드는
예제 3
을 보여줍니다.

package com.xiaogao.test;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
 *测试读锁和写锁的区别 
 * 
 */
public class TestLock {
	public static void main(String[] args) {
		ShareaDate sd = new ShareaDate();
		//创建并启动4个线程
		new Thread(sd, "1").start();
		new Thread(sd, "2").start();
		new Thread(sd, "3").start();
		new Thread(sd, "4").start();
	}
	//定义一个内部类,该类中封装了共享数据
	static class ShareaDate implements Runnable {
		//创建共享字段
		//重设共享数据
		private int count = 100;
		//创建读写锁
		ReadWriteLock rel = new ReentrantReadWriteLock();
		//为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock
/*		public int getCount() {
			//拿到锁对象,用于同步代码
			
			//拿到读锁,并上锁
			rel.readLock().lock();
			try {
				for(int i=0; i<10; i++) {
					try {
						Thread.sleep(20);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count);
				}
				return count;
			} finally {
				//释放读锁
				rel.readLock().unlock();
			}
			
		}*/
		
		public void resetCount() {
			//拿到写锁,并上锁
			rel.writeLock().lock();
			//保险起见,在判断一次
			if(count > 0) {
				try {
					count--;
					for(int i=0; i<10; i++) {
						//循环10次
						System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count);
					}
				} finally {
					//释放锁
					rel.writeLock().unlock();
				}
			}
		}
		@Override
		public void run() {
			while(count > 0 ) {
				//getCount();
				resetCount();
			}
		}
	}
}

从代码运行结果来看,如果用的是读锁,在内部循环时,其他线程可以进入,而用写锁时,无法进入.
什么是锁的降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。貌似应用比较少
这里就不加举例子了
4.Condition 的简单应用
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 
set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
咱们可以这么理解 Condition ,它可用于唤醒别的线程和暂停本线程.在原来的 synchronized 修饰的语句块中,我们是通过调用锁的wait()方法和notify()来暂停和唤醒线程,同理
Condition 必须基于线程同步代码块的互斥才行,也就是必须存在于Lock.lock()与Lock.unlock()之中,这样的话我们也可以更好的理解为什么读锁为什么无法拿到 Condition 对象了
加读锁后,线程之间并不互斥,就像没有加 synchronized 一样,自然没有资格使用wait()方法和notify()一样
synchronized 和 Condition 的区别
相同点:都是使用在并发编程当中,并且必须基于一把锁对象
不同点:当有>=3个的线程的话,用notify()方法没有办法指定唤醒哪一个线程,完全是随机的,就算是你使用notifyall()唤醒全部线程也没有办法保证你想要的线程得到锁,但是 Condition就不一样,你可以指定唤醒某个线程,并将锁扔给他.
让我们现在来看一道面试题有三个线程,分别执行三段代码,主线程执行完后,执行子1号线程,之后子2号线程,之后再主线程,这样来回若干次
分析:如果我们不使用 Condition 这个新技术,会很麻烦的,因为我们唤醒其他线程的时候不知道下一个是哪个线程会被唤醒并得到执行,而如果使用 Condition 可以很好的解决这个问题
见具体代码
实例4

package com.xiaogao.test;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ThreeConditionCommunication {
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		final Business business = new Business();
		//创建3个线程对象并启动他们,每个线程执行的任务不同
		new Thread(
				new Runnable() {
					
					@Override
					public void run() {
					
						for(int i=1;i<=50;i++){
							business.sub2(i);
						}
					}
				}
		).start();
		new Thread(
				new Runnable() {
					@Override
					public void run() {
					
						for(int i=1;i<=50;i++){
							business.sub3(i);
						}
					}
				}
		).start();		
		for(int i=1;i<=50;i++){
			business.main(i);
		}
	}
	static class Business {
		    //创建锁对象
			Lock lock = new ReentrantLock();
			//得到3个Condition对象
			Condition condition1 = lock.newCondition();
			Condition condition2 = lock.newCondition();
			Condition condition3 = lock.newCondition();
		  private int shouldSub = 1;
		  public  void sub2(int i){
			  //上锁
			  lock.lock();
			  try{
				  while(shouldSub != 2){
					  try {
						  //正在执行的线程等待,condition2记住了当前线程,后面就可以通过condition2.signal()唤醒该线程;
						condition2.await();
						System.out.println("第二号子线程2222222222");
					} catch (Exception e) {
						e.printStackTrace();
					}
				  }
					for(int j=1;j<=10;j++){
						System.out.println("sub2 thread sequence of " + j + ",loop of " + i);
					}
				  shouldSub = 3;
				  //通过condition3唤醒指定的线程
				  condition3.signal();
			  }finally{
				  lock.unlock();
			  }
		  }
		  public  void sub3(int i){
			  lock.lock();
			  try{
				  while(shouldSub != 3){
					  try {
						condition3.await();
						System.out.println("第三号子线程33333333333333");
					} catch (Exception e) {
						e.printStackTrace();
					}
				  }
					for(int j=1;j<=20;j++){
						System.out.println("sub3 thread sequence of " + j + ",loop of " + i);
					}
				  shouldSub = 1;
				  condition1.signal();
			  }finally{
				  //解锁,释放权限
				  lock.unlock();
			  }
		  }		  
		  public  void main(int i){
			  lock.lock();
			  try{
				 while(shouldSub != 1){
				  		try {
							condition1.await();
							System.out.println("主线程1111111111111111111");
						} catch (Exception e) {
							e.printStackTrace();
						}
				  }
				for(int j=1;j<=100;j++){
					System.out.println("main thread sequence of " + j + ",loop of " + i);
				}
				shouldSub = 2;
				condition2.signal();
		  }finally{
			  lock.unlock();
		}
	  }
        }
    }

我在每个condition.await();后面加了一句打印代码,就是为了确认某个condition调用signal()后,是不是指定的线程得到权限,并执行,从结果来看,显然如此
下面介绍几个多线程并发的工具类,没什么难度,记得在哪用就可以了
5.Semaphore 的使用
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,
从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。 
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
实例5

package com.xiaogao.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		//创建一个Semaphore对象,并指定其权限个数
		final  Semaphore sp = new Semaphore(3);
		//循环10次
		for(int i=0;i<10;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						//取得一个许可权限,当权限用完后,线程将阻塞,等待其他线程释放权限
						sp.acquire();
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					System.out.println("线程" + Thread.currentThread().getName() + 
							"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
					try {
						Thread.sleep(3000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("线程" + Thread.currentThread().getName() + 
							"即将离开");					
					sp.release();
					//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
					System.out.println("线程" + Thread.currentThread().getName() + 
							"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");					
				}
			};
			service.execute(runnable);			
		}
	}
}

6.CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,
此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
实例5

package com.xiaogao.test;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final  CyclicBarrier cb = new CyclicBarrier(3);
		for(int i=0;i<3;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));						
						cb.await();
						
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
						cb.await();	
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));						
						cb.await();						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}
		service.shutdown();
	}
}

7.CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
如果需要重置计数,请考虑使用 CyclicBarrier。 
CountDownLatch 与 CyclicBarrier 的区别还有 CyclicBarrier 调用await()方法后,它本身的数值会减1,而 CountDownLatch 必须调用countDown()方法才能减1
实例6

package com.xiaogao.test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountdownLatchTest {

	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final CountDownLatch cdOrder = new CountDownLatch(1);
		final CountDownLatch cdAnswer = new CountDownLatch(3);		
		for(int i=0;i<3;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						System.out.println("线程" + Thread.currentThread().getName() + 
								"正准备接受命令");						
						cdOrder.await();
						System.out.println("线程" + Thread.currentThread().getName() + 
						"已接受命令");								
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"回应命令处理结果");						
						cdAnswer.countDown();						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}		
		try {
			Thread.sleep((long)(Math.random()*10000));
		
			System.out.println("线程" + Thread.currentThread().getName() + 
					"即将发布命令");						
			cdOrder.countDown();
			System.out.println("线程" + Thread.currentThread().getName() + 
			"已发送命令,正在等待结果");	
			cdAnswer.await();
			System.out.println("线程" + Thread.currentThread().getName() + 
			"已收到所有响应结果");	
		} catch (Exception e) {
			e.printStackTrace();
		}				
		service.shutdown();

	}
}

8.Exchanger
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger
可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
实例7

package com.xiaogao.test;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final Exchanger<String> exchanger = new Exchanger<String>();
		service.execute(new Runnable(){
			public void run() {
				try {				
					String data1 = "zxx";
					System.out.println("线程" + Thread.currentThread().getName() + 
					"正在把数据" + data1 +"换出去");
					Thread.sleep((long)(Math.random()*10000));
					String data2 = (String)exchanger.exchange(data1);
					System.out.println("线程" + Thread.currentThread().getName() + 
					"换回的数据为" + data2);
				}catch(Exception e){
				}
			}	
		});
		service.execute(new Runnable(){
			public void run() {
				try {				
					String data1 = "lhm";
					System.out.println("线程" + Thread.currentThread().getName() + 
					"正在把数据" + data1 +"换出去");
					Thread.sleep((long)(Math.random()*10000));					
					String data2 = (String)exchanger.exchange(data1);
					System.out.println("线程" + Thread.currentThread().getName() + 
					"换回的数据为" + data2);
				}catch(Exception e){
					
				}				
			}	
		});
		service.shutdown();
	}
}

9.下面介绍一下线程阻塞队列的类和接口
接口:BlockingQueue1a4db2c2c2313771e5742b6debf617a1
支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。 
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的
处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),
第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。
下表中总结了这些方法: 
     抛出异常      特殊值      阻塞     超时 
插入 add(e)       offer(e)  put(e)  offer(e, time, unit) 
移除 remove()    poll()      take()    poll(time, unit) 
检查 element()   peek()  不可用   不可用 

实现了这个接口的类有 ArrayBlockingQueue1a4db2c2c2313771e5742b6debf617a1
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。
队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这个类与 Semaphore 很像,但与 Semaphore 不同的是 Semaphore 中装的是许可权限,一个线程拿到后就少一个名额,当权限为0时
必须线程自己释放才能使别的线程得到执行机会,而 ArrayBlockingQueue 是代表一个数组序列,当数组序列满时,再调用put()方法
当前线程将会阻塞,当数组序列为0时,调用take()方法线程也将会阻塞, ArrayBlockingQueue 是多个线程操纵一个数组序列,当一个
线程调用一个方法时,可能会使另一个线程从阻塞状态恢复回来
下面看两个示例代码
实例8

package com.xiaogao.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest {
	public static void main(String[] args) {
		final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3);
		for(int i=0;i<2;i++){
			new Thread(){
				public void run(){
					while(true){
						try {
							//随眠时间1000毫秒以下的任意值时间
							Thread.sleep((long)(Math.random()*1000));
							System.out.println(Thread.currentThread().getName() + "准备放数据!");
							//添加数据,如果已经填满,将阻塞
							queue.put(1);
							System.out.println(Thread.currentThread().getName() + "已经放了数据," + 							
										"队列目前有" + queue.size() + "个数据");
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
				
			}.start();
		}
		new Thread(){
			public void run(){
				while(true){
					try {
						//睡眠1秒后取数据
						Thread.sleep(1000);
						System.out.println(Thread.currentThread().getName() + "准备取数据!");
						//取得数据,如果没有数据将会阻塞
						queue.take();
						System.out.println(Thread.currentThread().getName() + "已经取走数据," + 							
								"队列目前有" + queue.size() + "个数据");					
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
			
		}.start();			
	}
}

从程序执行的结果来看,很明显,程序中一共有三个线程并发执行,两个线程负责向集合中添加数据,另外还有一个负责取出数据,并且添加数据的频率明显快于取出数据的
频率,所以总会显示集合队列中有3个数据,改一下睡眠的时间,结果又会不一样

用两个具有1个空间的的队列来实现同步功能
问题分析
1.实现同步功能,首先必须要有两个线程,每个线程处理一个队列
2.为了避免阻塞的产生,应该当一个线程阻塞时,另外一个线程得以运行,在第二个线程阻塞前使第一个线程致使阻塞的条件得以解除,这样往复执行
实例9

package com.xiaogao.test;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class BlockingQueueCommunication {
	public static void main(String[] args) {
		final Business business = new Business();
		//创建并执行线程,线程内部循环50次
		new Thread(
				new Runnable() {
					public void run() {
						for(int i=1;i<=50;i++){
							business.sub(i);
						}
					}
				}
		).start();
		//主线程循环50次
		for(int i=1;i<=50;i++){
			business.main(i);
		}
	}
	 static class Business {
		  //创建两个序列,每个序列的容量为1
		  BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1);
		  BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1);
		  //让Business对象创建时就填满队列queue2队列
		  {
			  try {
				queue2.put(1);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		  }
		  public  void sub(int i){
			  	try {
					queue1.put(1);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				for(int j=1;j<=10;j++){
					System.out.println("sub thread sequece of " + j + ",loop of " + i);
				}
				try {
					queue2.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
		  }
		  public  void main(int i){
			  	try {
					queue2.put(1);
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				for(int j=1;j<=100;j++){
					System.out.println("main thread sequece of " + j + ",loop of " + i);
				}
				try {
					queue1.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
		  }
	  }
}

위 내용은 jdk1.5 스레드 향상에 대한 Java 특별 주제의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.