THREAD-POOL-多线程并发


多线程简介

多线程适合多种开发语言的开发, 这是一种思想,基于的是操作系统底层的CPU调度.
此篇介绍线程、多线程、和线程池的使用和机制。

进程\线程

  1. 进程

    1 进程 是指在系统中正在运行的一个应用程序, 每一个进程都有一个PID, 当进程占用太多资源或者卡死的时候,CPU会强杀掉进程。
    2 每个进程之间是独立的,每个进程均运行在其专用的且受保护的内存空间内。

  2. 线程

    1 个进程由多个线程组成(1个进程至少要有1个线程)。
    2 线程是进程的基本执行单元,一个进程的所有任务都在线程中执行。

多线程

  1. 多线程

    1个进程中可以开启多个线程,多个线程可以同时执行不同的任务, 多线程一般情况下可以提高程序的执行效率。

  2. 多线程原理

    对于单核CPU来说,同一时间,CPU只能处理1个线程,只有1个线程正在执行,现在大部分CPU都是4核8线程。
    多线程同时执行的本质:是CPU快速的在多个线程之间的切换,这个时间非常的短在毫秒级别,以至于用户无法察觉。
    CPU调度线程的时间足够快,就造成了多线程的“同时”执行。
    如果线程数非常多,CPU会在n个线程之间切换,消耗大量的CPU资源,每个线程被调度的次数会降低,线程的执行效率降低。

  3. 多线程的优点

    能适当提高程序的执行效率,但是在业务简单时,单线程会比多线程更快。
    能适当提高资源的利用率(CPU、内存)。
    线程上的任务执行完成后,线程会自动销毁。

  4. 多线程的缺点

    开启线程需要占用一定的内存空间(默认情况下,每一个线程都占用512KB),如果开启大量的线程,会占用大量的内存空间,降低程序的性能。
    线程越多,CPU在调用线程上的开销就越大。
    程序设计更加复杂,比如线程间的通信、多线程的数据共享。

  5. 主线程

    一个程序运行后,默认会开启1个线程,称为“主线程”或“UI线程”,java中的main方法就是一个程序的主线程,如果这个方法体中的业务逻辑存在while循环、线程池
    、websorcket等阻塞程序的,那么会使main方法阻塞,程序则不会退出,这也是程序正常运行不退出的原因;一些简单类的main方法会退出是因为并没有阻塞main方法。
    主线程一般用来刷新UI界面,处理UI事件。
    主线程使用注意:别将耗时的操作放到主线程中,因为耗时操作会卡住主线程,严重影响UI的流畅度,给用户一种卡的坏体验。

同步和异步

  1. 同步执行

    写程序的时候都是从上到下,从左到右,代码执行顺序也是从上到下从右到左。
    1个线程执行多个任务,也是依次执行a->b->c。
    1个线程同一时间执行1个任务。

  2. 异步执行

    多个线程可以同时执行多个任务。
    多个线程执行多个任务可以同时执行a,b,c。

  3. synchronized

    同步块大家都比较熟悉,通过 synchronized 关键字来实现;所有加上 synchronized 的方法和块语句,在多线程访问的时候,同一时刻只能有一个线程能够访问。

  4. wait()、notify()、notifyAll()

    这三个方法是 java.lang.Object 的 final native 方法,任何继承 java.lang.Object 的类都有这三个方法。
    它们是Java语言提供的实现线程间阻塞和控制进程内调度的底层机制,平时我们会很少用到的。

  5. wait():

    导致线程进入等待状态,直到它被其他线程通过notify()或者notifyAll唤醒,该方法只能在同步方法中调用。

  6. notify():

    随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态,该方法只能在同步方法或同步块内部调用。

  7. notifyAll():

    解除所有那些在该对象上调用wait方法的线程的阻塞状态,同样该方法只能在同步方法或同步块内部调用。
    调用这三个方法中任意一个,当前线程必须是锁的持有者,如果不是会抛出一个 IllegalMonitorStateException 异常。

  8. wait()与Thread.sleep(long time)的区别

    sleep() :方法在指定的毫秒数内让当前正在执行的线程休眠(暂停执行),该线程不丢失任何监视器的所属权,sleep() 是 Thread 类专属的静态方法,针对一个特定的线程。
    wait() : 方法使实体所处线程暂停执行,从而使对象进入等待状态,直到被 notify() 方法通知或者 wait() 的等待的时间到。
    sleep() : 方法使持有的线程暂停运行,从而使线程进入休眠状态,直到用interrupt 方法来打断他的休眠或者 sleep 的休眠的时间到。
    wait() : 方法进入等待状态时会释放同步锁,而 sleep() 方法不会释放同步锁。所以,当一个线程无限 sleep 时又没有任何人去 interrupt 它的时候,程序就产生大麻烦了。
    notify() : 是用来通知线程,但在 notify() 之前线程是需要获得 lock 的。另个意思就是必须写在 synchronized(lockobj) {…} 之中。
    wait() : 也是这个样子,一个线程需要释放某个 lock,也是在其获得 lock 情况下才能够释放,所以 wait() 也需要放在 synchronized(lockobj) {…} 之中。

  9. volatile 关键字

    volatile 是一个特殊的修饰符,只有成员变量才能使用它。在Java并发程序缺少同步类的情况下,多线程对成员变量的操作对其它线程是透明的。
    volatile 变量可以保证下一个读取操作会在前一个写操作之后发生。线程都会直接从内存中读取该变量并且不缓存它。这就确保了线程读取到的变量是同内存中是一致的。

  10. ThreadLocal 变量

    ThreadLocal 是Java里一种特殊的变量。每个线程都有一个 ThreadLocal 就是每个线程都拥有了自己独立的一个变量,竞争条件被彻底消除了。
    如果为每个线程提供一个自己独有的变量拷贝,将大大提高效率。首先,通过复用减少了代价高昂的对象的创建个数。
    其次,你在没有使用高代价的同步或者不变性的情况下获得了线程安全。

  11. Join() 方法

    join() 方法定义在 Thread 类中,所以调用者必须是一个线程,join() 方法主要是让调用该方法的 Thread 完成 run() 方法里面的东西后,再执行 join() 方法后面的代码。

            Thread t1 = new Thread(计数线程一);  
            Thread t2 = new Thread(计数线程二);  
            t1.start(); 
            t1.join(); // 等待计数线程一执行完成,再执行计数线程二
            t2.start();  
> 启动 t1 后,调用了 join() 方法,直到 t1 的计数任务结束,才轮到 t2 启动,然后 t2 才开始计数任务,两个线程是按着严格的顺序来执行的。  
> 如果 t2 的执行需要依赖于 t1 中的完整数据的时候,这种方法就可以很好的确保两个线程的同步性。
  1. Thread.yield() 方法

    Thread.sleep(long time):线程暂时终止执行(睡眠)一定的时间。
    Thread.yield():线程放弃运行,将CPU的控制权让出。
    这两个方法都会将当前运行线程的CPU控制权让出来,但 sleep() 方法在指定的睡眠时间内一定不会再得到运行机会,直到它的睡眠时间完成;
    而 yield() 方法让出控制权后,还有可能马上被系统的调度机制选中来运行,比如,执行yield()方法的线程优先级高于其他的线程,那么这个线程即使执行了 yield() 方法也可能不能起到让出CPU控制权的效果,因为它让出控制权后,进入排队队列,调度机制将>从等待运行的线程队列中选出一个等级最高的线程来运行,那么它又(很可能)被选中来运行。

Java 多线程的实现的三种方式

  1. 继承thread方式
        public class newThread extends Thread{
            @Override
            public void run() {
                //do something
            }
            @Override
            public void start() {
                //do something
            }
        }
> Thread 类中的 start() 和 run() 方法有什么区别  
> 调用 start() 方法才会启动新线程;如果直接调用 Thread 的 run() 方法,它的行为就会和普通的方法一样;为了在新的线程中执行我们的代码,必须使用 Thread.start() 方法。
  1. 实现runable接口方式
        public class NewThread implements Runnable{
            @Override
            public void run() {
                //do something
            }
        }
  1. 继承thread其实也是实现runable接口的方式实现的
  2. 实现callable接口
        public class NewCallableThread implements Callable<String> {//String 是线程的返回值类型
            private String threadName;
            private int count = 0;
            private Object obj;

            public NewCallableThread(String threadName, int count, Object obj) {
                this.threadName = threadName;
                this.count = count;
                this.obj   = obj;
            }

            @Override
            public String call(){
                while(count < 100) {
                    try {
                        count ++;
                        Thread.sleep(1000);
                        System.out.println(threadName + ">count>" + count +"object is " + obj);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return "这是返回值";//callable接口可以返回线程的执行结果,也可以抛出异常,例如此线程处理完的数据结果可返回给调用者,但是线程不会释放资源直到拿到返回值,所以如果不需要返回return null即可
            }

            public static void main(String[] args) {
                NewCallableThread thread1 = new NewCallableThread("thread1", 0, null);
                thread1.call();
                NewCallableThread thread2 = new NewCallableThread("thread2", 5, "hello");
                thread2.call();
            }

        }
  1. 单线程和多线程演示
        public class NewThread extends Thread {
            private String threadName;
            private int count = 0;
            private Object obj;//obj 是任何类型的可将此对象传入线程中使用

            public NewThread(String threadName, int count, Object obj) {
                this.threadName = threadName;
                this.count = count;
                this.obj   = obj;
            }

            @Override
            public void run(){
                while(count < 100) {
                    try {
                        count ++;
                        Thread.sleep(1000);
                        System.out.println(threadName + ">count>" + count +"object is " + obj);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }

            public static void main(String[] args) {
                NewThread thread1 = new NewThread("thread1", 0, null);
                thread1.run();
                NewThread thread2 = new NewThread("thread2", 5, "hello");
                thread2.run();
            }
        }
> thread1、thread2都是单独的一个线程,都是由CPU调度执行
  1. 附runable和callable的接口源码
         public interface Runnable {
             public void run();
         }

         public interface Callable<V> {
             V call() throws Exception;
        }
  1. runable和callable的对比

    Callable 接口下的方法是 call(),Runnable 接口的方法是 run()。
    Callable 的任务执行后可返回值,而 Runnable 的任务是不能返回值的。
    call() 方法可以抛出异常,run()方法不可以的。
    运行 Callable 任务可以拿到一个 Future 对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。
    通过 Future 对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。

  2. 附future接口的源码

         public interface Future<V> {

             boolean cancel(boolean mayInterruptIfRunning);

             boolean isCancelled();

             boolean isDone();

             V get() throws InterruptedException, ExecutionException;

             V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
         }

boolean cancel(boolean mayInterruptIfRunning);
此方法试图取消对此任务的执行。[任务已完成、已取消,或者由于某些其他原因而无法取消,则此尝试将失败,cancel调用成功后,isCancelled和isDone将始终返回true]
mayInterruptIfRunning 参数确定是否应该使用试图停止任务的方式来中断执行此任务的线程

boolean isCancelled(); 如果在任务正常完成前将其取消,则返回 true

boolean isDone(); 任务是否已完成

V get() throws InterruptedException, ExecutionException; 如有必要等待线程执行完毕,获取返回结果

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 在规定时间内等待线程执行完毕,获取返回结果

  1. FutureTask-Thread的设计优化
             public class FutureTask<V> implements RunnableFuture<V> {
                 ...
             }

             public interface RunnableFuture<V> extends Runnable, Future<V> {
                 void run();
             }

FutureTask 实现了 Runnable 和 Future,所以兼顾两者优点,既可以在 Thread 中使用,又可以在 ExecutorService 中使用。

            Callable<String> callable = new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return "something";
                }
            };
            FutureTask<String> task = new FutureTask<String>(callable);

            Thread t = new Thread(task);
            t.start(); // 启动线程
            task.cancel(true); // 取消线程

使用 FutureTask 的好处是 FutureTask 弥补了Thread 的不足,它可以让调用者准确地知道线程什么时候执行完成并获得线程执行完成后的返回结果。
FutureTask 是一种可以取消的异步的计算任务,它的计算是通过 Callable 实现的,它等价于可以携带结果的 Runnable,并且有三个状态:等待、运行和完成。
完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。


线程池

  1. 线程池简介

    Java里面线程池的顶级接口是 Executor,不过真正的线程池接口是 ExecutorService, ExecutorService 的默认实现是 ThreadPoolExecutor;
    普通类 Executors 里面调用的就是 ThreadPoolExecutor。

  2. 线程池的优点

    1 避免线程的创建和销毁带来的性能开销。
    2 避免大量的线程间因互相抢占系统资源导致的阻塞现象。
    3 能够对线程进行简单的管理并提供定时执行、间隔执行等功能。

  3. 如何初始化一个线程池

        public class NewPool {
            final static int corePoolSize = 5;
            final static int maximumPoolSize = 5;
            final static BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(1000);
            private static ExecutorService pool = null;
    
            //初始化线程池
            public static void initPool() throws InterruptedException {
                pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, workQueue);
                //或者使用Executors创建线程池
                //pool = Executors.newSingleThreadExecutor();
                //pool = Executors.newCachedThreadPool();
                //pool = Executors.newFixedThreadPool(5);
                //pool = Executors.newScheduledThreadPool(5);
            }
            //当有新的数据时调用poolAddThread(String,int,Object obj)处理传入的数据,
            //例如如obj, object就相当于数据
            public static void poolAddThread(String threadName, int count, Object obj) {
                NewThread thread = new NewThread(threadName, count, obj);
                pool.submit(thread);
                NewCallableThread callThread = new NewCallableThread(threadName, count, obj);
                pool.submit(callThread);
                pool.execute(thread);
            }
    
            public static void main(String[] args) throws InterruptedException {
                initPool();
            }
        }

    execute 和submit的区别在于submit可以抛出异常和返回结果,给外部调用者处理,例如某一个task失败则终止其他task执行,或者shutdown线程池.

  4. 线程池分类及比较-Executors 提供四种线程池

    newCachedThreadPool
    可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。
    调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中,
    终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
    因此,长时间保持空闲的线程池不会使用任>何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。

    newSingleThreadExecutor
    创建一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,
    此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    newFixedThreadPool
    创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,
    如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    newScheduledThreadPool
    创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。

             public interface Executor {
                 void execute(Runnable command);
             }

             public interface ExecutorService extends Executor {
                 void shutdown();
                 List<Runnable> shutdownNow();

                 boolean isShutdown();
                 boolean isTerminated();

                 <T> Future<T> submit(Callable<T> task);
                 <T> Future<T> submit(Runnable task, T result);
                 Future<?> submit(Runnable task);
                 ...
             }

             public class Executors {
                 public static ExecutorService newCachedThreadPool() {
                         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, 
                                         new SynchronousQueue<Runnable>());
                 }
                 //...
             }
  1. ThreadPoolExecutor构造函数
             public ThreadPoolExecutor(int corePoolSize,
                                     int maximumPoolSize,
                                     long keepAliveTime,
                                     TimeUnit unit,
                                     BlockingQueue<Runnable> workQueue,
                                     ThreadFactory threadFactory) {
                 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
                     threadFactory, defaultHandler);
             }
> 1 corePoolSize:线程池的核心线程数,一般情况下不管有没有任务都会一直在线程池中一直存活,只有在 ThreadPoolExecutor 中的方法  
> allowCoreThreadTimeOut(boolean value) 设置为 true 时,闲置的核心线程会存在超时机制,如果在指定时间没有新任务来时,  
> 核心线程也会被终止,而这个时间间隔由第3个属性 keepAliveTime 指定。

> 2 maximumPoolSize:线程池所能容纳的最大线程数,当活动的线程数达到这个值后,后续的新任务将会被阻塞。

> 3 keepAliveTime:控制线程闲置时的超时时长,超过则终止该线程。一般情况下用于非核心线程,  
> 只有在 ThreadPoolExecutor 中的方法 allowCoreThreadTimeOut(boolean value) 设置为 true时,也作用于核心线程。

> 4 unit:用于指定 keepAliveTime 参数的时间单位,TimeUnit 是个 enum 枚举类型,常用的有:TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、  
> TimeUnit.SECONDS(秒) 和TimeUnit.MILLISECONDS(毫秒)等。

> 5 workQueue:线程池的任务队列,通过线程池的 execute(Runnable command) 方法会将任务 Runnable 存储在队列中,之后我会出一个submit和excute的详细对比。

> 6 threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的。
  1. 线程池的关闭

    ThreadPoolExecutor 提供了两个方法,用于线程池的关闭,分别是 shutdown() 和 shutdownNow()。
    shutdown():不会立即的终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。
    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。


参考https://www.jianshu.com/p/b8197dd2934c



   Reprint policy


《THREAD-POOL-多线程并发》 by jackromer is licensed under a Creative Commons Attribution 4.0 International License
 Previous
NETTY-NIO NETTY-NIO
什么是netty Netty是一个NIO网络编程框架,快速开发高性能、高可靠性的网络服务器/客户端程序。 极大地简化了TCP和UDP等网络编程。是一个异步事件驱动的网络框架,快速、高性能。RPC(pigeon、dubbo、HSF)Hado
2019-08-27
Next 
STORM介绍,集群搭建以及topology使用 STORM介绍,集群搭建以及topology使用
本文主要介绍STORM,以及storm集群的安装和使用 什么是storm Apache Storm是自由开源的分布式实时计算系统,擅长处理海量数据,适用于数据实时处理而非批处理。批处理使用的大多是鼎鼎大名的hadoop或者hive,作为一
2019-08-27
  目录