java多线程(2):并发编程实践(四)

2014-11-24 02:42:53 · 作者: · 浏览: 9
ervice.execute(new Runnable() {
@Override
public void run() {
long[] times = new long[]{3000,1000,5000,500,2000,5000};
try {
for(int i = 0; i < 5; i++) {
// 生产产品
productNum.getAndIncrement();
delayQueue.add(new DelayTask("产品-" + productNum.get(), times[i]));
// 休眠300ms
Thread.sleep(300);
System.out.println("生产产品:" + productNum.get() + ",无界阻塞队列:" + delayQueue.toString());
}
} catch (InterruptedException ex) {
}
}
});
//消费
executorService.execute(new Runnable(){
@Override
public void run() {
try {
while (true) {
// 获取延时任务
DelayTask delayTask = delayQueue.take();
//消费产品
delayTask.run();
// 休眠1000ms
Thread.sleep(1000);
System.out.println("消费完产品:"+ delayTask.getName() + ",无界阻塞队列:"+ delayQueue.toString());
}
} catch (InterruptedException ex) {
}
}
});
// 程序运行5s后,所有任务停止
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
}
executorService.shutdownNow();
System.out.println("main thread finished");
}

}


第6章:线程间通信Exchanger

第0节:札记

* Exchanger让两个线程可以互换信息。
* 每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。
* Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

第1节:实例

package com.mcc.core.test.thread;

import com.mcc.core.concurrent.ExecutorServiceUtils;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Exchanger实例测试,实例以字符串接龙方式呈现
*
*
* @author menergy
* DateTime: 13-12-30 下午4:58
*/
public class ExchangerTest {
public static void main(String args[]){
//原子计数器1
final AtomicInteger productNum1 = new AtomicInteger(0);
//原子计数器2
final AtomicInteger productNum2 = new AtomicInteger(0);
//资源持有开关,假设只有一个资源
final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
// 初始化一个Exchanger,交换的信息以字符串呈现
final Exchanger exchanger = new Exchanger ();
//交换的信息
final String[] info = {""};
ExecutorService executorService = ExecutorServiceUtils.getExecutor("test", 2);
//甲线程
executorService.execute(new Runnable() {
@Override
public void run() {
try {
while (true) {
if(!atomicBoolean.compareAndSet(false,true)){
// 处理消息
TimeUnit.MILLISECONDS.sleep(200);
productNum1.getAndIncrement();
System.out.println("甲" + productNum1.get() + "处理完成,等待乙");
//处理完,等待乙交换
info[0] = info[0] + "甲" + productNum1.get() + "--";
exchanger.exchange(info[0]);
System.out.println("甲" + productNum1.get() + "处理完成,交换给乙,信息:" + info[0]);
atomicBoolean.compareAndSet(true, false);
}else{
System.out.println("甲不处理,等待乙");
//处理完,等待乙交换
info[0] = exchanger.exchange(info[0]);
System.out.println("甲不处理,和乙交换,信息:" + info[0]);
}
}
} catch (InterruptedException ex) {
}
}
});
//乙线程
executorService.execute(new Runnable(){
@Override
public void run() {
try {
while (true) {
if(!atomicBoolean.compareAndSet(false,true)){
// 处理消息
TimeUnit.MILLISECONDS.sleep(500);
productNum2.getAndIncrement();
System.out.println("乙" + productNum2.get() + "处理完成,等待甲");
//处理完,等待乙交换
info[0] = info[0] + "乙" + productNum2.get() + "--";
exchanger.exchange(info[0]);
System.out.println("乙" + productNum2.get() + "处理完成,交换给甲,信息:" + info[0]);
atomicBoolean.compareAndSet(true, false);
}else{
System.out.println("乙不处理,等待甲");
//处理完,等待乙交换
info[0] =