本文最后更新于 883 天前,其中的信息可能已经有所发展或是发生改变。
1.CoroutineScheduler
/**
 * Excerpt of the coroutine scheduler: an [Executor] and [Closeable] whose [Worker] threads
 * look up tasks with `findTask` and run them through [runSafely].
 * Constructor parameters and several members are elided in this excerpt.
 */
internal class CoroutineScheduler(...) : Executor, Closeable {
// Scheduler-level queue that Worker.findTask polls (via removeFirstOrNull) when no CPU permit was acquired.
@JvmField
val globalBlockingQueue = GlobalQueue()
/**
 * Runs [task] and keeps any failure from escaping to the caller.
 *
 * Any [Throwable] thrown by `task.run()` is handed to the current thread's
 * `uncaughtExceptionHandler`; `unTrackTask()` is always invoked afterwards.
 */
fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
// Report the failure through the thread's uncaught-exception handler instead of rethrowing it.
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
// Executed on both the success and the failure path.
unTrackTask()
}
}
// omitted...
/**
 * Worker thread of the scheduler; [run] simply delegates to `runWorker`.
 */
internal inner class Worker private constructor() : Thread() {
override fun run() = runWorker()
/**
 * Main loop of the worker: keeps looking for tasks and executing them until either the
 * scheduler reports `isTerminated` or this worker's `state` becomes `TERMINATED`.
 */
private fun runWorker() {
// Only reset in the visible code; its other uses are in the elided part of the loop.
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask(mayHaveLocalTasks)
if (task != null) {
// A task was found: reset the bookkeeping, execute it and look for the next one right away.
rescanned = false
minDelayUntilStealableTaskNs = 0L
executeTask(task)
continue
} else {
// Nothing was found, so the local-task hint is cleared.
mayHaveLocalTasks = false
}
// omitted...
continue
}
}
/**
 * Executes a single task; in the visible part it only forwards to [runSafely]
 * (the surrounding steps are elided in this excerpt).
 */
private fun executeTask(task: Task) {
// omitted...
runSafely(task)
// omitted...
}
/**
 * Looks for the next task to run, or returns `null` when none is found.
 *
 * When a CPU permit is acquired the search is delegated to `findAnyTask`. Otherwise the
 * local queue (only if [scanLocalQueue] is true) and then [globalBlockingQueue] are polled,
 * and as a last resort `trySteal(blockingOnly = true)` is attempted.
 */
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// NOTE(review): presumably only blocking tasks are eligible without a CPU permit — confirm against the full source.
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}
// omitted...
}
// omitted...
}
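CoroutineScheduler 实现了 Executor 接口,Worker 继承自 Thread,线程的 run 方法直接委托给 runWorker。runWorker 中调用了 executeTask(task),executeTask 又调用了 runSafely(task)。可以看到,runSafely 用 try..catch 包裹了 task 的执行:一旦 task 抛出异常,就会在 catch 中把它交给当前线程的 uncaughtExceptionHandler 处理(即作为未捕获异常上报),最后在 finally 中调用 unTrackTask()。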
/**
 * Excerpt of the scheduler task that resumes a dispatched continuation.
 *
 * [resumeMode] selects how the continuation is resumed; [run] asserts that it is not
 * `MODE_UNINITIALIZED`. Several members are elided in this excerpt.
 */
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// omitted...
/**
 * Returns the `cause` of [state] when it is a `CompletedExceptionally`, otherwise `null`.
 */
internal open fun getExceptionalResult(state: Any?): Throwable? =
(state as? CompletedExceptionally)?.cause
/**
 * Takes the stored state and resumes the delegate's continuation with it.
 *
 * - If the state is not exceptional, the resume mode is cancellable and the context's [Job]
 *   is no longer active, the continuation is resumed with the job's cancellation exception.
 * - Otherwise it is resumed with the stored exception, or with the successful result.
 *
 * Anything thrown while doing so is captured and passed to `handleFatalException`
 * together with a possible failure of `taskContext.afterTask()`.
 */
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED }
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
// The continuation is obtained through DispatchedContinuation
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState()
val exception = getExceptionalResult(state)
// The job is only looked up when there is no exception and the resume mode is cancellable.
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
// The job is no longer active: resume with its cancellation exception instead of the stored state.
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
// This is actually an extension over resumeWith; it can only deliver a failed Result
continuation.resumeWithException(exception)
} else {
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
// Remembered here and reported in the finally block below.
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
// Exceptions related to the kotlinx.coroutines library itself, e.g. context, compiler, etc.
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
2.BaseContinuationImpl
/**
 * Excerpt of the base class for continuations: holds the [completion] continuation to resume
 * once this one has finished, and implements [resumeWith] as a loop over the completion chain.
 */
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
/**
 * Resumes this continuation with [result] and then keeps resuming its completions for as long
 * as they are [BaseContinuationImpl] instances.
 *
 * Each step calls `invokeSuspend`; an exception thrown by it is converted into
 * `Result.failure` and passed on to the completion rather than being rethrown. The function
 * returns as soon as a step reports `COROUTINE_SUSPENDED`, or after handing the outcome to a
 * completion that is not a [BaseContinuationImpl].
 */
public final override fun resumeWith(result: Result<Any?>) {
// The chain is walked with a loop: `current`/`param` are updated instead of calling resumeWith recursively.
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
// Suspended again: nothing to propagate yet, so leave resumeWith entirely.
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
// A failure becomes the outcome that is delivered to the completion.
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// Continue the loop with the completion as the next continuation to resume.
current = completion
param = outcome
} else {
// Any other completion is resumed directly, which ends the loop.
completion.resumeWith(outcome)
return
}
}
}
}
}
3.异常的处理
/**
 * Demonstrates a [CoroutineExceptionHandler] installed on a root coroutine.
 *
 * A root coroutine is launched with a [CoroutineName] and the handler in its context. It
 * starts a child coroutine that logs a message and throws a [NullPointerException], and then
 * logs "end" itself. Per the kotlinx.coroutines exception-handling rules, the child's uncaught
 * exception is expected to reach the handler of the root coroutine, which logs it.
 *
 * NOTE: `GlobalScope` is used only to keep the demo short; production code should launch in a
 * scope bound to a lifecycle.
 */
private fun testException() {
    // Exception handler: logs the coroutine's name together with the throwable it receives.
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        Log.d("exceptionHandler", "${coroutineContext[CoroutineName]} :$throwable")
    }
    // The name and the handler are combined into a single context with `+` (plus).
    GlobalScope.launch(CoroutineName("异常处理") + exceptionHandler) {
        val job = launch {
            Log.d("${Thread.currentThread().name}", "我要开始抛异常了")
            throw NullPointerException("异常测试")
        }
        Log.d("${Thread.currentThread().name}", "end")
    }
}
源码是如何获取我们定义的异常处理器的:
/**
 * Dispatches an uncaught coroutine [exception] to the appropriate handler.
 *
 * If [context] contains a [CoroutineExceptionHandler], the exception is delivered to it and
 * nothing else happens. If looking up or invoking that handler throws, both the original
 * exception and the handler's failure are combined via `handlerException` and passed to
 * `handleCoroutineExceptionImpl`. If no handler is installed, the original exception goes
 * straight to `handleCoroutineExceptionImpl`.
 *
 * @param context the coroutine context in which the exception occurred
 * @param exception the uncaught exception to handle
 */
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    try {
        // Check whether a user-defined exception handler is installed in the context.
        val installedHandler = context[CoroutineExceptionHandler]
        if (installedHandler != null) {
            installedHandler.handleException(context, exception)
            return
        }
    } catch (handlerFailure: Throwable) {
        // The handler itself failed: report the original exception together with that failure.
        handleCoroutineExceptionImpl(context, handlerException(exception, handlerFailure))
        return
    }
    // No handler installed: use the fallback implementation.
    // NOTE(review): the article states this path ends with the exception being thrown and the
    // program crashing — platform-dependent, confirm against handleCoroutineExceptionImpl.
    handleCoroutineExceptionImpl(context, exception)
}