首页 技术 正文
技术 2022年11月7日
0 收藏 487 点赞 565 浏览 4479 个字

一、Master-Worker设计模式

Master-Worker模式是常用的并行设计模式。它的核心思想是,系统有两个进程协议工作:Master进程和Worker进程。Master进程负责接收和分配任务,Worker进程负责处理子任务。当各个Worker进程将子任务处理完后,将结果返回给Master进程,由Master进行归纳和汇总,从而得到系统结果。

Master-Worker模式的好处是,它能将大任务分解成若干个小任务,并发执行,从而提高系统性能。而对于系统请求者Client来说,任务一旦提交,Master进程就会立刻分配任务并立即返回,并不会等系统处理完全部任务再返回,其处理过程是异步的。

并发模型之Master-Worker设计模式

二、Master-Worker设计模式代码实现

1、创建Task任务对象

 package com.ietree.basicskill.mutilthread.designpattern.masterworker; /**
* Created by Root on 5/12/2017.
*/
public class Task { private int id; private String name; private int price; public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public int getPrice() {
return price;
} public void setPrice(int price) {
this.price = price;
}
}

2、实现Worker对象

 package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; /**
* Created by Root on 5/12/2017.
*/
public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue;
private ConcurrentHashMap<String, Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
} public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
} @Override
public void run() {
while (true) {
Task input = this.workQueue.poll();
if (input == null) {
break;
}
// 真正的去做业务处理
//Object output = handle(input);
// 改造
Object output = MyWorker.handle(input);
// 返回处理结果集
this.resultMap.put(Integer.toString(input.getId()), output);
}
} // private Object handle(Task input) {
// Object output = null;
// try {
// // 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
// Thread.sleep(500);
// output = input.getPrice();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// return output;
// } // 优化,考虑让继承类去自己实现具体的业务处理
public static Object handle(Task input) {
return null;
} }

3、为了使程序更灵活,将具体的业务执行逻辑抽离,在具体的Worker对象去实现,如这里的MyWorker对象

 package com.ietree.basicskill.mutilthread.designpattern.masterworker; /**
* Created by Root on 5/13/2017.
*/
public class MyWorker extends Worker { public static Object handle(Task input) {
Object output = null;
try {
// 表示处理task任务的耗时,可能是数据的加工,也可能是操作数据库......
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
} }

4、Master类

 package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; /**
* Created by Root on 5/12/2017.
*/
public class Master { // 1、使用一个ConcurrentLinkedQueue集合来装载所有需要执行的任务
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); // 2、使用HashMap来装载所有的worker对象
private HashMap<String, Thread> workers = new HashMap<String, Thread>(); // 3、使用一个容器承装每一个worker并发执行任务的结果集
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 4、构造方法
public Master(Worker worker, int workerCount) {
// 每一个worker对象都需要有Master的引用,workQueue用于任务的领取,resultMap用于任务的提交
worker.setWorkerQueue(this.workQueue);
worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) {
workers.put("子节点" + Integer.toString(i), new Thread(worker));
}
} // 5、提交方法
public void submit(Task task) {
this.workQueue.add(task);
} // 6、需要有一个执行的方法(启动应用程序,让所有的worker工作)
public void execute() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
me.getValue().start();
}
} // 7、判断线程是否执行完毕
public boolean isComplete() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
// 判断所有的线程状态是否属于已停止状态
if (me.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
} // 8、返回结果集数据
public int getResult() {
int ret = 0;
for (Map.Entry<String, Object> me : resultMap.entrySet()) {
// 汇总逻辑
ret += (Integer) me.getValue();
}
return ret;
} }

5、测试,具体调用实现

 package com.ietree.basicskill.mutilthread.designpattern.masterworker; import java.util.Random; /**
* Created by Root on 5/13/2017.
*/
public class MasterWorkerTest { public static void main(String[] args) { // Master master = new Master(new Worker(), 10);
// 改造
// Master master = new Master(new MyWorker(), 10);
// 改造(获取当前机器可用线程数)
System.out.println("我的机器可用Processors数量:" + Runtime.getRuntime().availableProcessors());
Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); Random r = new Random();
for (int i = 1; i <= 10; i++) {
Task t = new Task();
t.setId(i);
t.setName("任务" + i);
t.setPrice(r.nextInt(1000));
master.submit(t);
}
master.execute(); long start = System.currentTimeMillis(); while (true) {
if (master.isComplete()) {
long end = System.currentTimeMillis() - start;
int ret = master.getResult();
System.out.println("最终的结果:" + ret + ",执行耗时:" + end);
break;
}
}
} }

程序输出:

我的机器可用Processors数量:20
最终的结果:4473,执行耗时:500

从上面的运行结果来看,程序最终执行时间几乎就等于一个线程单独运行的时间,在此注意的是,同时执行的线程数是根据你执行此程序的机器配置决定的。

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,020
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,513
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,359
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,142
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,772
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,850