新浪微博 登陆  注册   设为首页 加入收藏

学PHP >> Android开发应用 >> 黑马程序员_基础加强_Java线程通信和线程并发库

黑马程序员_基础加强_Java线程通信和线程并发库

查看次数3545 发表时间2013-06-12 07:36:32

 ------- android培训、java培训、期待与您交流! ----------java5的线程锁技术Lock&Condition实现线程同步通信Lock比传统的synchronized方式更加面向对象,两个线程执行的代码块要实现同步互斥,必...

 ------- android培训java培训、期待与您交流! ----------

java5的线程锁技术

Lock&Condition实现线程同步通信
Lock比传统的synchronized方式更加面向对象,两个线程执行的代码块要实现同步互斥,必须持有同一个Lock对象。
ReadWriteLock,多个读锁不互斥,读锁与写锁互斥。如果需要多线程同时读,但不能同时写,加读锁;如果代码修改数据,为改代码加写锁,写锁是线程独占的。
在等待 Condition 时,允许发生“虚假唤醒”,Condition 应该总是在一个循环中被等待,并测试正被等待的状态。
Condition condition = lock.newCondition();
使用读写锁的缓存功能
  1. class CachedData {
  2. Object data;
  3. volatile boolean cacheValid;
  4. ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5. void processCachedData() {
  6. rwl.readLock().lock();
  7. if (!cacheValid) {//如果缓存中没有data
  8. // 在使用写锁前必须释放读锁
  9. rwl.readLock().unlock();
  10. rwl.writeLock().lock();
  11. // 获取写锁后在检查
  12. if (!cacheValid) {
  13. data = ...//写入data
  14. cacheValid = true;
  15. }
  16. // 写入数据后,上读锁,在释放写锁
  17. rwl.readLock().lock();
  18. rwl.writeLock().unlock();
  19. }
  20. use(data);
  21. rwl.readLock().unlock();
  22. }
  23. }

Semaphore 计数信号灯

限制可以访问某些资源的线程数目。可进入同一段代码的线程数目。
Semaphore(int permits) permits允许的并发线程数.
acquire() 从此信号量获取一个许可.
void release() 释放一个许可。


单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”。
  1. public class SemaphoreTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();
  4. final Semaphore sp = new Semaphore(3);//创建一个可以允许3个线程并发访问的信号灯
  5. for(int i=0;i<10;i++){
  6. Runnable runnable = new Runnable(){
  7. public void run(){
  8. try {
  9. sp.acquire();
  10. } catch (InterruptedException e1) {
  11. e1.printStackTrace();
  12. }
  13. System.out.println("线程" + Thread.currentThread().getName() +
  14. "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
  15. try {
  16. Thread.sleep((long)(Math.random()*10000));
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. System.out.println("线程" + Thread.currentThread().getName() +
  21. "即将离开");
  22. sp.release();
  23. //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
  24. System.out.println("线程" + Thread.currentThread().getName() +
  25. "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
  26. }
  27. };
  28. service.execute(runnable); //将任务交给线程池
  29. }
  30. }
  31. }

CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。
  1. public class CyclicBarrierTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();//创建一个线程池
  4. final CyclicBarrier cb = new CyclicBarrier(3);//创建一个循环barrier,线程数目为3
  5. for(int i=0;i<3;i++){//新建3个线程,交给线程池执行
  6. Runnable runnable = new Runnable(){
  7. public void run(){
  8. try {
  9. Thread.sleep((long)(Math.random()*10000));
  10. System.out.println("线程" + Thread.currentThread().getName() +
  11. "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  12. cb.await();//等待其他线程
  13. Thread.sleep((long)(Math.random()*10000));
  14. System.out.println("线程" + Thread.currentThread().getName() +
  15. "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  16. cb.await();
  17. Thread.sleep((long)(Math.random()*10000));
  18. System.out.println("线程" + Thread.currentThread().getName() +
  19. "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  20. cb.await();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. };
  26. service.execute(runnable);
  27. }
  28. service.shutdown();
  29. }
  30. }

CountDownLatch

CountDownLatch(int count) 构造一个用给定计数初始化的 CountDownLatch。
调用countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。
  1. public class CountdownLatchTest {
  2. public static void main(String[] args) {
  3. ExecutorService service = Executors.newCachedThreadPool();
  4. final CountDownLatch cdOrder = new CountDownLatch(1);
  5. final CountDownLatch cdAnswer = new CountDownLatch(3);
  6. for(int i=0;i<3;i++){
  7. Runnable runnable = new Runnable(){//创建3个线程
  8. public void run(){
  9. try {
  10. System.out.println("线程" + Thread.currentThread().getName() +
  11. "正准备接受命令");
  12. cdOrder.await();//等待主线程控制cdOrder开始任务
  13. System.out.println("线程" + Thread.currentThread().getName() +
  14. "已接受命令");
  15. Thread.sleep((long)(Math.random()*10000));
  16. System.out.println("线程" + Thread.currentThread().getName() +
  17. "回应命令处理结果");
  18. cdAnswer.countDown();//任务完成通知主线程
  19. } catch (Exception e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. };
  24. service.execute(runnable);
  25. }
  26. try {
  27. Thread.sleep((long)(Math.random()*10000));
  28. System.out.println("线程" + Thread.currentThread().getName() +
  29. "即将发布命令");
  30. cdOrder.countDown();//将count归0,开始执行任务
  31. System.out.println("线程" + Thread.currentThread().getName() +
  32. "已发送命令,正在等待结果");
  33. cdAnswer.await();//等待子线程完成
  34. System.out.println("线程" + Thread.currentThread().getName() +
  35. "已收到所有响应结果");
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. service.shutdown();
  40. }
  41. }

Exchanger

实现持有同一Exchanger对象的两个线程间的数据交换。
V exchange(V x) 方法交换数据。

可阻塞队列

public class ArrayBlockingQueue<E>extends AbstractQueue<E>implements BlockingQueue<E>, Serializable
按 FIFO(先进先出)原则对元素进行排序。
生产者使用put方法放入数据,消费者使用take方法取出数据。
新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
size() 返回此队列中元素的数量。
可以由两个只有一个元素的同步队列对象实现线程通信功能。


空中网题目:
Test类中的代码在不断地产生数据,然后交给TestDo.doSome()方法去处理,就好像生产者在不断地产生数据,消费者在不断消费数据。
程序有10个线程来消费生成者产生的数据,这些消费者都调用TestDo.doSome()方法去进行处理,故每个消费者都需要一秒才能处理完,程序应保证这些消费者线程依次有序地消费数据,只有上一个消费者消费完后,下一个消费者才能消费数据,下一个消费者是谁都可以,但要保证这些消费者线程拿到的数据是有顺序的。
  1. public class Test {
  2. public static void main(String[] args) {
  3. final Semaphore semaphore = new Semaphore(1);
  4. final SynchronousQueue<String> queue = new SynchronousQueue<String>();
  5. for(int i=0;i<10;i++){
  6. new Thread(new Runnable(){
  7. @Override
  8. public void run() {
  9. try {
  10. semaphore.acquire();
  11. String input = queue.take();//保证按顺序取出数据
  12. String output = TestDo.doSome(input);
  13. System.out.println(Thread.currentThread().getName()+ ":" + output);
  14. semaphore.release();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).start();
  20. }
  21. System.out.println("begin:"+(System.currentTimeMillis()/1000));
  22. for(int i=0;i<10;i++){ //这行不能改动
  23. String input = i+""; //这行不能改动
  24. try {
  25. queue.put(input);//主线程的循环产生数据,并将数据放入同步队列中,由子线程对队列进行操作
  26. } catch (InterruptedException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. }
  31. }
  32. //不能改动此TestDo类
  33. class TestDo {
  34. public static String doSome(String input){
  35. try {
  36. Thread.sleep(1000);
  37. } catch (InterruptedException e) {
  38. e.printStackTrace();
  39. }
  40. String output = input + ":"+ (System.currentTimeMillis() / 1000);
  41. return output;
  42. }
  43. }

同步集合

传统方式下的Collection在迭代集合时,不允许对集合进行修改。
同步集合类:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。当期望许多线程访问一个给定 collection 时,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的 ArrayList。


空中网面试题:
如果有几个线程调用TestDo.doSome(key, value)方法时,传递进去的key相等(equals比较为true),则这几个线程应互斥排队输出结果,即当有两个线程的key都是"1"时,它们中的一个要比另外其他线程晚1秒输出结果,如下所示:
4:4:1258199615
1:1:1258199615
3:3:1258199615
1:2:1258199616
  1. import java.util.Iterator;
  2. import java.util.concurrent.CopyOnWriteArrayList;
  3. //不能改动此Test类
  4. public class Test extends Thread{
  5. private TestDo testDo;
  6. private String key;
  7. private String value;
  8. public Test(String key,String key2,String value){
  9. this.testDo = TestDo.getInstance();
  10. /*常量"1"和"1"是同一个对象,下面这行代码就是要用"1"+""的方式产生新的对象,
  11. 以实现内容没有改变,仍然相等(都还为"1"),但对象却不再是同一个的效果*/
  12. this.key = key+key2;
  13. this.value = value;
  14. }
  15. public static void main(String[] args) throws InterruptedException{
  16. Test a = new Test("1","","1");
  17. Test b = new Test("1","","2");
  18. Test c = new Test("3","","3");
  19. Test d = new Test("4","","4");
  20. System.out.println("begin:"+(System.currentTimeMillis()/1000));
  21. a.start();
  22. b.start();
  23. c.start();
  24. d.start();
  25. }
  26. public void run(){
  27. testDo.doSome(key, value);
  28. }
  29. }
  30. class TestDo {
  31. private TestDo() {}
  32. private static TestDo _instance = new TestDo();
  33. public static TestDo getInstance() {
  34. return _instance;
  35. }
  36. //因为在执行doSome时,需要对key进行判断和添加,需要使用同步集合
  37. private CopyOnWriteArrayList keys = new CopyOnWriteArrayList();
  38. public void doSome(Object key, String value) {
  39. Object o = key;//以key做同步互斥锁对象
  40. if(!keys.contains(o)){
  41. keys.add(o);
  42. }else{
  43. for(Iterator iter=keys.iterator();iter.hasNext();){
  44. try {
  45. Thread.sleep(20);//测试代码,让对集合操作的动作多执行一会
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. Object oo = iter.next();
  50. if(oo.equals(o)){
  51. o = oo;
  52. break;
  53. }
  54. }
  55. }
  56. synchronized(o)
  57. // 以大括号内的是需要局部同步的代码,不能改动!
  58. {
  59. try {
  60. Thread.sleep(1000);
  61. System.out.println(key+":"+value + ":"
  62. + (System.currentTimeMillis() / 1000));
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. }
  68. }

  ------- android培训java培训、期待与您交流! ----------


(转发请注明转自:学PHP)    


  相关推荐




  发表评论
昵称:
(不超过20个字符或10个汉字)
内容: