百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

次时代Java编程(一) Java里的协程

yuyutoo 2025-03-03 20:52 1 浏览 0 评论

什么是协程(coroutine)

这东西其实有很多名词,比如有的人喜欢称为纤程(Fiber),或者绿色线程(GreenThread)。其实最直观的解释可以定义为线程的线程。有点拗口,但本质上就是这样。

我们先回忆一下线程的定义,操作系统产生一个进程,进程再产生若干个线程并行的处理逻辑,线程的切换由操作系统负责调度。传统语言C++ Java等线程其实与操作系统线程是1:1的关系,每个线程都有自己的Stack, Java在64位系统默认Stack大小是1024KB,所以指望一个进程开启上万个线程是不现实的。但是实际上我们也不会这么干,因为起这么多线程并不能充分的利用CPU,大部分线程处于等待状态,CPU也没有这么核让线程使用。所以一般线程数目都是CPU的核数。

传统的J2EE系统都是基于每个请求占用一个线程去完成完整的业务逻辑,(包括事务)。所以系统的吞吐能力取决于每个线程的操作耗时。如果遇到很耗时的I/O行为,则整个系统的吞吐立刻下,比如JDBC是同步阻塞的,这也是为什么很多人都说数据库是瓶颈的原因。这里的耗时其实是让CPU一直在等待I/O返回,说白了线程根本没有利用CPU去做运算,而是处于空转状态。暴殄天物啊。另外过多的线程,也会带来更多的ContextSwitch开销。

Java的JDK里有封装很好的ThreadPool,可以用来管理大量的线程生命周期,但是本质上还是不能很好的解决线程数量的问题,以及线程空转占用CPU资源的问题。

现阶段行业里的比较流行的解决方案之一就是单线程加上异步回调。其代表派是node.js以及Java里的新秀Vert.x。他们的核心思想是一样的,遇到需要进行I/O操作的地方,就直接让出CPU资源,然后注册一个回调函数,其他逻辑则继续往下走,I/O结束后带着结果向事件队列里插入执行结果,然后由事件调度器调度回调函数,传入结果。这时候执行的地方可能就不是你原来的代码区块了,具体表现在代码层面上,你会发现你的局部变量全部丢失,毕竟相关的栈已经被覆盖了,所以为了保存之前的栈上数据,你要么选择带着一起放入回调函数里,要么就不停的嵌套,从而引起反人类的Callback hell.

因此相关的Promise,CompletableFuture等技术都是为解决相关的问题而产生的。但是本质上还是不能解决业务逻辑的割裂。

说了这么多,终于可以提一下协程了,协程的本质上其实还是和上面的方法一样,只不过他的核心点在于调度那块由他来负责解决,遇到阻塞操作,立刻yield掉,并且记录当前栈上的数据,阻塞完后立刻再找一个线程恢复栈并把阻塞的结果放到这个线程上去跑,这样看上去好像跟写同步代码没有任何差别,这整个流程可以称为coroutine,而跑在由coroutine负责调度的线程称为Fiber。比如Golang里的 go关键字其实就是负责开启一个Fiber,让func逻辑跑在上面。而这一切都是发生的用户态上,没有发生在内核态上,也就是说没有ContextSwitch上的开销。

既然我们的标题叫Java里的协程,自然我们会讨论JVM上的实现,JVM上早期有kilim以及现在比较成熟的Quasar。而本文章会全部基于Quasar,因为kilim已经很久不更新了。

简单的例子,用Java写出Golang的味道

上面已经说明了什么是Fiber,什么是coroutine。这里尝试通过Quasar来实现类似于golang的coroutine以及channel。这里假设各位已经大致了解golang。

为了对比,这里先用golang实现一个对于10以内自然数分别求平方的例子,当然了可以直接单线程for循环就完事了,但是为了凸显coroutine的高逼格,我们还是要稍微复杂化一点的。

golang

func counter(out chanint) {

for x := 0; x < 10; x++ {

out out)

}

func squarer(out chanint, in int) {

for v := range in {

out out)

}

func printer(in int) {

for v := range in {

fmt.Println(v)

}

}

func main() {

//定义两个int类型的channel

naturals := make(chan int)

squares := make(chan int)

//产生两个Fiber,用go关键字

go counter(naturals)

go squarer(squares, naturals)

//获取计算结果

printer(squares)

}

上面的例子,有点类似生产消费者模式,通过channel两解耦两边的数据共享。大家可以将channel理解为Java里的SynchronousQueue。那传统的基于线程模型的Java实现方式,想必大家都知道怎么做,这里就不啰嗦了,我直接上Quasar版的,几乎可以原封不动的copy golang的代码。

Java

public class Example {

private static void printer(Channel in) throws SuspendExecution, InterruptedException {

Integer v;

while ((v = in.receive()) != null) {

System.out.println(v);

}

}

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {

//定义两个Channel

Channel naturals = Channels.newChannel(-1);

Channel squares = Channels.newChannel(-1);

//运行两个Fiber实现.

new Fiber(() -> {

for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> {

Integer v;

while ((v = naturals.receive()) != null)

squares.send(v ** v);

squares.close();

}).start();

printer(squares);

}

}

看起来Java似乎要啰嗦一点,没办法这是Java的风格,而且毕竟不是语言上支持coroutine,是通过第三方的库。到后面我会考虑用其他JVM上的语言去实现,这样会显得更精简一点。

说到这里各位肯定对Fiber很好奇了。也许你会表示怀疑Fiber是不是如上面所描述的那样,下面我们尝试用Quasar建立一百万个Fiber,看看内存占用多少,我先尝试了创建百万个Thread

Java

for (int i = 0; i < 1_000_000; i++) { new Thread(() -> {

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}).start();

}

很不幸,直接报Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,这是情理之中的。下面是通过Quasar建立百万个Fiber


Java

public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {

int FiberNumber = 1_000_000;

CountDownLatch latch = new CountDownLatch(1);

AtomicInteger counter = new AtomicInteger(0);

for (int i = 0; i < FiberNumber; i++) { new Fiber(() -> {

counter.incrementAndGet();

if (counter.get() == FiberNumber) {

System.out.println("done");

}

Strand.sleep(1000000);

}).start();

}

latch.await();

}

我这里加了latch,阻止程序跑完就关闭,Strand.sleep其实跟Thread.sleep一样,只是这里针对的是Fiber

最终控制台是可以输出done的,说明程序已经创建了百万个Fiber,设置Sleep是为了让Fiber一直运行,从而方便计算内存占用。官方宣称一个空闲的Fiber大约占用400Byte,那这里应该是占用400MB堆内存,但是这里通过jmap -heap pid显示大约占用了1000MB,也就是说一个Fiber占用1KB。



Quasar是怎么实现Fiber的

其实Quasar实现的coroutine的方式与Golang很像,只不过一个是框架级别实现,一个是语言内置机制而已。

如果你熟悉了Golang的调度机制,那理解Quasar的调度机制就会简单很多,因为两者是差不多的。

Quasar里的Fiber其实是一个continuation,他可以被Quasar定义的scheduler调度,一个continuation记录着运行实例的状态,而且会被随时中断,并且也会随后在他被中断的地方恢复。Quasar其实是通过修改bytecode来达到这个目的,所以运行Quasar程序的时候,你需要先通过java-agent在运行时修改你的代码,当然也可以在编译期间这么干。golang的内置了自己的调度器,Quasar则默认使用ForkJoinPool这个JDK7以后才有的,具有work-stealing功能的线程池来当调度器。work-stealing非常重要,因为你不清楚哪个Fiber会先执行完,而work-stealing可以动态的从其他的等等队列偷一个context过来,这样可以最大化使用CPU资源。

那这里你会问了,Quasar怎么知道修改哪些字节码呢,其实也很简单,Quasar会通过java-agent在运行时扫描哪些方法是可以中断的,同时会在方法被调用前和调度后的方法内插入一些continuation逻辑,如果你在方法上定义了@Suspendable注解,那Quasar会对调用该注解的方法做类似下面的事情。

这里假设你在方法f上定义了@Suspendable,同时去调用了有同样注解的方法g,那么所有调用f的方法会插入一些字节码,这些字节码的逻辑就是记录当前Fiber栈上的状态,以便在未来可以动态的恢复。(Fiber类似线程也有自己的栈)。在suspendable方法链内Fiber的父类会调用Fiber.park,这样会抛出SuspendExecution异常,从而来停止线程的运行,好让Quasar的调度器执行调度。这里的SuspendExecution会被Fiber自己捕获,业务层面上不应该捕获到。如果Fiber被唤醒了(调度器层面会去调用Fiber.unpark),那么f会在被中断的地方重新被调用(这里Fiber会知道自己在哪里被中断),同时会把g的调用结果(g会return结果)插入到f的恢复点,这样看上去就好像g的return是flocal variables了,从而避免了callback嵌套。

上面啰嗦了一大堆,其实简单点讲就是,想办法让运行中的线程栈停下来,好让Quasar的调度器介入。JVM线程中断的条件只有两个,一个是抛异常,另外一个就是return。这里Quasar就是通过抛异常的方式来达到的,所以你会看到我上面的代码会抛出SuspendExecution。但是如果你真捕获到这个异常,那就说明有问题了,所以一般会这么写。

Java

@Suspendable

public int f() {

try {

// do some stuff

return g() ** 2;

} catch(SuspendExecution s) {

//这里不应该捕获到异常.

throw new AssertionError(s);

}

}

与Golang性能对比

在github上无意中发现一个有趣的benchmark,大致是测试各种语言在生成百万actor/Fiber的开销skynet。

大致的逻辑是先生成10个Fiber,每个Fiber再生成10个Fiber,直到生成1百万个Fiber,然后每个Fiber做加法累积计算,并把结果发到channel里,这样一直递归到根Fiber。后将最终结果发到channel。如果逻辑没有错的话结果应该是499999500000。我们搞个Quasar版的,来测试一下性能。

所有的测试都是基于我的Macbook Pro Retina 2013later。Quasar-0.7.5:JDK8,JDK 1.8.0_91,Golang 1.6

Java

public class Skynet {

private static final int RUNS = 4;

private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited

static void skynet(Channel c, long num, int size, int div) throws SuspendExecution, InterruptedException {

if (size == 1) {

c.send(num);

return;

}

Channel rc = newChannel(BUFFER);

long sum = 0L;

for (int i = 0; i < div; i++) { long subNum = num + i ** (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start();

}

for (int i = 0; i < div; i++)

sum += rc.receive();

c.send(sum);

}

public static void main(String[] args) throws Exception {

//这里跑4次,是为了让JVM预热好做优化,所以我们以最后一个结果为准。

for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start();

long result = c.receive();

long elapsed = (System.nanoTime() - start) / 1_000_000;

System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)");

}

}

}

golang的代码我就不贴了,大家可以从github上拿到,我这里直接贴出结果。

platformtime
Golang261ms
Quasar612ms

从Skynet测试中可以看出,Quasar的性能对比Golang还是有差距的,但是不应该达到两倍多吧,经过向Quasar作者求证才得知这个测试并没有测试出实际性能,只是测试调度开销而已。

因为skynet方法内部几乎没有做任何事情,只是简单的做了一个加法然后进一步的递归生成新的Fiber而已,相当于只是测试了Quasar生成并调度百万Fiber所需要的时间而已。而Java里的加法操作开销远比生成Fiber的开销要低,因此感觉整体性能不如golang(golang的coroutine是语言级别的)。

实际上我们在实际项目中生成的Fiber中不可能只做一下简单的加法就退出,至少要花费1ms做一些简单的事情吧,(Quasar里Fiber的调度差不多在us级别),所以我们考虑在skynet里加一些比较耗时的操作,比如随机生成1000个整数并对其进行排序,这样Fiber里算是有了相应的性能开销,与调度的开销相比,调度的开销就可以忽略不计了。(大家可以把调度开销想象成不定积分的常数)。

下面我分别为两种语言了加了数组排序逻辑,并插在响应的Fiber里。

Java

public class Skynet {

private static Random random = new Random();

private static final int NUMBER_COUNT = 1000;

private static final int RUNS = 4;

private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited

private static void numberSort() {

int[] nums = new int[NUMBER_COUNT];

for (int i = 0; i < NUMBER_COUNT; i++)

nums[i] = random.nextInt(NUMBER_COUNT);

Arrays.sort(nums);

}

static void skynet(Channel c, long num, int size, int div) throws SuspendExecution, InterruptedException {

if (size == 1) {

c.send(num);

return;

}

//加入排序逻辑

numberSort();

Channel rc = newChannel(BUFFER);

long sum = 0L;

for (int i = 0; i < div; i++) { long subNum = num + i ** (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start();

}

for (int i = 0; i < div; i++)

sum += rc.receive();

c.send(sum);

}

public static void main(String[] args) throws Exception {

for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start();

long result = c.receive();

long elapsed = (System.nanoTime() - start) / 1_000_000;

System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)");

}

}

}

golang

const (

numberCount = 1000

loopCount = 1000000

)

//排序函数

func numberSort() {

nums := make([]int, numberCount)

for i := 0; i < numberCount; i++ {

nums[i] = rand.Intn(numberCount)

}

sort.Ints(nums)

}

func skynet(c chan int, num int, size int, div int) {

if size == 1 {

c return

}

//加了排序逻辑

numberSort()

rc := make(chan int)

var sum int

for i := 0; i < div; i++ {

subNum := num + i**(size/div)

go skynet(rc, subNum, size/div, div)

}

for i := 0; i < div; i++ {

sum += func main() {

c := make(chan int)

start := time.Now()

go skynet(c, 0, loopCount, 10)

result := "Result: %d in %d ms.\n", result, took.Nanoseconds()/1e6)

}


platformtime
Golang23615ms
Quasar15448ms

最后再进行一次测试,发现Java的性能优势体现出来了。几乎是golang的1.5倍,这也许是JVM/JDK经过多年优化的优势。因为加了业务逻辑后,对比的就是各种库以及编译器对语言的优化了,协程调度开销几乎可以忽略不计。

为什么协程在Java里一直那么小众

其实早在JDK1的时代,Java的线程被称为GreenThread,那个时候就已经有了Fiber,但是当时不能与操作系统实现N:M绑定,所以放弃了。现在Quasar凭借ForkJoinPool这个成熟的线程调度库。另外,如果你希望你的代码能够跑在Fiber里面,需要一个很大的前提条件,那就是你所有的库,必须是异步无阻塞的,也就说必须类似于node.js上的库,所有的逻辑都是异步回调,而自Java里基本上所有的库都是同步阻塞的,很少见到异步无阻塞的。而且得益于J2EE,以及Java上的三大框架(SSH)洗脑,大部分Java程序员都已经习惯了基于线程,线性的完成一个业务逻辑,很难让他们接受一种将逻辑割裂的异步编程模型。

但是随着异步无阻塞这股风气起来,以及相关的coroutine语言Golang大力推广,人们越来越知道如何更好的榨干CPU性能(让CPU避免不必要的等待,减少上下文切换),阻塞的行为基本发生在I/O上,如果能有一个库能把所有的I/O行为都包装成异步阻塞的话,那么Quasar就会有用武之地,JVM上公认的是异步网络通信库是Netty,通过Netty基本解决了网络I/O问题,另外还有一个是文件I/O,而这个JDK7提供的NIO2就可以满足,通过AsynchronousFileChannel即可。剩下的就是如何将他们封装成更友好的API了。目前能达到生产级别的这种异步工具库,JVM上只有Vert.x3,封装了Netty4,封装了AsynchronousFileChannel,而且Vert.x官方也出了一个相对应的封装了Quasar的库vertx-sync

Quasar目前是由一家商业公司Parallel Universe控制着,且有自己的一套体系,包括Quasar-actor,Quasar-galaxy等各个模块,但是Quasar-core是开源的,此外Quasar自己也通过Fiber封装了很多的第三方库,目前全都在comsat这个项目里。随便找一个项目看看,你会发现其实通过Quasar的Fiber去封装第三方的同步库还是很简单的。

写在最后

异步无阻塞的编码方式其实有很多种实现,比如node.js的提倡的Promise,对应到Java8的就是CompletableFuture。

另外事件响应式也算是一个比较流行的做法,比如ReactiveX系列,RxJava,Rxjs,RxSwift,等。我个人觉得RxJava是一个非常好的函数式响应实现(JDK9会有对应的JDK实现),但是我们不能要求所有的程序员一眼就提炼出业务里的functor,monad(这些能力需要长期浸淫在函数式编程思想里),反而RxJava特别适合用在前端与用户交互的部分,因为用户的点击滑动行为是一个个真实的事件流,这也是为什么RxJava在Android端非常火的原因,而后端基本上都是通过Rest请求过来,每一个请求其实已经限定了业务范围,不会再有复杂的事件逻辑,所以基本上RxJava在Vert.x这端只是做了一堆的flatmap,再加上微服务化,所有的业务逻辑都已经做了最小的边界,所以顺序的同步的编码方式更适合写业务逻辑的后端程序员。

所以这里Golang开了个好头,但是Golang也有其自身的限制,比如不支持泛型,当然这个仁者见仁智者见智了,包的依赖管理比较弱,此外Golang没有线程池的概念,如果coroutine里的逻辑发生了阻塞,那么整个程序会hang死。而这点Vert.x提供了一个Worker Pool的概念,可以将需要耗时执行的逻辑包到线程池里面,执行完后异步返回给EventLoop线程。

下一篇我们来研究一下vertx-sync,让vert.x里所有的异步编码方式同步化,彻底解决Vert.x里的Callback Hell。

?―――――――――↓―――――――――?

本文作者系 MaxLeap 团队成员:刘小溪【原创】,转载请务必注明作者、原创地址以及自媒体硬件再发明。

原创地址htt
ps://blog.maxleap.cn/archives/816

欢迎关注微信订阅号:从移动到云端

欢迎加入我们的MaxLeap活动QQ群:555973817,我们将不定期做技术分享活动

相关推荐

迷你世界:最恐怖的地图?公墓探险解密,胆小慎入,气氛拉满

恐怖题材的地图一直都是玩家们又爱又恨的,一方面地图的代入感和体验感都是非常不错的,而另一方面太过于紧张的气氛又让人觉得非常的害怕。而今天给大家推荐的这款地图,则是将地图的恐怖感放到了最大,一起来看看吧...

迷你世界:羊纸体验玩家自制地图,嘲笑地图太简单,结果却被炸飞

前言:迷你世界主播羊纸体验玩家自制闯关地图,本以为能轻松过关嘲笑地图套简单,却被无情炸飞迷你世界可以说是目前最受欢迎的沙盒游戏,在游戏中小伙伴们可以使用道具建造出各种好玩、有趣的建筑物,而且玩家们还会...

迷你世界:找不到好地图?无需再求地形码,掌握这些自己做一个

在《迷你世界》中,玩家如果想玩某张地图,是可以通过输入激活码来进入该地图的。但如果玩家想对地图进行更改,例如多给地图添加一些资源或者让整张地图仅有一到两个区域时,则需要自己进行修改。这种修改的方法很简...

迷你世界:地图大盘点,网友:每一张都是精心制作

如今在众多玩家们的心中,迷你世界有着不可撼动的地位,确实,这款游戏能有今天这样的成就着实不容易。近日有关迷你世界要迎来新版本的消息也被大家纷纷议论着,很多人都在等待全新迷你的到来,毕竟里面的新地形以及...

CS2更新,沙2、Nuke地图调整;CS战队排名更新

在防守端,两名玩家不再需要快速爬上A小了。V社发布了CS2的一个小更新,其中最突出的变化是更改了Dust2上CT出生点通向A小的箱子堆。...

2024年7月23日更新日志:沙二现可单人跳上A小!

2024年7月23日CS更新日志...

迷你世界:想做地图却不知从何下手?理清思路方法,让你轻松造图

在《迷你世界》中,制作地图是一件非常有意思的事情,特别是当玩家呕心沥血制作出来的地图被别的玩家所肯定时,那种满足感是无可比拟的。不过,要想制作出好的地图可不是一件容易的事,很多玩家在开始的时候就犯了难...

路痴的福音,玩家在《新世界》自制了一款迷你地图

在国产网游已经进化到一键寻路的今天,很难想象还有《新世界》这种没有小地图的。再考虑到这款亚马逊网游的超大地图,以及并没有坐骑(官方原因归于“动物抵抗运动”),那是非常不适合路痴了,每次切换大地图也十分...

迷你世界:地图模块怎么用?编辑地图更简单,立马变成大佬

最近关于迷你世界的议论可不少,不过在这些议论声中,大家却都是充满开心的。原来迷你世界版本又进行了一次更新,这次的更新中,不但为玩家们带来了双旦节日的一系列活动,此外还有一个重要改变,那就是迷你工坊的里...

linux基础命令之zip、unzip命令

zip命令是linux系统下压缩文件的命令,后缀为.zip。unzip命令是对zip压缩包进行解压,和tar命令不同,tar命令是支持压缩和解压的命令语法格式:zip参数压缩名称要进行压缩的目录...

台湾是如何应用大数据分析提高半导体竞争力

自引进IC封装开始,台湾半导体产业已发展40余年,近年来总产值已近20,000亿元新台币,在全世界占有一席之地。半导体产业竞争力来自於成本、良率及交货时间,其中良率更是一家公司有竞争力之所在。近年来自...

如何评估自闭症儿童配对的能力,VBmapp 的评估介绍(八)

VB-视觉感知与样品配对:锻炼孩子视觉“看”的能力第一阶段(0-18个月)1.能5次从视觉上追踪移动的刺激物达2秒。练习孩子视觉追踪的能力。2.用拇指、食指和中指抓起小物件(钳形抓握法)。孩子在...

怎么学好唱歌技巧?

1、相信你自己作为一个歌手,如果连你都对自己没有信心的话,也没有人会相信你。你要用你的真情实意去演绎自己的音乐,但如果你想唱得够好,你要有足够的信心。有自信的声音听起来会更加富有感染力和吸引力。想象一...

维修笔记本主板必须要把三极管搞懂,今天带大家学习一下三极管.

三极管介绍:1.三极管:全称应为半导体三极管,也称双极型晶体管、晶体三极管,是一种控制电流的器件也是非线性原件,有三个极,分别叫做集电极C,基极B,发射极E.目前三极管有NPN与PNP两种,三极管有三...

VBA中的函数过程及对&quot;形参&quot;、&quot;实参&quot;的理解(学习方案六)

2.1.8Function过程,也就是我们经常提到的函数过程。对于函数,我给出了一个更为确切的定义,这个定义我在各个平台上均有发表和阐述,我给出的定义如下:如果对于唯一性输入值value,输出(或者...

取消回复欢迎 发表评论: