首页 技术 正文
技术 2022年11月17日
0 收藏 737 点赞 2,275 浏览 11242 个字

java多线程的三大设计模式

本章主要记录java常见的三大设计模式,Future、Master-Worker和生产者-消费者模式。

一、Future模式

    使用场景:数据可以不及时返回,到下一次实际要使用结果的之前,后台自动查询并返回。类似与Ajax异步加载。

    原理:客户端发起请求,结果需要返回Data对象,当服务器收到请求以后,FutureData包装类实现Data接口,不查询数据库,直接返回结果。(核心)。然后后台自己开一个线程去查询数据库,RealData真

       实数据类,也实现Data接口,并返回数据。当实际使用时。获取到返回的真实数据。

        

      代码分析:

     

 1 //  FutureClient 客户端类:
2
3       public class FutureClient {
4
5       public Data request(final String queryStr){
6           //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
7           final FutureData futureData = new FutureData();
8           //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
9           new Thread(new Runnable() {
10           @Override
11           public void run() {
12               //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
13               RealData realData = new RealData(queryStr);
14               futureData.setRealData(realData);
15               }
16           }).start();
17           return futureData;
18           }
19
20

    

 1  // Data类:
2
3       
4
5         public interface Data {
6
7           String getRequest();
8
9         }
10
11   

    

 1  // FutureData类:   
2
3         public class FutureData implements Data{
4
5         private RealData realData ;
6
7           private boolean isReady = false;
8
9           public synchronized void setRealData(RealData realData) {
10           //如果已经装载完毕了,就直接返回
11           if(isReady){
12                 return;
13                 }
14               //如果没装载,进行装载真实对象
15                  this.realData = realData;
16                  isReady = true;
17               //进行通知
18                 notify();
19              }
20
21           @Override
22           public synchronized String getRequest() {
23           //如果没装载好 程序就一直处于阻塞状态
24             while(!isReady){
25             try {
26               wait();
27               } catch (InterruptedException e) {
28                 e.printStackTrace();
29               }
30               }
31               //装载好直接获取数据即可
32               return this.realData.getRequest();
33               }
34
35
36
37            }
38
39

     

 1 // RealData类:   
2
3         public class RealData implements Data{
4
5         private String result ;
6
7         public RealData (String queryStr){
8             System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");
9             try {
10               Thread.sleep(5000);
11               } catch (InterruptedException e) {
12                 e.printStackTrace();
13               }
14                 System.out.println("操作完毕,获取结果");
15                 result = "查询结果";
16               }
17
18            @Override
19               public String getRequest() {
20               return result;
21              }
22
23             }
24
25

     

 1 // Main测试类:            
2
3         public class Main {
4
5             public static void main(String[] args) throws InterruptedException {
6
7             FutureClient fc = new FutureClient();
8             Data data = fc.request("请求参数");
9             System.out.println("请求发送成功!");
10             System.out.println("做其他的事情...");
11             String result = data.getRequest();
12             System.out.println(result);
13           }
14         }

 

二:Master-Worker模式(并行计算模式)      

    使用场景:互不影响的多任务时。返回结果需要共同返回。其好处是讲一个大任务分解成若干个小任务。并行执行,提高系统的吞吐量。

    原理:核心思想是系统由两类进程协作工作;Master进程和Worker进程。Master进程负责接收和分配工作,Worker进程主要负责处理子任务。当各
       个Worker进程处理完后。会将结果返回给Master,由Master做归纳和总结,并返回。

      

      

      

    

    

    

    代码分析:

      

 1 //Worker类:
2
3       public class Worker implements Runnable {
4
5         private ConcurrentLinkedQueue<Task> workQueue;
6         private ConcurrentHashMap<String, Object> resultMap;
7
8         public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
9           this.workQueue = workQueue;
10         }
11
12         public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
13           this.resultMap = resultMap;
14         }
15
16         @Override
17         public void run() {
18           while(true){
19             Task input = this.workQueue.poll();
20               if(input == null) break;
21                 Object output = handle(input);
22                 this.resultMap.put(Integer.toString(input.getId()), output);
23               }
24             }
25
26         private Object handle(Task input) {
27             Object output = null;
28               try {
29                 //处理任务的耗时。。 比如说进行操作数据库。。。
30                 Thread.sleep(500);
31                 output = input.getPrice(); //模拟把Task类的价格做为结果返回
32                 } catch (InterruptedException e) {
33                 e.printStackTrace();
34                 }
35                 return output;
36              }
37
38         }
39
40

    

 1  //Master类:
2
3       public class Master {
4
5         //1 有一个盛放任务的容器
6         private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
7
8         //2 需要有一个盛放worker的集合
9         private HashMap<String, Thread> workers = new HashMap<String, Thread>();
10
11         //3 需要有一个盛放每一个worker执行任务的结果集合
12         private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
13
14         //4 构造方法
15         public Master(Worker worker , int workerCount){
16           worker.setWorkQueue(this.workQueue);
17           worker.setResultMap(this.resultMap);
18
19           for(int i = 0; i < workerCount; i ++){
20               this.workers.put(Integer.toString(i), new Thread(worker));
21             }
22             }
23
24           //5 需要一个提交任务的方法
25        public void submit(Task task){
26             this.workQueue.add(task);
27             }
28
29           //6 需要有一个执行的方法,启动所有的worker方法去执行任务
30       public void execute(){
31           for(Map.Entry<String, Thread> me : workers.entrySet()){
32               me.getValue().start();
33             }
34             }
35
36           //7 判断是否运行结束的方法
37       public boolean isComplete() {
38           for(Map.Entry<String, Thread> me : workers.entrySet()){
39               if(me.getValue().getState() != Thread.State.TERMINATED){
40               return false;
41                 }
42               }
43             return true;
44           }
45
46           //8 计算结果方法
47       public int getResult() {
48           int priceResult = 0;
49           for(Map.Entry<String, Object> me : resultMap.entrySet()){
50           priceResult += (Integer)me.getValue();
51           }
52           return priceResult;
53           }
54       }

    

     

 1 // Task类:
2
3         
4
5       public class Task {
6
7         private int id;
8         private int price ;
9         public int getId() {
10           return id;
11            }
12         public void setId(int id) {
13           this.id = id;
14           }
15         public int getPrice() {
16           return price;
17           }
18         public void setPrice(int price) {
19           this.price = price;
20         }
21        }
22
23

      

 1 //main测试类:
2
3          public class Main {
4
5           public static void main(String[] args) {
6
7           int Processors= Runtime.getRuntime().availableProcessors(); //获取到当前电脑的线程数
8           System.out.println("当前电脑是"+Processors+"核");
9           Master master = new Master(new Worker(),Processors );
10           //Master master = new Master(new Worker(),20 ); //开20个线程
11           Random r = new Random();
12           for(int i = 1; i <= 100; i++){
13             Task t = new Task();
14             t.setId(i);
15             t.setPrice(r.nextInt(1000));
16             master.submit(t);
17             }
18             master.execute(); //执行任务
19             long start = System.currentTimeMillis();
20
21             while(true){
22             if(master.isComplete()){
23             long end = System.currentTimeMillis() - start;
24             int priceResult = master.getResult();
25             System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
26             break;
27             }
28             }
29           }
30         }
31
32

三:生产者-消费者模式

    使用场景:消息中间件。

      

    代码分析:

       

 1 // main测试类:  
2
3           public class Main {
4
5           public static void main(String[] args) throws Exception {
6           //内存缓冲区
7           BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
8           //生产者
9           Provider p1 = new Provider(queue);
10           Provider p2 = new Provider(queue);
11           Provider p3 = new Provider(queue);
12           //消费者
13           Consumer c1 = new Consumer(queue);
14           Consumer c2 = new Consumer(queue);
15           Consumer c3 = new Consumer(queue);
16           //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)  
17
18           ExecutorService cachePool = Executors.newCachedThreadPool();
19
20           cachePool.execute(p1);
21           cachePool.execute(p2);
22           cachePool.execute(p3);
23           cachePool.execute(c1);
24           cachePool.execute(c2);
25           cachePool.execute(c3);
26
27           try {
28             Thread.sleep(3000);
29             } catch (InterruptedException e) {
30               e.printStackTrace();
31             }
32               p1.stop();
33               p2.stop();
34               p3.stop();
35           try {
36             Thread.sleep(2000);
37             } catch (InterruptedException e) {
38               e.printStackTrace();
39             }
40           }
41
42         }
43
44

     

 1  Data类:        
2
3         public final class Data {
4
5         private String id;
6         private String name;
7
8         public Data(String id, String name){
9           this.id = id;
10           this.name = name;
11           }
12
13         public String getId() {
14           return id;
15           }
16
17         public void setId(String id) {
18           this.id = id;
19           }
20
21         public String getName() {
22           return name;
23           }
24
25         public void setName(String name) {
26           this.name = name;
27           }
28
29         @Override
30         public String toString(){
31           return "{id: " + id + ", name: " + name + "}";
32           }
33
34         }
35
36

     

 1  //Provider成产者类:
2
3         
4
5         public class Provider implements Runnable{
6
7             //共享缓存区
8           private BlockingQueue<Data> queue;
9             //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
10           private volatile boolean isRunning = true;
11             //id生成器
12           private static AtomicInteger count = new AtomicInteger();
13             //随机对象
14           private static Random r = new Random();
15
16           public Provider(BlockingQueue queue){
17             this.queue = queue;
18             }
19
20           @Override
21           public void run() {
22           while(isRunning){
23             try {
24               //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
25               Thread.sleep(r.nextInt(1000));
26               //获取的数据进行累计...
27               int id = count.incrementAndGet();
28               //比如通过一个getData方法获取了
29               Data data = new Data(Integer.toString(id), "数据" + id);
30               System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
31               if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
32               System.out.println("提交缓冲区数据失败....");
33               //do something... 比如重新提交
34               }
35             } catch (InterruptedException e) {
36               e.printStackTrace();
37             }
38           }
39           }
40
41           public void stop(){
42             this.isRunning = false;
43           }
44
45         }
46
47

    

 1  // ConSumber消费者类:
2
3         
4
5         public class Consumer implements Runnable{
6
7           private BlockingQueue<Data> queue;
8
9           public Consumer(BlockingQueue queue){
10           this.queue = queue;
11           }
12
13           //随机对象
14           private static Random r = new Random();
15
16           @Override
17           public void run() {
18             while(true){
19               try {
20               //获取数据
21               Data data = this.queue.take();
22               //进行数据处理。休眠0 - 1000毫秒模拟耗时
23               Thread.sleep(r.nextInt(1000));
24               System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
25               } catch (InterruptedException e) {
26                 e.printStackTrace();
27               }
28             }
29             }
30          }
31
32
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,082
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,556
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,406
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,179
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,815
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,898