JDK8多线程并发处理

jdk8提供了新的链式编程和多线程并发处理方法,异常统一处理, CompletableFuture。

CompletableFuture

package com.jackromer.demo.Thread;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.junit.Test;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyTest {
  // 此executor线程池如果不传,CompletableFuture经测试默认只启用最多3个线程,所以最好自己指定线程数量
  ExecutorService executor = Executors.newFixedThreadPool(15);

  @Test
  public void ex() {
    long start = System.currentTimeMillis();
    // 参数
    List<String> webPageLinks = Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H");
    List<CompletableFuture<Void>> pageContentFutures = webPageLinks.stream().map(webPageLink -> handle(webPageLink)).collect(Collectors.toList());

    CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]));

    allFutures.join();
    log.info("所有线程已执行完[{}]", allFutures.isDone());
    allFutures.whenComplete(new BiConsumer<Void, Throwable>() {
      @Override
      public void accept(Void aVoid, Throwable throwable) {
        log.info("执行最后一步操作");
        // doSth();
        long end = System.currentTimeMillis();
        log.info("耗时:" + (end - start) / 1000L);
      }
    });
  }

  // executor 可不传入,则默认最多3个线程
  CompletableFuture<Void> handle(String pageLink) {
    return CompletableFuture.runAsync(() -> {
      // int i = 1/0;
      log.info("执行任务" + pageLink);
      try {
        Thread.sleep(3000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

    }, executor).exceptionally(new Function<Throwable, Void>() { // 捕捉异常,不会导致整个流程中断
      @Override
      public Void apply(Throwable throwable) {
        log.info("线程[{}]发生了异常, 继续执行其他线程,错误详情[{}]", Thread.currentThread().getName(), throwable.getMessage());
        return null;
      }
    });
  }
}

atomic包装类在多线程的使用

我们知道AtomicLong、AtomicInteger是基于硬件级别cas实现的保证线程安全的自增类,能保证原子化的自增操作。在多线程下,性能远好于加锁synchronized。

AutomicLong

public class TestAtomicLong {
    private static AtomicLong ai = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        MyThread mt = new MyThread();

        Long time = System.currentTimeMillis();
        System.out.println("当前时间:" + time);
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(mt);
            t.start();
        }

        Thread.sleep(5000);
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            while (true) {
                if (ai.get() >= 100000000L) {
                    System.out.println("已达到1亿");
                    System.out.println(System.currentTimeMillis());
                    return;
                }
                ai.getAndIncrement();
            }
        }
    }

}

从automicLong 的add代码可以看出,其每次更新都调用compareAndSet方法,通过死循环的方式CAS到特定的值,达到更新数据的目的.

public final int updateAndGet(IntUnaryOperator updateFunction) {
    int prev, next;
    do {
        prev = get();
        next = updateFunction.applyAsInt(prev);
    } while (!compareAndSet(prev, next));
    return next;
}

public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

LongAdder

public class TestLongAdder {
    private static LongAdder ai = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        MyThread mt = new MyThread();

        Long time = System.currentTimeMillis();
        System.out.println("当前时间:" + time);
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(mt);
            t.start();
        }

        Thread.sleep(5000);
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            while (true) {
                if (ai.longValue() >= 100000000L) {
                    System.out.println("已达到1亿");
                    System.out.println(System.currentTimeMillis());
                    return;
                }
                ai.increment();
            }
        }
    }

LongAdder如何保证高并发和效率呢,先看下其ADD方法代码

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

其中使用了cell类,看一下源码

@sun.misc.Contended static final class Cell {
    volatile long value; // volatile也使用了cas的机制
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Cell类也使用了CAS机制保证原子性的线程安全,但是为什么LongAdder会在多线程的时候比AtomicLong性能更好呢。回头再看一下其ADD方法,可以大致发现其使用了热点分离的思路对automicLong进行了优化,在CAS竞争激烈的时候,将竞争的数据进行分解.基于这个思路,虽然在CAS操作中没有锁,但是像减少锁粒度这种分离热点的思路依然可以使用。

一种可行的方案就是仿造ConcurrengHashMap,将热点数据分离,比如,可以将AtomicInteger的内部核心数据value分离成一个数组,每个线程访问时,通过哈希等算法映射到其中一个数字进行计数,而最终的计数结果,则为这个数组的求和累加。

热点数据value被分离成多个单元cell,每个cell独自维护内部的值,当前对象的实际值由所有的cell累计合成,这样,热点就进行了有效的分离,提高了并行度,LongAdder正是使用了这种思想,其中的CELL就想当于ConcurrengHashMap数组中的分段加锁,其真实的数据是多个CELL值得叠加,所以longAdder在多线程并发时候比AtomicLong有更好得性能。

实验结果是只有4个线程的情况下,多个线程对一个变量加到1亿时,AtomicLong用了2.1秒,LongAdder用了2.2秒,性能差于AtomicLong。

修改线程为50时,AtomicLong用了3秒,LongAdder用了2.3秒。

总结

JDK提供了更加方便安全得atomic类用于开发人员在多线程并发时候处理数据,同时也提供了CompletableFuture等新的多线程并发处理技术,让多线程处理更加的方便。


   Reprint policy


《JDK8多线程并发处理》 by jackromer is licensed under a Creative Commons Attribution 4.0 International License
 Previous
mybatis-使用 mybatis-使用
mybatis很方便,不用像hibernate一样关心太多的关联关系,也不用写JPA本文主要介绍mybatis的一些常用用法和注意事项 EXAMPLE使用 Example example = new Example(demo.cl
2019-09-03
Next 
lambda处理集合 lambda处理集合
java8 lambda表达式利用流处理各种集合 处理listpublic class Jdk8Main { public static void main(String[] args) { List<P
2019-08-27
  目录