jdk8提供了新的链式编程和多线程并发处理方法,异常统一处理, CompletableFuture。
CompletableFuture
package com.jackromer.demo.Thread;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyTest {
// 此executor线程池如果不传,CompletableFuture经测试默认只启用最多3个线程,所以最好自己指定线程数量
ExecutorService executor = Executors.newFixedThreadPool(15);
@Test
public void ex() {
long start = System.currentTimeMillis();
// 参数
List<String> webPageLinks = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H");
List<CompletableFuture<Void>> pageContentFutures = webPageLinks.stream().map(webPageLink -> handle(webPageLink)).collect(Collectors.toList());
CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]));
allFutures.join();
log.info("所有线程已执行完[{}]", allFutures.isDone());
allFutures.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
log.info("执行最后一步操作");
// doSth();
long end = System.currentTimeMillis();
log.info("耗时:" + (end - start) / 1000L);
}
});
}
// executor 可不传入,则默认最多3个线程
CompletableFuture<Void> handle(String pageLink) {
return CompletableFuture.runAsync(() -> {
// int i = 1/0;
log.info("执行任务" + pageLink);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, executor).exceptionally(new Function<Throwable, Void>() { // 捕捉异常,不会导致整个流程中断
@Override
public Void apply(Throwable throwable) {
log.info("线程[{}]发生了异常, 继续执行其他线程,错误详情[{}]", Thread.currentThread().getName(), throwable.getMessage());
return null;
}
});
}
}
atomic包装类在多线程的使用
我们知道AtomicLong、AtomicInteger是基于硬件级别cas实现的保证线程安全的自增类,能保证原子化的自增操作。在多线程下,性能远好于加锁synchronized。
AutomicLong
public class TestAtomicLong {
private static AtomicLong ai = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
MyThread mt = new MyThread();
Long time = System.currentTimeMillis();
System.out.println("当前时间:" + time);
for (int i = 0; i < 4; i++) {
Thread t = new Thread(mt);
t.start();
}
Thread.sleep(5000);
}
static class MyThread implements Runnable {
@Override
public void run() {
while (true) {
if (ai.get() >= 100000000L) {
System.out.println("已达到1亿");
System.out.println(System.currentTimeMillis());
return;
}
ai.getAndIncrement();
}
}
}
}
从automicLong 的add代码可以看出,其每次更新都调用compareAndSet方法,通过死循环的方式CAS到特定的值,达到更新数据的目的.
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
LongAdder
public class TestLongAdder {
private static LongAdder ai = new LongAdder();
public static void main(String[] args) throws InterruptedException {
MyThread mt = new MyThread();
Long time = System.currentTimeMillis();
System.out.println("当前时间:" + time);
for (int i = 0; i < 4; i++) {
Thread t = new Thread(mt);
t.start();
}
Thread.sleep(5000);
}
static class MyThread implements Runnable {
@Override
public void run() {
while (true) {
if (ai.longValue() >= 100000000L) {
System.out.println("已达到1亿");
System.out.println(System.currentTimeMillis());
return;
}
ai.increment();
}
}
}
LongAdder如何保证高并发和效率呢,先看下其ADD方法代码
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
其中使用了cell类,看一下源码
@sun.misc.Contended static final class Cell {
volatile long value; // volatile也使用了cas的机制
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell类也使用了CAS机制保证原子性的线程安全,但是为什么LongAdder会在多线程的时候比AtomicLong性能更好呢。回头再看一下其ADD方法,可以大致发现其使用了热点分离的思路对automicLong进行了优化,在CAS竞争激烈的时候,将竞争的数据进行分解.基于这个思路,虽然在CAS操作中没有锁,但是像减少锁粒度这种分离热点的思路依然可以使用。
一种可行的方案就是仿造ConcurrengHashMap,将热点数据分离,比如,可以将AtomicInteger的内部核心数据value分离成一个数组,每个线程访问时,通过哈希等算法映射到其中一个数字进行计数,而最终的计数结果,则为这个数组的求和累加。
热点数据value被分离成多个单元cell,每个cell独自维护内部的值,当前对象的实际值由所有的cell累计合成,这样,热点就进行了有效的分离,提高了并行度,LongAdder正是使用了这种思想,其中的CELL就想当于ConcurrengHashMap数组中的分段加锁,其真实的数据是多个CELL值得叠加,所以longAdder在多线程并发时候比AtomicLong有更好得性能。
实验结果是只有4个线程的情况下,多个线程对一个变量加到1亿时,AtomicLong用了2.1秒,LongAdder用了2.2秒,性能差于AtomicLong。
修改线程为50时,AtomicLong用了3秒,LongAdder用了2.3秒。
总结
JDK提供了更加方便安全得atomic类用于开发人员在多线程并发时候处理数据,同时也提供了CompletableFuture等新的多线程并发处理技术,让多线程处理更加的方便。