从CPS到Kotlin Coroutine基岩层探秘

从CPS到Kotlin Coroutine基岩层探秘

前言

Kotlin Coroutine是一个十分优秀的并发语言特性,他几乎透明,开发者友好,在一定程度上甚至保留了Java的互操作性。

Kotlin Coroutine的底层是怎么实现的?我们今天将通过观察反编译后的代码的方式深入学习一下Coroutine调用的全过程以及幕后发生了什么。

读懂本文您不需要任何Kotlin语法知识。笔者希望您能够通过阅读本文获取在任何编程语言中都能通用的知识。

-1. Continuation Passing Style

在开始之前,我们要了解一下CPS是什么。CPS范式在许多函数式编程语言中很著名,其思想是:将“我要拿结果干什么”封装成一个函数参数!

一般情况下,我们会写这样的代码:

fun get42(): Int { //:Int标注了返回值类型
    return 42
}

fun transform(x: Int): Int {
    return x+2
}

fun main(){
    val res=get42(); //拿到了值
    println(res); //接下来干这个!
    val res2=get42();
    println(transform(res2)); //或者对结果做一些操作
}

在CPS下,我们则是这么写:

fun <T> get42(next: (Int) -> T): T { //next表示“我拿着算完的结果,下一步应该找谁干什么?”因为我们不知道最后用户要什么类型,所以用“任意类型”泛型T。
    return next(42)
}

fun <T> transform(x: Int, next: (Int) -> T): T {
    return next(x + 2)
}

fun main() {
    get42 { //Kotlin中的lambda表达式可以直接放在函数调用的括号外面,这样就不需要写成get42({ ... })了。而且默认参数名就是it。
        println(it)
    }

    get42 {
        transform(it) {
            println(it)
        }
    }
}

这样写有什么好处呢?最常见的好处是我们可以引入类似pipe的操作符(某些语言中如Huskell就内置了)链式调用。写起来很方便啊:

get42 |> transform |> transform |> transform |> println

也有其他的一些好处,比如写解释器的时候、可以进行尾递归优化等等:

fun <T> myIf(b: Boolean, t: () -> T, f: () -> T):T{
    return if (b) t() else f()
}

0. 整体来看的Coroutine并发模型

接下来我们先整体感性理解一下Kotlin的并发模型。Kotlin的并发模型有这样几个过程:

  • Coroutine:“我要运行啦”,请Dispatcher哥哥把我分配到一个线程上跑吧!
  • Dispatcher:好嘞,你去线程2,运行吧!
  • Coroutine运行到一个地方。他说:“这里要等待网络资源,先睡觉,suspend!!”,从线程上脱离。这就是suspend。
  • 网络资源准备好了,负责网络的同学resume(恢复)这个Coroutine,将其放回线程上跑
  • Coroutine终于结束了,将返回值送回上游函数

Kotlin中Coroutine的表示方式是 suspend fun 关键字,Coroutine调用另一个Coroutine默认行为是阻塞,而不需要 await 关键字等,很方便。例如:

suspend fun x(){
    println("Hello")
    delay(1000) //等待1s。delay也是个suspend fun
    println("World")
}

那么接下来我们通过实际代码来深入探究他的底层逻辑吧!这是我们马上要用到的例子:

suspend fun y():Int{
//... hidden
}

// x先说Hello,调用y,等待y返回后说World!和y的返回值
suspend fun x(){
    println("Hello")
    val res=y() 
    println("World!"+res)
}

选择Idea里的 Show Kotlin Bytecode 点击 Decompile 即可得到下面的反编译后的Java代码(没错Kotlin会编译成Java编译出来的Java Bytecode,反编译就是Java了,这就是为什么他们两可以互操作):

@Nullable
public static final Object x(@NotNull Continuation completion) {
  Continuationcontinuation;
  label20: {
     if (completion instanceof <undefinedtype>) {continuation = (<undefinedtype>)completion;
        if ((continuation.label & Integer.MIN_VALUE) != 0) {
           continuation.label -= Integer.MIN_VALUE;
           break label20;
        }
     }continuation = new ContinuationImpl(completion) {
        //FF: synthetic field
        Object result;
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object result) {
           this.result =result;
           this.label |= Integer.MIN_VALUE;
           return ContinuationTestKt.x((Continuation)this);
        }
     };
  }

  Object result =continuation.result;
  Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
  Object var10000;
  switch (continuation.label) {
     case 0:
        ResultKt.throwOnFailure(result);
        System.out.println("Hello!");
        continuation.label = 1;
        var10000 = y(continuation);
        if (var10000 == var4) {
           return var4;
        }
        break;
     case 1:
        ResultKt.throwOnFailure(result);
        var10000 =result;
        break;
     default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  }

  int res = ((Number)var10000).intValue();
  System.out.println("World!" + res);
  return Unit.INSTANCE;
}

我去,这都是啥啊!接下来我们就来拆解这段代码的内容。

1. 状态机生成

首先,Kotlin先根据suspend的点将函数分割,生成状态机。(一个函数只有在中断点处才能被中断!其他时候你就算想要打断或取消也要等到下一个中断点才刹得住车)一个状态就维护了“到下一次suspend”之前我要做什么。

也就是说,我们需要知道:

  • 现在在哪个状态
  • 这个状态要做什么
  • 这个状态resume后要转移到哪个状态(比如在有if的情况下,是根据suspend完的结果去True的那边,还是False的那边,每个都可能是不一样的状态)
  • 局部变量的信息以便恢复

在Kotlin中这些东西叫做 Continuation 。哎这不就是CPS来了。在上面的代码中, $continuation.label 就是记录了现在在状态哪!

switch (continuation.label) {
     case 0: //初始状态
        ResultKt.throwOnFailure(result);
        System.out.println("Hello!");
        continuation.label = 1; //下一步去状态1
        var10000 = y(continuation);
        if (var10000 == var4) {
           return var4;
        }
        break;
     case 1: //状态1:从y回来之后
        ResultKt.throwOnFailure(result);
        var10000 =result;
        break;
     default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  }

我们也不难猜出: $result 中存储了suspend回来以后的返回值,因为:

...

     case 1:
        ResultKt.throwOnFailure(result);
        var10000 =result; //var10000就是res
        break;
     default:
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
  }

  int res = ((Number)var10000).intValue(); //在这里
  System.out.println("World!" + res);

原来的函数签名 suspend fun x() 也变成了:

public static final Object x(@NotNull Continuation $completion)

是不是很像CPS?

你说为什么返回值是Object? 这是因为……

2. suspend的逻辑

Kotlin中,如果一个Coroutine被suspend了,他的返回值是一个特殊值:

  Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); //就是这个!

还记得吗,如果我调用的东西suspend了(因为等待资源……),我也要suspend(没资源我干啥):

                var10000 = y($continuation); 
        if (var10000 == var4) { //儿子传来中断
           return var4; //那我也中断,同时告诉我爸爸赶紧中断
        }

同时,被suspend的函数(申请suspend的函数,例如这里是y(其实是y下面最最最底层的东西),不是x)的Continuation会被保存给Dispatcher待用。

3. resume的逻辑

在合适的时机(例如资源到了)有人会调用 Continuation.resumeWith(result) 函数。result里就是我们要的资源(也可能是错误)。这个函数干了什么呢?请看(来自BaseContinuationImpl):

// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
    // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
    var current = this
    var param = result
    while (true) {
        // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
        // can precisely track what part of suspended callstack was already resumed
        probeCoroutineResumed(current)
        with(current) {
            val completion = completion!! // fail fast when trying to resume continuation without completion
            val outcome: Result<Any?> =
                try {
                    val outcome = invokeSuspend(param) //关键在这里!!!
                    if (outcome === COROUTINE_SUSPENDED) return
                    Result.success(outcome)
                } catch (exception: Throwable) {
                    Result.failure(exception)
                }
            releaseIntercepted() // this state machine instance is terminating
            if (completion is BaseContinuationImpl) {
                // unrolling recursion via loop
                current = completion
                param = outcome
            } else {
                // top-level completion reached -- invoke and return
                completion.resumeWith(outcome)
                return
            }
        }
    }
}

invokeSuspend 是什么?请看原来的反编译后的文件:

$continuation = new ContinuationImpl($completion) { //继承BaseContinuationImpl
    // $FF: synthetic field
    Object result;
    int label;

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
       this.result = $result; //保存返回结果
       this.label |= Integer.MIN_VALUE;
       return ContinuationTestKt.x((Continuation)this); //在这里!注意:函数是x,这里调用自己x。如果函数是y,这里也是y
    }
 };

啊!也就是保存了结果到 result ,然后重新调用了自己。我们又回到了那个状态机,执行下一个状态,被suspend+切换状态,然后再一次恢复,直到……

4. terminate的逻辑

我们的Coroutine结束了,然后呢?请看BaseContinuationImpl:

// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return

$continuation = new ContinuationImpl($completion) { //这里的$completion是啥??
    // $FF: synthetic field
    Object result;
    int label;

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
       this.result = $result;
       this.label |= Integer.MIN_VALUE;
       return ContinuationTestKt.x((Continuation)this); 
    }
 };

public static final Object x(@NotNull Continuation completion) //上面的就是这个completion!!

也就是说,当子Coroutine被运行时,他记录下了父亲的Continuation作为Completion。当自己结束时,他就调用父亲的Continuation继续执行父亲的代码!

  • 我保存了父亲的Continuation
  • 父亲的Continuation保存了父亲的父亲的Continuation
  • 父亲的父亲的Continuation保存了父亲的父亲的父亲的Continuation
  • ……
  • 父亲的父亲的父亲的Continuation保存的Completion是空!(调用开始点)

完美的链式反应!

5. 总结

最后,我们再模拟一下这个x函数的执行过程:

  • 启动,给x传入空Continuation,x用这个空Continuation作为Completion初始化自己的Continuation,进入状态0。
  • x在状态0,打印Hello,准备转移到状态1
  • x调用y
    • y初始化自己的Continuation。y的Continuation中将x的Continuation保存为自己的Completion。y进入状态0。
    • y suspend,返回 IntrinsicsKt.getCOROUTINE_SUSPENDED() 。Dispatcher记录下y的Continuation。
    • x收到suspend,同样返回 IntrinsicsKt.getCOROUTINE_SUSPENDED()
    • ……………………
    • 记录下的y的Continuation被 resumeWith。y在resume中调用。y巴拉巴拉数据处理,进入最后一个状态
    • y结束,调用 completion.resumeWith(result) 。这里的 completion 就是x的Continuation。x被再次调用。
  • x发现自己在状态1,取出y的结果,打印。
  • x结束,调用 completion.resumeWith(result) 。这里的 completion 是空的。終わり!

本文实际上还有很多东西没讲,比如底层的compiler intrinsic函数,intercept等等,感兴趣的读者可以继续深入阅读。

6. 练习题

在游戏开发中,如果创建了许多对象,会让GC很头疼。下面这个“函数”会创建很多对象吗?如果是,可以怎么改,还是改不了?

suspend fun y(){
    delay(1) //等1ms
}
suspend fun x(){
    repeat(1000){
        y()
    }
}

版权声明:
作者:XGN
链接:https://blog.hellholestudios.top/archives/2056
来源:Hell Hole Studios Blog
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>