Technology

Chart Type 《大数据经典论文解读》 三驾马车学习 Spark 内存管理及调优 Yarn学习 从Spark部署模式开始讲源码分析 容器狂占内存资源怎么办? 多角度理解一致性 golang io使用及优化模式 Flink学习 c++学习 学习ebpf go设计哲学 ceph学习 学习mesh kvm虚拟化 学习MQ go编译器 学习go 为什么要有堆栈 汇编语言 计算机组成原理 运行时和库 Prometheus client mysql 事务 mysql 事务的隔离级别 mysql 索引 坏味道 学习分布式 学习网络 学习Linux go 内存管理 golang 系统调用与阻塞处理 Goroutine 调度过程 重新认识cpu mosn有的没的 负载均衡泛谈 单元测试的新解读 《Redis核心技术与实现》笔记 《Prometheus监控实战》笔记 Prometheus 告警学习 calico源码分析 对容器云平台的理解 Prometheus 源码分析 并发的成本 基础设施优化 hashicorp raft源码学习 docker 架构 mosn细节 与微服务框架整合 Java动态代理 编程范式 并发通信模型 《网络是怎样连接的》笔记 go channel codereview gc分析 jvm 线程实现 go打包机制 go interface及反射 如何学习Kubernetes 《编译原理之美》笔记——后端部分 《编译原理之美》笔记——前端部分 Pilot MCP协议分析 go gc 内存管理玩法汇总 软件机制 istio流量管理 Pilot源码分析 golang io 学习Spring mosn源码浅析 MOSN简介 《datacenter as a computer》笔记 学习JVM Tomcat源码分析 Linux可观测性 学习存储 学计算 Gotty源码分析 kubernetes operator kaggle泰坦尼克问题实践 kubernetes扩缩容 神经网络模型优化 直觉上理解深度学习 如何学习机器学习 TIDB源码分析 什么是云原生 Alibaba Java诊断工具Arthas TIDB存储——TIKV 《Apache Kafka源码分析》——简介 netty中的线程池 guava cache 源码分析 Springboot 启动过程分析 Spring 创建Bean的年代变迁 Linux内存管理 自定义CNI IPAM 共识算法 spring redis 源码分析 kafka实践 spring kafka 源码分析 Linux进程调度 让kafka支持优先级队列 Codis源码分析 Redis源码分析 C语言学习 《趣谈Linux操作系统》笔记 docker和k8s安全访问机制 jvm crash分析 Prometheus 学习 Kubernetes监控 容器日志采集 Kubernetes 控制器模型 容器狂占资源怎么办? Kubernetes资源调度——scheduler 时序性数据库介绍及对比 influxdb入门 maven的基本概念 《Apache Kafka源码分析》——server Kubernetes类型系统 源码分析体会 《数据结构与算法之美》——算法新解 Kubernetes源码分析——controller mananger Kubernetes源码分析——apiserver Kubernetes源码分析——kubelet Kubernetes介绍 ansible学习 Kubernetes源码分析——从kubectl开始 jib源码分析之Step实现 jib源码分析之细节 线程排队 跨主机容器通信 jib源码分析及应用 为容器选择一个合适的entrypoint kubernetes yaml配置 《持续交付36讲》笔记 mybatis学习 程序猿应该知道的 无锁数据结构和算法 CNI——容器网络是如何打通的 为什么很多业务程序猿觉得数据结构和算法没用? 串一串一致性协议 当我在说PaaS时,我在说什么 《数据结构与算法之美》——数据结构笔记 PouchContainer技术分享体会 harbor学习 用groovy 来动态化你的代码 精简代码的利器——lombok 学习 《深入剖析kubernetes》笔记 编程语言那些事儿 rxjava3——背压 rxjava2——线程切换 spring cloud 初识 《深入拆解java 虚拟机》笔记 《how tomcat works》笔记 hystrix 学习 rxjava1——概念 Redis 学习 TIDB 学习 如何分发计算 Storm 学习 AQS1——论文学习 Unsafe Spark Stream 学习 linux vfs轮廓 《自己动手写docker》笔记 java8 实践 中本聪比特币白皮书 细读 区块链泛谈 比特币 大杂烩 总纲——如何学习分布式系统 hbase 泛谈 forkjoin 泛谈 看不见摸不着的cdn是啥 《jdk8 in action》笔记 程序猿视角看网络 bgp初识 calico学习 AQS——粗略的代码分析 我们能用反射做什么 web 跨域问题 《clean code》笔记 《Elasticsearch权威指南》笔记 mockito简介及源码分析 2017软件开发小结—— 从做功能到做系统 《Apache Kafka源码分析》——clients dns隐藏的一个坑 《mysql技术内幕》笔记 log4j学习 为什么netty比较难懂? 回溯法 apollo client源码分析及看待面向对象设计 学习并发 docker运行java项目的常见问题 OpenTSDB 入门 spring事务小结 分布式事务 javascript应用在哪里 《netty in action》读书笔记 netty对http2协议的解析 ssl证书是什么东西 http那些事 苹果APNs推送框架pushy apple 推送那些事儿 编写java框架的几大利器 java内存模型 java exception Linux IO学习 netty内存管理 测试环境docker化实践 netty在框架中的使用套路 Nginx简单使用 《Linux内核设计的艺术》小结 Go并发机制及语言层工具 Linux网络源代码学习——数据包的发送与接收 《docker源码分析》小结 docker namespace和cgroup Linux网络源代码学习——整体介绍 zookeeper三重奏 数据库的一些知识 Spark 泛谈 链式处理的那些套路 netty回顾 Thrift基本原理与实践(二) Thrift基本原理与实践(一) 回调 异步执行抽象——Executor与Future Docker0.1.0源码分析 java gc Jedis源码分析 深度学习泛谈 Linux网络命令操作 JTA与TCC 换个角度看待设计模式 Scala初识 向Hadoop学习NIO的使用 以新的角度看数据结构 并发控制相关的硬件与内核支持 systemd 简介 quartz 源码分析 基于docker搭建测试环境(二) spring aop 实现原理简述 自己动手写spring(八) 支持AOP 自己动手写spring(七) 类结构设计调整 分析log日志 自己动手写spring(六) 支持FactoryBean 自己动手写spring(九) 总结 自己动手写spring(五) bean的生命周期管理 自己动手写spring(四) 整合xml与注解方式 自己动手写spring(三) 支持注解方式 自己动手写spring(二) 创建一个bean工厂 自己动手写spring(一) 使用digester varnish 简单使用 关于docker image的那点事儿 基于docker搭建测试环境 分布式配置系统 JVM执行 git maven/ant/gradle/make使用 再看tcp kv系统 java nio的多线程扩展 《Concurrency Models》笔记 回头看Spring IOC IntelliJ IDEA使用 Java泛型 vagrant 使用 Go常用的一些库 Python初学 Goroutine 调度模型 虚拟网络 《程序员的自我修养》小结 Kubernetes存储 访问Kubernetes上的Service Kubernetes副本管理 Kubernetes pod 组件 Go基础 JVM类加载 硬币和扑克牌问题 LRU实现 virtualbox 使用 ThreadLocal小结 docker快速入门

Architecture

实时训练 分布式链路追踪 helm tensorflow原理——python层分析 如何学习tensorflow 数据并行——allreduce 数据并行——ps 机器学习中的python调用c 机器学习训练框架概述 embedding的原理及实践 tensornet源码分析 大模型训练 X的生成——特征工程 tvm tensorflow原理——core层分析 模型演变 《深度学习推荐系统实战》笔记 keras 和 Estimator tensorflow分布式训练 分布式训练的一些问题 基于Volcano的弹性训练 图神经网络 pytorch弹性分布式训练 在离线业务混部 RNN pytorch分布式训练 CNN 《动手学深度学习》笔记 pytorch与线性回归 多活 volcano特性源码分析 推理服务 kubebuilder 学习 mpi 学习pytorch client-go学习 tensorflow学习 提高gpu 利用率 GPU与容器的结合 GPU入门 AI云平台 tf-operator源码分析 k8s批处理调度 喜马拉雅容器化实践 Kubernetes 实践 学习rpc BFF 生命周期管理 openkruise学习 可观察性和监控系统 基于Kubernetes选主及应用 《许式伟的架构课》笔记 Kubernetes webhook 发布平台系统设计 k8s水平扩缩容 Scheduler如何给Node打分 Scheduler扩展 controller 组件介绍 openkruise cloneset学习 controller-runtime源码分析 pv与pvc实现 csi学习 client-go源码分析 kubelet 组件分析 调度实践 Pod是如何被创建出来的? 《软件设计之美》笔记 mecha 架构学习 Kubernetes events学习及应用 CRI 资源调度泛谈 业务系统设计原则 grpc学习 元编程 以应用为中心 istio学习 下一代微服务Service Mesh 《实现领域驱动设计》笔记 serverless 泛谈 概率论 《架构整洁之道》笔记 处理复杂性 那些年追过的并发 服务器端编程 网络通信协议 架构大杂烩 如何学习架构 《反应式设计模式》笔记 项目的演化特点 反应式架构摸索 函数式编程的设计模式 服务化 ddd反模式——CRUD的败笔 研发效能平台 重新看面向对象设计 业务系统设计的一些体会 函数式编程 《左耳听风》笔记 业务程序猿眼中的微服务管理 DDD实践——CQRS 项目隔离——案例研究 《编程的本质》笔记 系统故障排查汇总及教训 平台支持类系统的几个点 代码腾挪的艺术 abtest 系统设计汇总 《从0开始学架构》笔记 初级权限系统设计 领域驱动理念入门 现有上传协议分析 移动网络下的文件上传要注意的几个问题 推送系统的几个基本问题 用户登陆 做配置中心要想好的几个基本问题 不同层面的异步 分层那些事儿 性能问题分析 当我在说模板引擎的时候,我在说什么 用户认证问题 资源的分配与回收——池 消息/任务队列


异步执行抽象——Executor与Future

2016年07月08日

简介

Runnable + Thread 实现了 logic 和 runner 的分离,runner 又进一步扩展为 executor 以便线程复用(控制并发度)。代码交给另一个线程执行,还有一个好处,就是保护调用者线程。在一个项目中,不同的线程的重要性是不同的,比如tomcat 线程池中的线程、mq 消费者线程、netty 的事件驱动线程等,它们是驱动 代码执行的源动力。假设tomcat 线程池一共10个线程,当中有一个任务处理较慢,一个线程被占用较长的时间,会严重限制tomcat的吞吐量。但总有各种耗时的任务,此时,一个重要方法是将 任务交给另一个 线程执行。调用线程 持有 future 对象,可以主动选择 等、不等或者等多长时间。这一点 可以在hystrix 看到。

Executor

Executor provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. Executor 是一个如此成功的抽象,就像linux的File 接口一样。 任务的提交与执行相分离。 PS:有点类似于Spring IOC,Bean的创建与使用相分离。

Executor 框架为并发编程提供了一个完善的架构体系,不仅包括了线程池的管理,还提供了线程工厂、队列(类似于操作系统中的task_struct 数组)以及拒绝策略等,将线程的调度和管理设置在了用户态

线程复用——ThreadPoolExecutor

戏(细)说 Executor 框架线程池任务执行全过程(上)

提交任务

ThreadPoolExecutor.execute 这个方法看着比较简单,但是线程池什么时候创建新的作业线程来处理任务,什么时候只接收任务不创建作业线程,另外什么时候拒绝任务。线程池的接收任务、维护工作线程的策略都要在其中体现。

作业线程

与ThreadPerTaskExecutor 中的线程不同,线程复用之后,ThreadPoolExecutor 中的线程必须改造为拥有task处理逻辑的作业线程,还必须负责作业线程的创建于销毁。

有一个形象的比喻:经理给组长提任务,并不管组长是自己做还是分派给下面的小伙伴。经理等着组长report 即可,从小伙伴的视角看,每天的“日常”就是不停的从组长那里领取task,组长视情况给任务排期,实在忙不过来便增加人手

作业线程逻辑

worker线程在受限的条件下创建,其工作内容便是 不停的从workQueue 中取出task 并执行。

private final class Worker implements Runnable{
    public void run() {
        try {
            Runnable task = firstTask;
            // 循环从线程池的任务队列获取任务 
            while (task != null || (task = getTask()!= null) {
                runTask(task);// 执行任务 
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
    private void runTask(Runnable task) {         
            task.run();
    }
}

你真的了解线程池吗?每一个Worker在创建出来的时候,会调用它本身的run()方法,实现是runWorker(this),这个实现的核心是一个while循环,这个循环不结束,Worker线程就不会终止,就是这个基本逻辑。

  1. 在这个while条件中,有个getTask()方法是核心中的核心,它所做的事情就是从等待队列中取出任务来执行
  2. 如果没有达到corePoolSize,则创建的Worker在执行完它承接的任务后,核心线程会用workQueue.take()取任务、注意,这个接口是阻塞接口,如果取不到任务,Worker线程一直阻塞。
  3. 如果超过了corePoolSize,或者allowCoreThreadTimeOut,一个Worker在空闲了之后,非核心线程会用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)取任务。注意,这个接口只阻塞等待keepAliveTime时间,超过这个时间返回null,则Worker的while循环执行结束,则被终止了。

作业线程的管理

ThreadPoolExecutor 作业线程 由一个HashSet 成员专门持有, 管理/crud大都由调用方线程触发

  1. caller thread 提交任务,在特定场景下(核心线程数、最大线程数、任务队列长度),由addThread 创建新线程
  2. caller thread 线程调用shutdown,作业线程在 没有任务或shutdown状态下自动结束

创建新的作业线程逻辑

private Thread addThread(Runnable firstTask) {
    // 为当前接收到的任务 firstTask 创建 Worker
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    w.thread = t;
    // 将 Worker 添加到作业集合 HashSet<Worker> workers 中,并启动作业 
    workers.add(w);
    t.start();
    return t;
}

对于资源紧张的应用,如果担心线程池资源使用不当,可以利用ThreadPoolExecutor的API(有很多get方法可以获取状态)实现简单的监控,然后进行分析和优化。

对Executor 的扩展

对Executor 的扩展 主要体现在几个方面

  1. 规范 作业线程的管理,比如ExecutorService
  2. 提供 更丰富的 异步处理返回值,帮忙执行下回调,比如guava 的ListeningExecutorService
  3. 优化特定场景,比如netty的SingleThreadEventExecutor,只有一个作业线程
  4. 针对特定业务场景,更改作业线程的处理逻辑(不单纯执行Runnable.run)。比如netty的EventLoopGroup,其作业线程逻辑为 io + task ,并可以根据ioRatio 调整io 与task的cpu 占比。

在 ExecutorService 中,正如其名字暗示的一样,定义了一个服务,定义了完整的线程池的行为,可以接受提交任务、执行任务、关闭服务。抽象类 AbstractExecutorService 类实现了 ExecutorService 接口,也实现了接口定义的默认行为。

Using as a generic library 将netty的并发编程库与guava 与jdk8 做了对比,Because Netty tries to minimize its set of dependencies, some of its utility classes are similar to those in other popular libraries, such as Guava.

在上图中,netty EventExecutorGroup 的方法返回的是netty 自己实现的io.netty.util.concurrent.Future extends java.util.concurrent.Future,guava 则直接一点,ListeningExecutorService 直接返回自己定义的com.google.common.util.concurrent.ListenableFuture extends java.util.concurrent.Future

EventExecutorGroup 使用实例(不一定非得netty里才能用)

EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threads
Future<?> f = group.submit(new Runnable() { ... });
f.addListener(new FutureListener<?> {
    public void operationComplete(Future<?> f) {
        ..
    }
});
...

Executor的使用

百花齐放的Future

同步方法有参数和返回值,异步方法也有参数和返回值,只是异步方法的返回值 统一为Future 抽象。我们可以直接对同步方法的返回值进行处理,而java 也在不断地对Future进行扩展以对异步结果进行处理

Chaining async calls using Java Futures

JAVA 拾遗–Future 模式与 Promise 模式

  1. A Future represents the result of an asynchronous computation.
  2. Future 模式相当于一个占位符,代表一个操作的未来的结果
  3. A Future (also called promise, task or deferred depending on the programming language) is a proxy to an asynchronous computation
  4. This approach of making Future a type means that an asynchronous computation is a first class object, which can be passed to other functions and received as a result。 如果异步方法只能通过回调来处理异步结果,则异步方法就不能作为另一个方法的参数了。
  5. Future also makes chains of transformations easily doable via functions like map, which allow you to transform the result into a result of another type while using a strongly typed and unit testable function. 与Future 神似的是Optional,它们都可以采用Stream 类似的方法链

谁来处理异步操作的结果

异步和回调是孪生兄弟,毕竟不管同步还是异步,都要对拿到的结果进行处理。对结果的处理,可以直接写在异步方法的回调中,也可以挂在异步方法返回的future中。异步本身分为调用线程和执行线程,对异步结果的后续处理也有几种情况

  1. 执行线程处理
  2. 额外传入一个executor线程(池)处理,此时对异步结果的处理 本身又可以一个异步操作

此外,我们可以按功能对线程池进行划分,比如rpc框架中的快慢线程池、IO框架中的IO线程池和CPU密集型线程池

One issue with complex workflows is that you might have a mixture of CPU and I/O intensive steps. The most common way to solve this problem is to use two thread pools, one with a large number of threads for I/O and another one with one thread per physical core for CPU intensive tasks. This means that in addition to specifying the order of the steps, we also have to specify where should each function be executed. Future also helps us with this problem, because the supplyAsync method also allows fine grained control of where each function should be executed.

FutureTask

我们来看一个Futrue的简单使用

ExecutorService executor = Executors.newFixedThreadPool();
Future<Integer> future = executor.submit(new MyJob()));

跟踪submit方法所属的类,Executors.newFixedThreadPool() ==> ThreadPoolExecutor ==> AbstractExecutorService

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

返回的future是一个FutureTask,FutureTask是interface RunnableFuture<V> extends Runnable, Future<V>的实现类。 你真的了解线程池吗?get的核心实现是有个awaitDone方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;否则会依赖UNSAFE包下的LockSupport.park原语进行阻塞,等待LockSupport.unpark信号量。而这个信号量只有当运行结束获得结果、或者出现异常的情况下,才会发出来。分别对应方法set和setException。这就是异步执行、阻塞获取的原理

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 核心代码
        s = awaitDone(false, 0L);
   
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;  boolean queued = false;
    for (;;) { // 死循环
        if (Thread.interrupted()) { removeWaiter(q);throw new InterruptedException();}
        int s = state;
        // 只有任务的状态是’已完成‘,才会跳出死循环
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

对Future 的扩展

不管同步异步,都要拿到数据的结果,并且对拿到的结果进行后续处理。区别只是,同步代码是按照时间顺序书写的,更符合人类直觉,而异步代码则要转换下思维,Future future = timeConsumingOperation() 之后立马future.get() 就没什么意思了, 所以异步代码的“文风”(学名:异步流程控制模式)有几种

  1. 串行(series),后一个调用参数依赖前一个调用的结果。
  2. 并行(parallel),连续发起多个异步操作,然后对异步结果进行组合
  3. 瀑布(waterfall)等,后一个调用是否执行 + 调用参数 依赖前一个调用的结果

解决回调地狱——promise模式

CompletableFuture原理与实践-外卖商家端API的异步化在Java8之前我们一般通过Future实现异步,Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。Netty 和 Guava 的扩展都提供了 addListener 这样的接口,用于处理 Callback 调用,但future的Callback容易出现回调地狱的问题,由此衍生出了 Promise 模式来解决这个问题。

jdk1.8 也提供了相关的方案:CompletableFuture,A Future that may be explicitly completed (setting its value and status), and may be used as a CompletionStage, supporting dependent functions and actions that trigger upon its completion.

[concurrency-interest] CompletableFuture CompletableFuture 曾经被讨论过以下命名:SettableFuture, FutureValue, Promise, and probably others.

基于异步接口组织业务逻辑——编排/Futures

the biggest advantage of using Futures is composability. You might imagine that dealing with transformations which are themselves asynchronous means having to somehow extract your result from a mess that looks like Future<Future<…<Future<T>>…>>. The existence of methods like thenCompose means that any sequence of asynchronous operations will be handled like one asynchronous operation in the rest of your program and this what makes reasoning about and working with these operations much easier. 将多个异步操作组合为一个异步操作

guava ListenableFuture和AbstractFuture

ListenableFuture的简单使用

ListeningExecutorService executorService=MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService.submit(new MyJob<Integer>());
// 添加监听事件
Futures.addCallback(listenableFuture, new FutureCallback() {
    public void onSuccess(Integer result) {}
    public void onFailure(Throwable thrown) {}
});

跟踪submit方法所属的类,ListeningExecutorService ==> AbstractListeningExecutorService ==> AbstractExecutorService

public abstract class AbstractListeningExecutorService extends AbstractExecutorService{
    protected final <T> ListenableFutureTask<T> newTaskFor(Runnable runnable, T value){
        return ListenableFutureTask.create(runnable, value);
    }
    public <T> ListenableFuture<T> submit(Callable<T> task) {
        return (ListenableFuture)super.submit(task);
    }
}
public abstract class AbstractExecutorService implements ExecutorService{
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

实际执行的submit方法和上节的submit方法一样一样的,但在submit方法中,上节执行的是AbstractExecutorService.newTaskFor返回FutureTask,此处执行的是AbstractListeningExecutorService.newTaskFor返回ListenableFutureTask,其实际也是个java.util.concurrent.FutureTask。所以一个ListenableFuture具有cancel的能力就不奇怪了。看来本质上,ListenableFutureTask取消任务的方式还是和FutureTask一样。

ListenableFuture所具备的addListener方法则是任务挂在一个地方,当run方法执行完毕后,执行这些任务。(不同的guava版本实现代码有很大不同)

CompletionFutre

我们看jdk1.8 CompletionFutre,可以看到:各种thenXX,即便对同步调用的返回值进行各种处理,也不过如此了。将异步代码写的如何更像 同步代码 一点,是异步抽象/封装一个发展方向

void business(){
    Value value1 = timeConsumingOperation1();
    Object result1 = function1(value1);
    Object result2 = function2(value1);
    Value value2 = timeConsumingOperation2();
    Object result3 = function3(value1,value2);
    ...
}
void business(){
    CompletionFutre future1 = timeConsumingOperationAsync1();
    CompletionFutre future2 = timeConsumingOperationAsync2();
    future1.thenApply(function1).thenApply(function2).thenCombine(future2,function3);
    ...
}

CompletableFuture中包含两个字段:result和stack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)的形式存储,stack表示栈顶元素。