本文共 5863 字,大约阅读时间需要 19 分钟。
/** * 生产者-消费者模式:是一个非常经典的多线程模式,在实际开发中应用非常广泛的思想理念,在生产-消费模式中, * 通常有两类线程,即N个生产线程和N个消费者线程。 * 生产者负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。 * 典型的应用:MQ。 */ public class C05ProducerConsumer { public static void main(String[] args) throws InterruptedException { //任务 LinkedBlockingDeque<Task2> taskQueue = new LinkedBlockingDeque<Task2>(10);
//生产者 Producer producer1 = new Producer(taskQueue); Producer producer2 = new Producer(taskQueue); Producer producer3 = new Producer(taskQueue);
//消费者 Consumer consumer1 = new Consumer(taskQueue); Consumer consumer2 = new Consumer(taskQueue); Consumer consumer3 = new Consumer(taskQueue);
//线程池运行 ExecutorService pool = Executors.newCachedThreadPool(); pool.execute(producer1); pool.execute(producer2); pool.execute(producer3); pool.execute(consumer1); pool.execute(consumer2); pool.execute(consumer3);
Thread.sleep(13000); producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(12000);
} } /** * 生产者 */ class Producer implements Runnable {
//任务 private BlockingQueue<Task2> taskQueue;
//用于停止运行,强制从主内存刷新 private volatile boolean isRun = true;
//id生成 private static AtomicInteger id = new AtomicInteger();
public Producer(BlockingQueue<Task2> taskQueue) { super(); this.taskQueue = taskQueue; }
/* * 不眠不休的生产任务 */ @Override public void run() { while (isRun) { Task2 task2 = new Task2(this.id.getAndIncrement(), Thread.currentThread().getName()); try { //模拟数据产生过程 Thread.sleep(2000); //2秒进不去队列就失败 boolean offer = this.taskQueue.offer(task2, 2, TimeUnit.SECONDS); System.out.println("生产 - " + task2.getId()); if (!offer) { System.out.println(id + " - 任务提交失败"); } } catch (InterruptedException e) { e.printStackTrace(); } } }
/** * 停止运行 */ public void stop() { isRun = false; }
} /** * 消费者 */ class Consumer implements Runnable { //任务 private BlockingQueue<Task2> taskQueue;
public Consumer(BlockingQueue<Task2> taskQueue) { super(); this.taskQueue = taskQueue; }
/* * 消费者消费 */ @Override public void run() { while (true) { try { Task2 take = this.taskQueue.take(); //模拟数据消费过程 Thread.sleep(2000); System.out.println("消费:" + Thread.currentThread().getName() + " - " + take.getId()); } catch (Exception e) { e.printStackTrace(); } } } } class Task2 { private Integer id;
private String desc;
public Task2(Integer id, String desc) { super(); this.id = id; this.desc = desc; }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getDesc() { return desc; }
public void setDesc(String desc) { this.desc = desc; } } |
/** * 案例一:生产者-消费者 * 采用JDK1.5的Lock的方式 */ public class J16MessagePCByLock { public static void main(String[] args) { ProductA productA = new ProductA(); ProducerA producer = new ProducerA(productA); ConsumerA consumer = new ConsumerA(productA); new Thread(producer,"p1").start(); new Thread(producer,"p2").start(); new Thread(consumer,"c1").start(); new Thread(consumer,"c2").start(); } } /** * 商品 */ class ProductA {
/** * 是否有商品 */ boolean isHashProduct = false;
/** * 自定义锁 */ Lock lock = new ReentrantLock();
/** * 监视器 */ Condition condition = lock.newCondition();
/** * 生产 */ public void product() throws InterruptedException { //生产,上锁 lock.lock();
while (isHashProduct) { //已有商品,进入等待,放弃CPU执行权、放弃锁 condition.await(); }
isHashProduct = true; Thread.sleep(3000); System.out.println("生产一台笔记本");
//唤醒消费者 condition.signalAll(); //生产完成,释放锁 lock.unlock(); } /** * 消费者 */ public void consumer() throws InterruptedException { //消费上锁 lock.lock();
while (!isHashProduct) { //没有商品,进入等待 condition.await(); }
isHashProduct = false; Thread.sleep(3000); System.out.println("消费一台笔记本");
//唤醒生产者 condition.signalAll(); lock.unlock(); } } class ProducerA implements Runnable {
/** * 生产者 */ ProductA productA;
public ProducerA(ProductA productA) { super(); this.productA = productA; }
@Override public void run() { try { productA.product(); } catch (InterruptedException e) { e.printStackTrace(); } } } class ConsumerA implements Runnable {
/** * 生产者 */ ProductA productA;
public ConsumerA(ProductA productA) { super(); this.productA = productA; }
@Override public void run() { try { productA.consumer(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
/** * 案例二:生产者-消费者 * 采用JDK1.5的Lock的方式:多个监视器 */ public class J17MessagePCByLockMC { public static void main(String[] args) { ProductB productB = new ProductB(); ProducerB producer = new ProducerB(productB); ConsumerB consumer = new ConsumerB(productB); new Thread(producer,"p1").start(); new Thread(producer,"p2").start(); new Thread(consumer,"c1").start(); new Thread(consumer,"c2").start(); } } /** * 商品 */ class ProductB {
/** * 是否有商品 */ boolean isHashProduct = false;
/** * 自定义锁 */ Lock lock = new ReentrantLock();
/** * 监视器 */ Condition pCondition = lock.newCondition(); Condition cCondition = lock.newCondition();
/** * 生产 */ public void product() throws InterruptedException { //生产,上锁 lock.lock();
while (isHashProduct) { //已有商品,进入等待,放弃CPU执行权、放弃锁 pCondition.await(); }
isHashProduct = true; Thread.sleep(3000); System.out.println("生产一台笔记本");
//唤醒消费者 cCondition.signal(); //生产完成,释放锁 lock.unlock(); } /** * 消费者 */ public void consumer() throws InterruptedException { //消费上锁 lock.lock();
while (!isHashProduct) { //没有商品,进入等待 cCondition.await(); }
isHashProduct = false; Thread.sleep(3000); System.out.println("消费一台笔记本");
//唤醒生产者 pCondition.signal(); lock.unlock(); } } class ProducerB implements Runnable {
/** * 生产者 */ ProductB productB;
public ProducerB(ProductB productB) { super(); this.productB = productB; }
@Override public void run() { try { productB.product(); } catch (InterruptedException e) { e.printStackTrace(); } } } class ConsumerB implements Runnable {
/** * 生产者 */ ProductB productB;
public ConsumerB(ProductB productB) { super(); this.productB = productB; }
@Override public void run() { try { productB.consumer(); } catch (InterruptedException e) { e.printStackTrace(); } } } |
转载地址:http://ummxi.baihongyu.com/