博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java设计模式(5) - 多线程并发设计模式 - 生产者-消费者设计模式多种写法
阅读量:4164 次
发布时间:2019-05-26

本文共 5863 字,大约阅读时间需要 19 分钟。

生产者-消费者设计模式

/**

 * 生产者-消费者模式:是一个非常经典的多线程模式,在实际开发中应用非常广泛的思想理念,在生产-消费模式中,

 * 通常有两类线程,即N个生产线程和N个消费者线程。

 * 生产者负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

 * 典型的应用:MQ

 */

public class C05ProducerConsumer {

    public static void main(String[] args) throws InterruptedException {

         //任务

         LinkedBlockingDeque<Task2> taskQueue = new LinkedBlockingDeque<Task2>(10);

        

         //生产者

         Producer producer1 = new Producer(taskQueue);

         Producer producer2 = new Producer(taskQueue);

         Producer producer3 = new Producer(taskQueue);

        

         //消费者

         Consumer consumer1 = new Consumer(taskQueue);

         Consumer consumer2 = new Consumer(taskQueue);

         Consumer consumer3 = new Consumer(taskQueue);

        

         //线程池运行

         ExecutorService pool = Executors.newCachedThreadPool();

         pool.execute(producer1);

         pool.execute(producer2);

         pool.execute(producer3);

         pool.execute(consumer1);

         pool.execute(consumer2);

         pool.execute(consumer3);

        

         Thread.sleep(13000);

         producer1.stop();

         producer2.stop();

         producer3.stop();

         Thread.sleep(12000);

        

    }

}

/**

 * 生产者

 */

class Producer implements Runnable {

   

    //任务

    private BlockingQueue<Task2> taskQueue;

   

    //用于停止运行,强制从主内存刷新

    private volatile boolean isRun = true;

 

    //id生成

    private static AtomicInteger id = new AtomicInteger();

   

    public Producer(BlockingQueue<Task2> taskQueue) {

         super();

         this.taskQueue = taskQueue;

    }

 

    /*

     * 不眠不休的生产任务

     */

    @Override

    public void run() {

         while (isRun) {

             Task2 task2 = new Task2(this.id.getAndIncrement(), Thread.currentThread().getName());

             try {

                  //模拟数据产生过程

                  Thread.sleep(2000);

                  //2秒进不去队列就失败

                  boolean offer = this.taskQueue.offer(task2, 2, TimeUnit.SECONDS);

                  System.out.println("生产 - " + task2.getId());

                  if (!offer) {

                      System.out.println(id + " - 任务提交失败");

                  }

             } catch (InterruptedException e) {

                  e.printStackTrace();

             }

         }

    }

   

    /**

     * 停止运行

     */

    public void stop() {

         isRun = false;

    }

   

}

/**

 * 消费者

 */

class Consumer implements Runnable {

    //任务

    private BlockingQueue<Task2> taskQueue;

   

    public Consumer(BlockingQueue<Task2> taskQueue) {

         super();

         this.taskQueue = taskQueue;

    }

 

    /*

     * 消费者消费

     */

    @Override

    public void run() {

         while (true) {

             try {

                  Task2 take = this.taskQueue.take();

                  //模拟数据消费过程

                  Thread.sleep(2000);

                  System.out.println("消费:" + Thread.currentThread().getName() + " - " + take.getId());

             } catch (Exception e) {

                  e.printStackTrace();

             }       

         }

    }

}

class Task2 {

    private Integer id;

   

    private String desc;

 

    public Task2(Integer id, String desc) {

         super();

         this.id = id;

         this.desc = desc;

    }

 

    public Integer getId() {

         return id;

    }

 

    public void setId(Integer id) {

         this.id = id;

    }

 

    public String getDesc() {

         return desc;

    }

 

    public void setDesc(String desc) {

         this.desc = desc;

    }

}

/**

 * 案例一:生产者-消费者

 * 采用JDK1.5Lock的方式

 */

public class J16MessagePCByLock {

    public static void main(String[] args) {

         ProductA productA = new ProductA();

         ProducerA producer = new ProducerA(productA);

         ConsumerA consumer = new ConsumerA(productA);

         new Thread(producer,"p1").start();

         new Thread(producer,"p2").start();

         new Thread(consumer,"c1").start();

         new Thread(consumer,"c2").start();

    }

}

/**

 * 商品

 */

class ProductA {

   

    /**

     * 是否有商品

     */

    boolean isHashProduct = false;

   

    /**

     * 自定义锁

     */

    Lock lock = new ReentrantLock();

   

    /**

     * 监视器

     */

    Condition condition = lock.newCondition();

   

    /**

     * 生产

     */

    public void product() throws InterruptedException {

         //生产,上锁

         lock.lock();

        

         while (isHashProduct) {

             //已有商品,进入等待,放弃CPU执行权、放弃锁

             condition.await();

         }

        

         isHashProduct = true;

         Thread.sleep(3000);

         System.out.println("生产一台笔记本");

 

         //唤醒消费者

         condition.signalAll();

         //生产完成,释放锁

         lock.unlock();

    }

    /**

     * 消费者

     */

    public void consumer() throws InterruptedException {

         //消费上锁

         lock.lock();

        

         while (!isHashProduct) {

             //没有商品,进入等待

             condition.await();

         }

        

         isHashProduct = false;

         Thread.sleep(3000);

         System.out.println("消费一台笔记本");

        

         //唤醒生产者

         condition.signalAll();

         lock.unlock();

    }

}

class ProducerA implements Runnable {

   

    /**

     * 生产者

     */

    ProductA productA;

   

    public ProducerA(ProductA productA) {

         super();

         this.productA = productA;

    }

 

    @Override

    public void run() {

         try {

             productA.product();

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

    }

}

class ConsumerA implements Runnable {

   

    /**

     * 生产者

     */

    ProductA productA;

   

    public ConsumerA(ProductA productA) {

         super();

         this.productA = productA;

    }

 

    @Override

    public void run() {

         try {

             productA.consumer();

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

    }

}

/**

 * 案例二:生产者-消费者

 * 采用JDK1.5Lock的方式:多个监视器

 */

public class J17MessagePCByLockMC {

    public static void main(String[] args) {

         ProductB productB = new ProductB();

         ProducerB producer = new ProducerB(productB);

         ConsumerB consumer = new ConsumerB(productB);

         new Thread(producer,"p1").start();

         new Thread(producer,"p2").start();

         new Thread(consumer,"c1").start();

         new Thread(consumer,"c2").start();

    }

}

/**

 * 商品

 */

class ProductB {

   

    /**

     * 是否有商品

     */

    boolean isHashProduct = false;

   

    /**

     * 自定义锁

     */

    Lock lock = new ReentrantLock();

   

    /**

     * 监视器

     */

    Condition pCondition = lock.newCondition();

    Condition cCondition = lock.newCondition();

   

    /**

     * 生产

     */

    public void product() throws InterruptedException {

         //生产,上锁

         lock.lock();

        

         while (isHashProduct) {

             //已有商品,进入等待,放弃CPU执行权、放弃锁

             pCondition.await();

         }

        

         isHashProduct = true;

         Thread.sleep(3000);

         System.out.println("生产一台笔记本");

 

         //唤醒消费者

         cCondition.signal();

         //生产完成,释放锁

         lock.unlock();

    }

    /**

     * 消费者

     */

    public void consumer() throws InterruptedException {

         //消费上锁

         lock.lock();

        

         while (!isHashProduct) {

             //没有商品,进入等待

             cCondition.await();

         }

        

         isHashProduct = false;

         Thread.sleep(3000);

         System.out.println("消费一台笔记本");

        

         //唤醒生产者

         pCondition.signal();

         lock.unlock();

    }

}

class ProducerB implements Runnable {

   

    /**

     * 生产者

     */

    ProductB productB;

   

    public ProducerB(ProductB productB) {

         super();

         this.productB = productB;

    }

 

    @Override

    public void run() {

         try {

             productB.product();

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

    }

}

class ConsumerB implements Runnable {

   

    /**

     * 生产者

     */

    ProductB productB;

   

    public ConsumerB(ProductB productB) {

         super();

         this.productB = productB;

    }

 

    @Override

    public void run() {

         try {

             productB.consumer();

         } catch (InterruptedException e) {

             e.printStackTrace();

         }

    }

}

转载地址:http://ummxi.baihongyu.com/

你可能感兴趣的文章
KMP算法详解
查看>>
Web技术四层结构
查看>>
简单叙述一下MYSQL的优化
查看>>
Clustered Index & Non Clustered Index
查看>>
为数据库建立索引
查看>>
对Session和Cookie的区分与理解
查看>>
HTTP协议中POST、GET、HEAD的区别是什么?分别在什么情况下使用?(
查看>>
表单中post与get的区别
查看>>
PHP文件上传
查看>>
半小时精通正则表达式
查看>>
HTTP协议中请求方法Get和Post的区别是什么?
查看>>
Nutch搜索引擎分析
查看>>
map-reduce简介
查看>>
!!!!搜索引擎设计实用教程-以百度为例
查看>>
搜索引擎工作原理(Nutch)
查看>>
七、 基于Nutch主题搜索引擎方案设计
查看>>
垂直搜索引擎 nutch
查看>>
同一进程中的线程究竟共享哪些资源
查看>>
超文本传输协议-HTTP
查看>>
TCP/IP协议分析-协议分层
查看>>