首页 技术 正文
技术 2022年11月19日
0 收藏 947 点赞 2,833 浏览 7483 个字

Disruptor VS BlockingQueue的压测对比:

import java.util.concurrent.ArrayBlockingQueue;public class ArrayBlockingQueue4Test {    public static void main(String[] args) {
final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);
final long startTime = System.currentTimeMillis();
//向容器中添加元素
new Thread(new Runnable() { public void run() {
long i = 0;
while (i < Constants.EVENT_NUM_OHM) {
Data data = new Data(i, "c" + i);
try {
queue.put(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}).start(); new Thread(new Runnable() {
public void run() {
int k = 0;
while (k < Constants.EVENT_NUM_OHM) {
try {
queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
k++;
}
long endTime = System.currentTimeMillis();
System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
}
}).start();
}
}

  

public interface Constants {int EVENT_NUM_OHM = 1000000;int EVENT_NUM_FM = 50000000;int EVENT_NUM_OM = 10000000;}

  

import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;public class DisruptorSingle4Test { public static void main(String[] args) {
int ringBufferSize = 65536;
final Disruptor<Data> disruptor = new Disruptor<Data>(
new EventFactory<Data>() {
public Data newInstance() {
return new Data();
}
},
ringBufferSize,
Executors.newSingleThreadExecutor(),
ProducerType.SINGLE,
//new BlockingWaitStrategy()
new YieldingWaitStrategy()
); DataConsumer consumer = new DataConsumer();
//消费数据
disruptor.handleEventsWith(consumer);
disruptor.start();
new Thread(new Runnable() { public void run() {
RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
long seq = ringBuffer.next();
Data data = ringBuffer.get(seq);
data.setId(i);
data.setName("c" + i);
ringBuffer.publish(seq);
}
}
}).start();
}
}

  

import com.lmax.disruptor.EventHandler;public class DataConsumer implements EventHandler<Data> {    private long startTime;
private int i; public DataConsumer() {
this.startTime = System.currentTimeMillis();
} public void onEvent(Data data, long seq, boolean bool)
throws Exception {
i++;
if (i == Constants.EVENT_NUM_OHM) {
long endTime = System.currentTimeMillis();
System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
}
}}

  

import java.io.Serializable;public class Data implements Serializable {private static final long serialVersionUID = 2035546038986494352L;
private Long id ;
private String name;public Data() {
}
public Data(Long id, String name) {
super();
this.id = id;
this.name = name;
}public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

  

BlockingQueue测试:

Java 并发框架Disruptor(七)

1.建立一个工厂Event类,用于创建Event类实例对象

2.需要有一个jian监听事件类,用于处理数据(Event类)

3.实例化Disruptor实例,配置一系列参数,编写DisDisruptor核心组件

4.编写生产者组件,向Disruptor容器中投递数据

pom.xml添加:

<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<scope>3.3.2</scope>
</dependency>

  

public class OrderEvent {private long value; //订单的价格public long getValue() {
return value;
}public void setValue(long value) {
this.value = value;
}
}

  

import com.lmax.disruptor.EventFactory;public class OrderEventFactory implements EventFactory<OrderEvent>{public OrderEvent newInstance() {
return new OrderEvent();//这个方法就是为了返回空的数据对象(Event)
}
}

  

public class OrderEventHandler implements EventHandler<OrderEvent>{public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
Thread.sleep(Integer.MAX_VALUE);
System.err.println("消费者: " + event.getValue());
}
}

  

import java.nio.ByteBuffer;import com.lmax.disruptor.RingBuffer;public class OrderEventProducer {private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}public void sendData(ByteBuffer data) {
//1 在生产者发送消息的时候, 首先 需要从我们的ringBuffer里面 获取一个可用的序号
long sequence = ringBuffer.next();//0
try {
//2 根据这个序号, 找到具体的 "OrderEvent" 元素 注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
OrderEvent event = ringBuffer.get(sequence);
//3 进行实际的赋值处理
event.setValue(data.getLong(0));
} finally {
//4 提交发布操作
ringBuffer.publish(sequence);
}
}
}

  

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;public class Main {
public static void main(String[] args) {
// 参数准备工作
OrderEventFactory orderEventFactory = new OrderEventFactory();
int ringBufferSize = 4;
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/**
* 1 eventFactory: 消息(event)工厂对象
* 2 ringBufferSize: 容器的长度
* 3 executor: 线程池(建议使用自定义线程池) RejectedExecutionHandler
* 4 ProducerType: 单生产者 还是 多生产者
* 5 waitStrategy: 等待策略
*/
//1. 实例化disruptor对象
Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,
ringBufferSize,
executor,
ProducerType.SINGLE,
new BlockingWaitStrategy());//2. 添加消费者的监听 (构建disruptor 与 消费者的一个关联关系)
disruptor.handleEventsWith(new OrderEventHandler());//3. 启动disruptor
disruptor.start();//4. 获取实际存储数据的容器: RingBuffer
RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();OrderEventProducer producer = new OrderEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for(long i = 0 ; i < 100; i ++){
bb.putLong(0, i);
producer.sendData(bb);
}disruptor.shutdown();
executor.shutdown();}
}

  Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

public final class BlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition(); @Override
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
if ((availableSequence = cursorSequence.get()) < sequence)
{
lock.lock();
try
{
while ((availableSequence = cursorSequence.get()) < sequence)
{
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally
{
lock.unlock();
}
} while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
} return availableSequence;
} @Override
public void signalAllWhenBlocking()
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
}

  

public final class SleepingWaitStrategy implements WaitStrategy
{
private static final int DEFAULT_RETRIES = 200; private final int retries; public SleepingWaitStrategy()
{
this(DEFAULT_RETRIES);
} public SleepingWaitStrategy(int retries)
{
this.retries = retries;
} @Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = retries; while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
} return availableSequence;
} @Override
public void signalAllWhenBlocking()
{
} private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert(); if (counter > 100)
{
--counter;
}
else if (counter > 0)
{
--counter;
Thread.yield();
}
else
{
LockSupport.parkNanos(1L);
} return counter;
}
}

  

public final class YieldingWaitStrategy implements WaitStrategy
{
private static final int SPIN_TRIES = 100; @Override
public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException
{
long availableSequence;
int counter = SPIN_TRIES; while ((availableSequence = dependentSequence.get()) < sequence)
{
counter = applyWaitMethod(barrier, counter);
} return availableSequence;
} @Override
public void signalAllWhenBlocking()
{
} private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert(); if (0 == counter)
{
Thread.yield();
}
else
{
--counter;
} return counter;
}
}

  Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

Java 并发框架Disruptor(七)

上一篇: Java8时间转换
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,022
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,513
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,359
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,142
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,773
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,851