通过前面的学习,我们已经对OkHttp有了简单的认识,并对其使用方式有了详细的了解,下面我们将以一个同步GET请求为例进行OkHttp源码分析。
private val mUrl = ""

// 1. Create the client directly through its constructor.
private val mClient = OkHttpClient()

// Alternative: create the client through the builder.
private val mClient2 = OkHttpClient.Builder().build()

// 2. Create the request.
private val mRequest = Request.Builder().url(mUrl).build()

// 3. Issue a synchronous GET request.
// NOTE(review): GlobalScope only keeps the demo short; prefer a lifecycle-bound scope in real code.
GlobalScope.launch {
    // use {} closes the response (and its body) when the block exits.
    mClient.newCall(mRequest).execute().use { response ->
        if (response.isSuccessful) {
            // string() reads the body content; toString() would only print the object identity.
            LogUtil.D(log = "request success code is ${response.code} body is ${response.body?.string()}")
        } else {
            LogUtil.D(log = "request error code is ${response.code}")
        }
    }
}
通过代码我们发现,创建OkHttpClient的方式有两种:
// The no-arg constructor simply delegates to a default Builder.
constructor() : this(Builder())

class Builder constructor() {
    // Dispatcher used to schedule calls.
    internal var dispatcher: Dispatcher = Dispatcher()

    // Connection pool.
    internal var connectionPool: ConnectionPool = ConnectionPool()

    // Application interceptors.
    internal val interceptors: MutableList<Interceptor> = mutableListOf()

    // Network interceptors.
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()

    // Event listener factory; defaults to a factory for the no-op listener.
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()

    // Whether to retry automatically on connection failure.
    internal var retryOnConnectionFailure = true

    // Authenticator; defaults to NONE.
    internal var authenticator: Authenticator = Authenticator.NONE

    // Whether to follow redirects.
    internal var followRedirects = true

    // Whether to follow redirects between SSL and non-SSL.
    internal var followSslRedirects = true

    // Cookie handling; no cookies by default.
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES

    // Cache; none by default.
    internal var cache: Cache? = null

    // DNS configuration; the system resolver by default.
    internal var dns: Dns = Dns.SYSTEM

    // Proxy.
    internal var proxy: Proxy? = null

    // Proxy selector.
    internal var proxySelector: ProxySelector? = null

    // Proxy authenticator; defaults to NONE.
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE

    // Sockets are produced by the default socket factory.
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()

    // SSL socket factory; null by default.
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null

    // X.509 trust manager; null by default.
    internal var x509TrustManagerOrNull: X509TrustManager? = null

    // Default connection specs (per the article: TLS and cleartext).
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS

    // Default HTTP protocols.
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS

    // Host name verifier.
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier

    // Certificate pinner.
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT

    // Certificate chain cleaner; null by default.
    internal var certificateChainCleaner: CertificateChainCleaner? = null

    // Timeouts below are presumably in milliseconds — TODO confirm against the setters.
    // Whole-call timeout.
    internal var callTimeout = 0

    // Connect timeout.
    internal var connectTimeout = 10_000

    // Read timeout.
    internal var readTimeout = 10_000

    // Write timeout.
    internal var writeTimeout = 10_000

    // Ping (heartbeat) interval.
    internal var pingInterval = 0

    // Minimum web socket message size that gets compressed.
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
}
OkHttpClient.Builder().build() 默认调用的同样是 class Builder constructor(),因此在不做任何配置时,它与直接调用 OkHttpClient() 的创建方式效果相同;区别在于构建者对外暴露了自定义配置的方法。
// Creates the builder.
class Builder constructor()

// build() creates the OkHttpClient from this builder.
fun build(): OkHttpClient = OkHttpClient(this)
// The request method defaults to GET.
constructor() {
    this.method = "GET"
    this.headers = Headers.Builder()
}
/**
 * Sets the URL target of this request.
 *
 * @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
 *     exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
 */
open fun url(url: String): Builder {
    // Silently replace web socket URLs with HTTP URLs.
    val finalUrl: String = when {
        url.startsWith("ws:", ignoreCase = true) -> "http:${url.substring(3)}"
        url.startsWith("wss:", ignoreCase = true) -> "https:${url.substring(4)}"
        else -> url
    }

    return url(finalUrl.toHttpUrl())
}
// Builds the Request from the url, method, headers, body and tags collected by this builder.
open fun build(): Request {
    return Request(
        checkNotNull(url) { "url == null" },
        method,
        headers.build(),
        body,
        tags.toImmutableMap()
    )
}
通过传入的url、method、headers、body创建Request对象
// Abridged view of the Call interface as quoted by the article.
interface Call : Cloneable {
    fun request(): Request

    fun execute(): Response

    fun cancel()

    fun isExecuted(): Boolean

    fun isCanceled(): Boolean

    fun timeout(): Timeout

    public override fun clone(): Call

    // Factory that creates a Call for a given Request.
    fun interface Factory {
        fun newCall(request: Request): Call
    }
}
通过查看Call源码可见,Call实际上是一个接口,并定义了一些常用的方法,具体实现由其实现类提供
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
通过查看newCall方法可见,实际创建的是一个RealCall对象
默认创建的RealCall用来进行HTTP通信,不是进行websocket通信
/**
 * Bridge between OkHttp's application and network layers. This class exposes high-level application
 * layer primitives: connections, requests, responses, and streams.
 *
 * This class supports [asynchronous canceling][cancel]. This is intended to have the smallest
 * blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream but not
 * the other streams sharing its connection. But if the TLS handshake is still in progress then
 * canceling may break the entire connection.
 */
class RealCall(
    val client: OkHttpClient,
    val originalRequest: Request,
    val forWebSocket: Boolean
) : Call {
    // ... (body elided by the article)
}
通过注释我们可以知道:RealCall是OkHttp应用层与网络层之间的桥梁,对外暴露连接、请求、响应和流等高层原语,并且支持异步取消。
override fun execute(): Response {
    // A call may only be executed once.
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
        // Step 1: register this call with the dispatcher.
        client.dispatcher.executed(this)
        // Step 2: run the interceptor chain and return its response.
        return getResponseWithInterceptorChain()
    } finally {
        // Tell the dispatcher this call has finished.
        client.dispatcher.finished(this)
    }
}
实际上execute方法首先通过 client.dispatcher.executed(this) 将当前请求登记到调度器中,真正的请求则由 getResponseWithInterceptorChain() 完成
/**
 * Policy on when async requests are executed.
 *
 * Each dispatcher uses an [ExecutorService] to run calls internally. If you supply your own
 * executor, it should be able to run [maxRequests] (64 by default) calls concurrently.
 */
class Dispatcher constructor() {
    // Maximum number of requests to execute concurrently.
    @get:Synchronized var maxRequests = 64
        set(maxRequests) {
            require(maxRequests >= 1) { "max < 1: $maxRequests" }
            synchronized(this) {
                field = maxRequests
            }
            // The new limit may allow queued calls to start.
            promoteAndExecute()
        }

    // Maximum number of concurrent requests per host.
    @get:Synchronized var maxRequestsPerHost = 5
        set(maxRequestsPerHost) {
            require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
            synchronized(this) {
                field = maxRequestsPerHost
            }
            promoteAndExecute()
        }

    // Run by finished() when a call completes and no calls remain running.
    @set:Synchronized
    @get:Synchronized
    var idleCallback: Runnable? = null

    private var executorServiceOrNull: ExecutorService? = null

    // Lazily created thread pool: no core threads, unbounded maximum, 60 s keep-alive.
    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
        get() {
            if (executorServiceOrNull == null) {
                executorServiceOrNull = ThreadPoolExecutor(
                    0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                    SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)
                )
            }
            return executorServiceOrNull!!
        }

    // Async calls that are ready and waiting to be executed.
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()

    // Async calls that are currently running.
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()

    // Synchronous calls that are currently running.
    private val runningSyncCalls = ArrayDeque<RealCall>()

    constructor(executorService: ExecutorService) : this() {
        this.executorServiceOrNull = executorService
    }

    // Asynchronous request: queue the call, then try to promote queued calls.
    internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
            readyAsyncCalls.add(call)

            // Share the per-host counter with an existing call to the same host.
            if (!call.call.forWebSocket) {
                val existingCall = findExistingCallWithHost(call.host)
                if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
            }
        }
        promoteAndExecute()
    }

    private fun findExistingCallWithHost(host: String): AsyncCall? {
        for (existingCall in runningAsyncCalls) {
            if (existingCall.host == host) return existingCall
        }
        for (existingCall in readyAsyncCalls) {
            if (existingCall.host == host) return existingCall
        }
        return null
    }

    // Cancels all calls: queued async, running async and running sync.
    @Synchronized fun cancelAll() {
        for (call in readyAsyncCalls) {
            call.call.cancel()
        }
        for (call in runningAsyncCalls) {
            call.call.cancel()
        }
        for (call in runningSyncCalls) {
            call.cancel()
        }
    }

    // Promotes eligible calls from readyAsyncCalls to runningAsyncCalls and runs them.
    // Returns true if any call (sync or async) is running.
    private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()

        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
            val i = readyAsyncCalls.iterator()
            while (i.hasNext()) {
                val asyncCall = i.next()

                if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
                if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

                i.remove()
                asyncCall.callsPerHost.incrementAndGet()
                executableCalls.add(asyncCall)
                runningAsyncCalls.add(asyncCall)
            }
            isRunning = runningCallsCount() > 0
        }

        // Start the promoted calls outside the lock.
        for (i in 0 until executableCalls.size) {
            val asyncCall = executableCalls[i]
            asyncCall.executeOn(executorService)
        }

        return isRunning
    }

    // Synchronous request: record the RealCall in runningSyncCalls.
    @Synchronized internal fun executed(call: RealCall) {
        runningSyncCalls.add(call)
    }

    // Completion callback for async calls; also decrements the per-host counter.
    internal fun finished(call: AsyncCall) {
        call.callsPerHost.decrementAndGet()
        finished(runningAsyncCalls, call)
    }

    // Completion callback for synchronous calls.
    internal fun finished(call: RealCall) {
        finished(runningSyncCalls, call)
    }

    private fun <T> finished(calls: Deque<T>, call: T) {
        val idleCallback: Runnable?
        synchronized(this) {
            if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
            idleCallback = this.idleCallback
        }

        val isRunning = promoteAndExecute()

        // Nothing is running any more: notify the idle callback, if one is set.
        if (!isRunning && idleCallback != null) {
            idleCallback.run()
        }
    }

    /** Returns a snapshot of the calls currently awaiting execution. */
    @Synchronized fun queuedCalls(): List<Call> {
        return Collections.unmodifiableList(readyAsyncCalls.map { it.call })
    }

    /** Returns a snapshot of the calls currently being executed. */
    @Synchronized fun runningCalls(): List<Call> {
        return Collections.unmodifiableList(runningSyncCalls + runningAsyncCalls.map { it.call })
    }

    @Synchronized fun queuedCallsCount(): Int = readyAsyncCalls.size

    @Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

    @JvmName("-deprecated_executorService")
    @Deprecated(
        message = "moved to val",
        replaceWith = ReplaceWith(expression = "executorService"),
        level = DeprecationLevel.ERROR
    )
    fun executorService(): ExecutorService = executorService
}
/**
 * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
 * if the executor has been shut down by reporting the call as failed.
 */
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()

    var success = false
    try {
        // Hand this call to the thread pool.
        executorService.execute(this)
        success = true
    } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        // Report the failure through the response callback rather than throwing.
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        // Submission failed: release the dispatcher slot held by this call.
        if (!success) {
            client.dispatcher.finished(this) // This call is no longer running!
        }
    }
}
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    // Build the chain, starting at the first interceptor with no exchange yet.
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw noMoreExchanges(e) as Throwable
    } finally {
        // Make sure noMoreExchanges runs exactly once on every path.
        if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
        }
    }
}
/**
 * A concrete interceptor chain that carries the entire interceptor chain: all application
 * interceptors, the OkHttp core, all network interceptors, and finally the network caller.
 *
 * If the chain is for an application interceptor then [exchange] must be null. Otherwise it is for
 * a network interceptor and [exchange] must be non-null.
 */
class RealInterceptorChain(
    internal val call: RealCall,
    private val interceptors: List<Interceptor>,
    private val index: Int,
    internal val exchange: Exchange?,
    internal val request: Request,
    internal val connectTimeoutMillis: Int,
    internal val readTimeoutMillis: Int,
    internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
    // ... (body elided by the article)
}
通过注释我们可以知道,RealInterceptorChain实现了Interceptor.Chain,它承载了整条拦截器链:所有应用拦截器、OkHttp core、所有网络拦截器以及最终的网络调用者
如果chain作为应用拦截器使用时,exchange必须为空;作为网络拦截器使用时,exchange必须非空
通过chain.proceed获取response
val response = chain.proceed(originalRequest)
@Throws(IOException::class)
override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    // A non-null exchange means this chain is serving a network interceptor.
    if (exchange != null) {
        check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
        check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
    }

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    // The current interceptor gets the chain positioned at the next interceptor.
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next)
        ?: throw NullPointerException("interceptor $interceptor returned null")

    if (exchange != null) {
        check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
        }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}
可以看到,当前拦截器的Response依赖于下一个拦截器intercept方法返回的Response,因此会沿着这条拦截器链依次调用每一个拦截器,最后逐层返回Response
// Creates a new chain that shares call and interceptors; every other field defaults to this chain's value.
internal fun copy(
    index: Int = this.index,
    exchange: Exchange? = this.exchange,
    request: Request = this.request,
    connectTimeoutMillis: Int = this.connectTimeoutMillis,
    readTimeoutMillis: Int = this.readTimeoutMillis,
    writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(
    call, interceptors, index, exchange, request, connectTimeoutMillis,
    readTimeoutMillis, writeTimeoutMillis
)
通过构造传参创建RealInterceptorChain对象
fun interface Interceptor {
    @Throws(IOException::class)
    fun intercept(chain: Chain): Response

    // ... (remaining members elided by the article)
}
通过代码我们可见,Interceptor是一个接口,具体的实现通过其实现类来提供
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = chain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()
        while (true) {
            // Enter the network-interceptor exchange scope for this attempt.
            call.enterNetworkInterceptorExchange(request, newExchangeFinder)

            var response: Response
            var closeActiveExchange = true
            try {
                if (call.isCanceled()) {
                    throw IOException("Canceled")
                }

                try {
                    // Let the rest of the chain handle the request.
                    response = realChain.proceed(request)
                    newExchangeFinder = true
                } catch (e: RouteException) {
                    // The attempt to connect via a route failed. The request will not have been sent.
                    // Try to recover; rethrow if the request cannot be recovered.
                    if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                        throw e.firstConnectException.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e.firstConnectException
                    }
                    newExchangeFinder = false
                    continue
                } catch (e: IOException) {
                    // An attempt to communicate with a server failed. The request may have been sent.
                    if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
                        throw e.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e
                    }
                    newExchangeFinder = false
                    continue
                }

                // Attach the prior response if it exists. Such responses never have a body.
                if (priorResponse != null) {
                    response = response.newBuilder()
                        .priorResponse(
                            priorResponse.newBuilder()
                                .body(null)
                                .build()
                        )
                        .build()
                }

                // The exchange carries a single request/response pair.
                val exchange = call.interceptorScopedExchange
                // Key step: derive the follow-up request (retry, auth, redirect) from the response.
                val followUp = followUpRequest(response, exchange)

                // No follow-up needed: return the response as-is.
                if (followUp == null) {
                    if (exchange != null && exchange.isDuplex) {
                        call.timeoutEarlyExit()
                    }
                    closeActiveExchange = false
                    return response
                }

                // A one-shot follow-up body is not sent again: return the response as-is.
                val followUpBody = followUp.body
                if (followUpBody != null && followUpBody.isOneShot()) {
                    closeActiveExchange = false
                    return response
                }

                response.body?.closeQuietly()

                if (++followUpCount > MAX_FOLLOW_UPS) {
                    throw ProtocolException("Too many follow-up requests: $followUpCount")
                }

                // Carry the follow-up request and this response into the next iteration.
                request = followUp
                priorResponse = response
            } finally {
                call.exitNetworkInterceptorExchange(closeActiveExchange)
            }
        }
    }

    // ... (followUpRequest and buildRedirectRequest are shown separately below)
}
// Picks the follow-up request for a response based on its status code.
// Returns null when no follow-up should be made.
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    when (responseCode) {
        // 407: ask the proxy authenticator for a new request.
        HTTP_PROXY_AUTH -> {
            val selectedProxy = route!!.proxy
            if (selectedProxy.type() != Proxy.Type.HTTP) {
                throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
            }
            return client.proxyAuthenticator.authenticate(route, userResponse)
        }

        // 401: ask the authenticator for a new request.
        HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

        // Redirects.
        HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
            return buildRedirectRequest(userResponse, method)
        }

        // 408: client timeout.
        HTTP_CLIENT_TIMEOUT -> {
            // 408's are rare in practice, but some servers like HAProxy use this response code. The
            // spec says that we may repeat the request without modifications. Modern browsers also
            // repeat the request (even non-idempotent ones.)
            if (!client.retryOnConnectionFailure) {
                // The application layer has directed us not to retry the request.
                return null
            }

            val requestBody = userResponse.request.body
            if (requestBody != null && requestBody.isOneShot()) {
                return null
            }
            val priorResponse = userResponse.priorResponse
            if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
                // We attempted to retry and got another timeout. Give up.
                return null
            }

            if (retryAfter(userResponse, 0) > 0) {
                return null
            }

            return userResponse.request
        }

        // 503: service unavailable.
        HTTP_UNAVAILABLE -> {
            val priorResponse = userResponse.priorResponse
            if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
                // We attempted to retry and got another timeout. Give up.
                return null
            }

            if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
                // specifically received an instruction to retry without delay
                return userResponse.request
            }

            return null
        }

        // 421: misdirected request.
        HTTP_MISDIRECTED_REQUEST -> {
            // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
            // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
            // we can retry on a different connection.
            val requestBody = userResponse.request.body
            if (requestBody != null && requestBody.isOneShot()) {
                return null
            }

            if (exchange == null || !exchange.isCoalescedConnection) {
                return null
            }

            exchange.connection.noCoalescedConnections()
            return userResponse.request
        }

        else -> return null
    }
}
通过源码我们可以看到,该方法根据返回的不同状态码,进行了不同的重试和重定向操作
3) buildRedirectRequest
// Builds the request that follows a redirect response, or returns null if it must not be followed.
private fun buildRedirectRequest(userResponse: Response, method: String): Request? {
    // Does the client allow redirects?
    if (!client.followRedirects) return null

    val location = userResponse.header("Location") ?: return null
    // Don't follow redirects to unsupported protocols.
    val url = userResponse.request.url.resolve(location) ?: return null

    // If configured, don't follow redirects between SSL and non-SSL.
    val sameScheme = url.scheme == userResponse.request.url.scheme
    if (!sameScheme && !client.followSslRedirects) return null

    // Most redirects don't include a request body.
    // Start from the original request and adjust method, body and headers.
    val requestBuilder = userResponse.request.newBuilder()
    if (HttpMethod.permitsRequestBody(method)) {
        val responseCode = userResponse.code
        val maintainBody = HttpMethod.redirectsWithBody(method) ||
            responseCode == HTTP_PERM_REDIRECT ||
            responseCode == HTTP_TEMP_REDIRECT
        if (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {
            requestBuilder.method("GET", null)
        } else {
            val requestBody = if (maintainBody) userResponse.request.body else null
            requestBuilder.method(method, requestBody)
        }
        // Without a body, the body-related headers are dropped too.
        if (!maintainBody) {
            requestBuilder.removeHeader("Transfer-Encoding")
            requestBuilder.removeHeader("Content-Length")
            requestBuilder.removeHeader("Content-Type")
        }
    }

    // When redirecting across hosts, drop all authentication headers. This
    // is potentially annoying to the application layer since they have no
    // way to retain them.
    if (!userResponse.request.url.canReuseConnectionFor(url)) {
        requestBuilder.removeHeader("Authorization")
    }

    return requestBuilder.url(url).build()
}
BridgeInterceptor是应用层代码与网络层代码之间的桥梁:首先根据用户的request构建网络请求,然后继续调用后续拦截器发起网络请求,最后根据网络响应构建返回给用户的response
/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val userRequest = chain.request()
        val requestBuilder = userRequest.newBuilder()

        // Add the body-related request headers.
        val body = userRequest.body
        if (body != null) {
            val contentType = body.contentType()
            if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString())
            }

            val contentLength = body.contentLength()
            if (contentLength != -1L) {
                requestBuilder.header("Content-Length", contentLength.toString())
                requestBuilder.removeHeader("Transfer-Encoding")
            } else {
                // Unknown length: use chunked transfer encoding.
                requestBuilder.header("Transfer-Encoding", "chunked")
                requestBuilder.removeHeader("Content-Length")
            }
        }

        if (userRequest.header("Host") == null) {
            requestBuilder.header("Host", userRequest.url.toHostHeader())
        }

        if (userRequest.header("Connection") == null) {
            requestBuilder.header("Connection", "Keep-Alive")
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        var transparentGzip = false
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true
            requestBuilder.header("Accept-Encoding", "gzip")
        }

        // Add the cookies the cookie jar holds for this URL.
        val cookies = cookieJar.loadForRequest(userRequest.url)
        if (cookies.isNotEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies))
        }

        if (userRequest.header("User-Agent") == null) {
            requestBuilder.header("User-Agent", userAgent)
        }

        // Proceed with the completed network request.
        val networkResponse = chain.proceed(requestBuilder.build())

        cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

        val responseBuilder = networkResponse.newBuilder()
            .request(userRequest)

        if (transparentGzip &&
            "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
            networkResponse.promisesBody()
        ) {
            val responseBody = networkResponse.body
            // We added the gzip header ourselves, so decode the body through GzipSource
            // and strip the headers that described the encoded form.
            if (responseBody != null) {
                val gzipSource = GzipSource(responseBody.source())
                val strippedHeaders = networkResponse.headers.newBuilder()
                    .removeAll("Content-Encoding")
                    .removeAll("Content-Length")
                    .build()
                responseBuilder.headers(strippedHeaders)
                val contentType = networkResponse.header("Content-Type")
                responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
            }
        }

        return responseBuilder.build()
    }

    /** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
    private fun cookieHeader(cookies: List<Cookie>): String = buildString {
        cookies.forEachIndexed { index, cookie ->
            if (index > 0) append("; ")
            append(cookie.name).append('=').append(cookie.value)
        }
    }
}
CacheInterceptor用于从缓存中为request提供response,并将网络返回的response写入缓存
/** Serves requests from the cache and writes responses to the cache. */
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val call = chain.call()
        // Look up a cached response for this request.
        val cacheCandidate = cache?.get(chain.request())

        val now = System.currentTimeMillis()

        // Compute the cache strategy: which of network request / cached response to use.
        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
        val networkRequest = strategy.networkRequest
        val cacheResponse = strategy.cacheResponse

        cache?.trackResponse(strategy)
        val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

        if (cacheCandidate != null && cacheResponse == null) {
            // The cache candidate wasn't applicable. Close it.
            cacheCandidate.body?.closeQuietly()
        }

        // If we're forbidden from using the network and the cache is insufficient, fail.
        // Neither a network request nor a cached response: answer with a synthetic 504.
        if (networkRequest == null && cacheResponse == null) {
            return Response.Builder()
                .request(chain.request())
                .protocol(Protocol.HTTP_1_1)
                .code(HTTP_GATEWAY_TIMEOUT)
                .message("Unsatisfiable Request (only-if-cached)")
                .body(EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build().also {
                    listener.satisfactionFailure(call, it)
                }
        }

        // If we don't need the network, we're done.
        if (networkRequest == null) {
            return cacheResponse!!.newBuilder()
                .cacheResponse(stripBody(cacheResponse))
                .build().also {
                    listener.cacheHit(call, it)
                }
        }

        if (cacheResponse != null) {
            listener.cacheConditionalHit(call, cacheResponse)
        } else if (cache != null) {
            listener.cacheMiss(call)
        }

        // Go to the network.
        var networkResponse: Response? = null
        try {
            networkResponse = chain.proceed(networkRequest)
        } finally {
            // If we're crashing on I/O or otherwise, don't leak the cache body.
            if (networkResponse == null && cacheCandidate != null) {
                cacheCandidate.body?.closeQuietly()
            }
        }

        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
            if (networkResponse?.code == HTTP_NOT_MODIFIED) {
                // Not modified: merge the headers of both responses and refresh the timestamps.
                val response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers, networkResponse.headers))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build()

                networkResponse.body!!.close()

                // Update the cache after combining headers but before stripping the
                // Content-Encoding header (as performed by initContentStream()).
                cache!!.trackConditionalCacheHit()
                cache.update(cacheResponse, response)
                return response.also {
                    listener.cacheHit(call, it)
                }
            } else {
                // The network response replaces the cached one, so close the cached body.
                cacheResponse.body?.closeQuietly()
            }
        }

        // Build the final response from the network response.
        val response = networkResponse!!.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        if (cache != null) {
            if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
                // Offer this request to the cache.
                val cacheRequest = cache.put(response)
                return cacheWritingResponse(cacheRequest, response).also {
                    if (cacheResponse != null) {
                        // This will log a conditional cache miss only.
                        listener.cacheMiss(call)
                    }
                }
            }

            if (HttpMethod.invalidatesCache(networkRequest.method)) {
                try {
                    cache.remove(networkRequest)
                } catch (_: IOException) {
                    // The cache cannot be written.
                }
            }
        }

        return response
    }

    // ... (cacheWritingResponse is shown separately below)
}
/**
 * Returns a new source that writes bytes to [cacheRequest] as they are read by the source
 * consumer. This is careful to discard bytes left over when the stream is closed; otherwise we
 * may never exhaust the source stream and therefore not complete the cached response.
 */
@Throws(IOException::class)
private fun cacheWritingResponse(cacheRequest: CacheRequest?, response: Response): Response {
    // Some apps return a null body; for compatibility we treat that like a null cache request.
    if (cacheRequest == null) return response
    val cacheBodyUnbuffered = cacheRequest.body()

    val source = response.body!!.source()
    val cacheBody = cacheBodyUnbuffered.buffer()

    val cacheWritingSource = object : Source {
        private var cacheRequestClosed = false

        @Throws(IOException::class)
        override fun read(sink: Buffer, byteCount: Long): Long {
            val bytesRead: Long
            try {
                bytesRead = source.read(sink, byteCount)
            } catch (e: IOException) {
                if (!cacheRequestClosed) {
                    cacheRequestClosed = true
                    cacheRequest.abort() // Failed to write a complete cache response.
                }
                throw e
            }

            if (bytesRead == -1L) {
                if (!cacheRequestClosed) {
                    cacheRequestClosed = true
                    cacheBody.close() // The cache response is complete!
                }
                return -1
            }

            // Mirror the bytes just read into the cache body.
            sink.copyTo(cacheBody.buffer, sink.size - bytesRead, bytesRead)
            cacheBody.emitCompleteSegments()
            return bytesRead
        }

        override fun timeout() = source.timeout()

        @Throws(IOException::class)
        override fun close() {
            if (!cacheRequestClosed &&
                !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)
            ) {
                cacheRequestClosed = true
                cacheRequest.abort()
            }
            source.close()
        }
    }

    // Return the response with its body replaced by the cache-writing source.
    val contentType = response.header("Content-Type")
    val contentLength = response.body.contentLength()
    return response.newBuilder()
        .body(RealResponseBody(contentType, contentLength, cacheWritingSource.buffer()))
        .build()
}
ConnectInterceptor用于向目标服务器开启一个连接,并继续执行下一个拦截器;该网络连接可能用于获取要返回的response,也可能用于通过条件GET请求校验缓存的response
/**
 * Opens a connection to the target server and proceeds to the next interceptor. The network might
 * be used for the returned response, or to validate a cached response with a conditional GET.
 */
// Declared as an object, i.e. a singleton.
object ConnectInterceptor : Interceptor {
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        // Initialise the exchange that will carry this request/response pair.
        val exchange = realChain.call.initExchange(chain)
        // Copy the chain so that it carries the exchange.
        val connectedChain = realChain.copy(exchange = exchange)
        return connectedChain.proceed(realChain.request)
    }
}
最后一个拦截器,用于通过网络请求服务器
/** This is the last interceptor in the chain. It makes a network call to the server. */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.exchange!!
        val request = realChain.request
        val requestBody = request.body
        val sentRequestMillis = System.currentTimeMillis()

        // Write the request headers.
        exchange.writeRequestHeaders(request)

        var invokeStartEvent = true
        var responseBuilder: Response.Builder? = null
        if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
            // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
            // Continue" response before transmitting the request body. If we don't get that, return
            // what we did get (such as a 4xx response) without ever transmitting the request body.
            if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
                exchange.flushRequest()
                responseBuilder = exchange.readResponseHeaders(expectContinue = true)
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }
            if (responseBuilder == null) {
                if (requestBody.isDuplex()) {
                    // Prepare a duplex body so that the application can send a request body later.
                    exchange.flushRequest()
                    val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
                    requestBody.writeTo(bufferedRequestBody)
                } else {
                    // Write the request body if the "Expect: 100-continue" expectation was met.
                    val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
                    requestBody.writeTo(bufferedRequestBody)
                    bufferedRequestBody.close()
                }
            } else {
                exchange.noRequestBody()
                if (!exchange.connection.isMultiplexed) {
                    // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
                    // from being reused. Otherwise we're still obligated to transmit the request body to
                    // leave the connection in a consistent state.
                    exchange.noNewExchangesOnConnection()
                }
            }
        } else {
            exchange.noRequestBody()
        }

        // Finish the request unless a duplex body is still being written.
        if (requestBody == null || !requestBody.isDuplex()) {
            exchange.finishRequest()
        }

        // Read the response headers if they have not been read yet.
        if (responseBuilder == null) {
            responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
            if (invokeStartEvent) {
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }
        }

        // Build the response, attaching the request, handshake and timestamps.
        var response = responseBuilder
            .request(request)
            .handshake(exchange.connection.handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build()
        var code = response.code
        if (code == 100) {
            // Server sent a 100-continue even though we did not request one. Try again to read the actual
            // response status.
            responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
            if (invokeStartEvent) {
                exchange.responseHeadersStart()
            }
            response = responseBuilder
                .request(request)
                .handshake(exchange.connection.handshake())
                .sentRequestAtMillis(sentRequestMillis)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build()
            code = response.code
        }

        exchange.responseHeadersEnd(response)

        response = if (forWebSocket && code == 101) {
            // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
            response.newBuilder()
                .body(EMPTY_RESPONSE)
                .build()
        } else {
            response.newBuilder()
                .body(exchange.openResponseBody(response))
                .build()
        }
        if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
            "close".equals(response.header("Connection"), ignoreCase = true)
        ) {
            exchange.noNewExchangesOnConnection()
        }
        if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
            throw ProtocolException(
                "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}"
            )
        }
        return response
    }
}
通过前面的学习,我们已经对OkHttp有了简单的认识,并对其使用方式有了详细的了解,下面我们将以一个同步GET请求为例进行OkHttp源码分析。
private val mUrl = ""

// 1. Create the client directly through its constructor.
private val mClient = OkHttpClient()

// Alternative: create the client through the builder.
private val mClient2 = OkHttpClient.Builder().build()

// 2. Create the request.
private val mRequest = Request.Builder().url(mUrl).build()

// 3. Issue a synchronous GET request.
// NOTE(review): GlobalScope only keeps the demo short; prefer a lifecycle-bound scope in real code.
GlobalScope.launch {
    // use {} closes the response (and its body) when the block exits.
    mClient.newCall(mRequest).execute().use { response ->
        if (response.isSuccessful) {
            // string() reads the body content; toString() would only print the object identity.
            LogUtil.D(log = "request success code is ${response.code} body is ${response.body?.string()}")
        } else {
            LogUtil.D(log = "request error code is ${response.code}")
        }
    }
}
通过代码我们发现,创建OkHttpClient的方式有两种:
// The no-arg constructor simply delegates to a default Builder.
constructor() : this(Builder())

class Builder constructor() {
    // Dispatcher used to schedule calls.
    internal var dispatcher: Dispatcher = Dispatcher()

    // Connection pool.
    internal var connectionPool: ConnectionPool = ConnectionPool()

    // Application interceptors.
    internal val interceptors: MutableList<Interceptor> = mutableListOf()

    // Network interceptors.
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()

    // Event listener factory; defaults to a factory for the no-op listener.
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()

    // Whether to retry automatically on connection failure.
    internal var retryOnConnectionFailure = true

    // Authenticator; defaults to NONE.
    internal var authenticator: Authenticator = Authenticator.NONE

    // Whether to follow redirects.
    internal var followRedirects = true

    // Whether to follow redirects between SSL and non-SSL.
    internal var followSslRedirects = true

    // Cookie handling; no cookies by default.
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES

    // Cache; none by default.
    internal var cache: Cache? = null

    // DNS configuration; the system resolver by default.
    internal var dns: Dns = Dns.SYSTEM

    // Proxy.
    internal var proxy: Proxy? = null

    // Proxy selector.
    internal var proxySelector: ProxySelector? = null

    // Proxy authenticator; defaults to NONE.
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE

    // Sockets are produced by the default socket factory.
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()

    // SSL socket factory; null by default.
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null

    // X.509 trust manager; null by default.
    internal var x509TrustManagerOrNull: X509TrustManager? = null

    // Default connection specs (per the article: TLS and cleartext).
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS

    // Default HTTP protocols.
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS

    // Host name verifier.
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier

    // Certificate pinner.
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT

    // Certificate chain cleaner; null by default.
    internal var certificateChainCleaner: CertificateChainCleaner? = null

    // Timeouts below are presumably in milliseconds — TODO confirm against the setters.
    // Whole-call timeout.
    internal var callTimeout = 0

    // Connect timeout.
    internal var connectTimeout = 10_000

    // Read timeout.
    internal var readTimeout = 10_000

    // Write timeout.
    internal var writeTimeout = 10_000

    // Ping (heartbeat) interval.
    internal var pingInterval = 0

    // Minimum web socket message size that gets compressed.
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
}
OkHttpClient.Builder().build() 默认调用的同样是 class Builder constructor(),因此在不做任何配置时,它与直接调用 OkHttpClient() 的创建方式效果相同;区别在于构建者对外暴露了自定义配置的方法。
// Builder entry point: the no-arg constructor creates the builder.
class Builder constructor() {
    // ... (configuration fields elided in this excerpt)

    // Creates the OkHttpClient by handing this builder to its constructor.
    fun build(): OkHttpClient = OkHttpClient(this)
}
// Request.Builder no-arg constructor: the request method defaults to GET and headers start empty.
constructor() {
    this.method = "GET"
    this.headers = Headers.Builder()
}
/**
 * Sets the URL target of this request.
 *
 * @throws IllegalArgumentException if [url] is not a valid HTTP or HTTPS URL. Avoid this
 *     exception by calling [HttpUrl.parse]; it returns null for invalid URLs.
 */
open fun url(url: String): Builder {
    // Silently replace web socket URLs with HTTP URLs: "ws:" becomes "http:" and
    // "wss:" becomes "https:"; any other URL is used unchanged.
    val finalUrl: String = when {
        url.startsWith("ws:", ignoreCase = true) -> {
            "http:${url.substring(3)}"
        }
        url.startsWith("wss:", ignoreCase = true) -> {
            "https:${url.substring(4)}"
        }
        else -> url
    }

    return url(finalUrl.toHttpUrl())
}
/**
 * Creates the [Request] from the url, method, headers, body and tags collected by this builder.
 *
 * @throws IllegalStateException if no url has been set.
 */
open fun build(): Request {
    return Request(
        checkNotNull(url) { "url == null" },
        method,
        headers.build(),
        body,
        tags.toImmutableMap()
    )
}
通过传入的url、method、headers、body创建Request对象
/** A call is an interface only; the behavior is supplied by its implementing class. */
interface Call : Cloneable {
    fun request(): Request

    fun execute(): Response

    fun cancel()

    fun isExecuted(): Boolean

    fun isCanceled(): Boolean

    fun timeout(): Timeout

    public override fun clone(): Call

    /** Creates a [Call] for a given [Request]. */
    fun interface Factory {
        fun newCall(request: Request): Call
    }
}
通过查看Call源码可见,Call实际上是一个接口,并定义了一些常用的方法,具体实现由其实现类提供
/**
 * Prepares the [request] to be executed at some point in the future.
 *
 * The returned call is a [RealCall] created with `forWebSocket = false`.
 */
override fun newCall(request: Request): Call {
    return RealCall(this, request, forWebSocket = false)
}
通过查看方法可见,实际创建的是一个RealCall对象
默认创建的RealCall用来进行HTTP通信,不是进行websocket通信
/**
 * Bridge between OkHttp's application and network layers. This class exposes high-level application
 * layer primitives: connections, requests, responses, and streams.
 *
 * This class supports [asynchronous canceling][cancel]. This is intended to have the smallest
 * blast radius possible. If an HTTP/2 stream is active, canceling will cancel that stream but not
 * the other streams sharing its connection. But if the TLS handshake is still in progress then
 * canceling may break the entire connection.
 */
class RealCall(
    val client: OkHttpClient,
    val originalRequest: Request,
    val forWebSocket: Boolean
) : Call {
    // ... (members elided in this excerpt)
}
通过注释我们可以知道:RealCall 是 OkHttp 应用层与网络层之间的桥梁,对外暴露连接、请求、响应和流等高层应用层原语,并且支持异步取消。
/**
 * Runs this call synchronously and returns its response.
 *
 * @throws IllegalStateException if this call has already been executed.
 */
override fun execute(): Response {
    // A call may only be executed once.
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
        // Step 1: register this call with the dispatcher as a running synchronous call.
        client.dispatcher.executed(this)
        // Step 2: run the interceptor chain to obtain the response.
        return getResponseWithInterceptorChain()
    } finally {
        // Mark the call as finished, whether it succeeded or failed.
        client.dispatcher.finished(this)
    }
}
实际上,execute 首先通过 client.dispatcher.executed(this) 将当前请求登记到调度器中,再通过 getResponseWithInterceptorChain() 获取响应
/**
 * Policy on when async requests are executed.
 *
 * Each dispatcher uses an [ExecutorService] to run calls. With the built-in executor created
 * here, up to [maxRequests] (64 by default) async calls run concurrently.
 */
class Dispatcher constructor() {
    // Maximum number of async calls that run concurrently. Changing it re-evaluates the queue.
    @get:Synchronized var maxRequests = 64
        set(maxRequests) {
            require(maxRequests >= 1) { "max < 1: $maxRequests" }
            synchronized(this) {
                field = maxRequests
            }
            promoteAndExecute()
        }

    // Maximum number of async calls per host that run concurrently.
    @get:Synchronized var maxRequestsPerHost = 5
        set(maxRequestsPerHost) {
            require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
            synchronized(this) {
                field = maxRequestsPerHost
            }
            promoteAndExecute()
        }

    // Run by finished() when no calls remain running.
    @set:Synchronized
    @get:Synchronized
    var idleCallback: Runnable? = null

    private var executorServiceOrNull: ExecutorService? = null

    // Lazily creates a thread pool with no core threads, an unbounded maximum, a 60 second
    // keep-alive and a SynchronousQueue hand-off.
    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
        get() {
            if (executorServiceOrNull == null) {
                executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                    SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
            }
            return executorServiceOrNull!!
        }

    // Async calls waiting to be run.
    private val readyAsyncCalls = ArrayDeque<AsyncCall>()

    // Async calls currently running.
    private val runningAsyncCalls = ArrayDeque<AsyncCall>()

    // Synchronous calls currently running.
    private val runningSyncCalls = ArrayDeque<RealCall>()

    constructor(executorService: ExecutorService) : this() {
        this.executorServiceOrNull = executorService
    }

    // Queues an async call, then tries to promote queued calls to running.
    internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
            readyAsyncCalls.add(call)

            // Non-web-socket calls share the per-host counter of an existing call to the same host.
            if (!call.call.forWebSocket) {
                val existingCall = findExistingCallWithHost(call.host)
                if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
            }
        }
        promoteAndExecute()
    }

    // Looks for a call to the same host, first among running calls and then among queued ones.
    private fun findExistingCallWithHost(host: String): AsyncCall? {
        for (existingCall in runningAsyncCalls) {
            if (existingCall.host == host) return existingCall
        }
        for (existingCall in readyAsyncCalls) {
            if (existingCall.host == host) return existingCall
        }
        return null
    }

    // Cancels every call: queued async, running async and running synchronous.
    @Synchronized fun cancelAll() {
        for (call in readyAsyncCalls) {
            call.call.cancel()
        }
        for (call in runningAsyncCalls) {
            call.call.cancel()
        }
        for (call in runningSyncCalls) {
            call.cancel()
        }
    }

    // Moves eligible calls from readyAsyncCalls to runningAsyncCalls and starts them on the
    // executor service. Returns true if any call (sync or async) is running.
    private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()

        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
            val i = readyAsyncCalls.iterator()
            while (i.hasNext()) {
                val asyncCall = i.next()

                if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
                if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

                i.remove()
                asyncCall.callsPerHost.incrementAndGet()
                executableCalls.add(asyncCall)
                runningAsyncCalls.add(asyncCall)
            }
            isRunning = runningCallsCount() > 0
        }

        // Start the promoted calls outside the lock.
        for (i in 0 until executableCalls.size) {
            val asyncCall = executableCalls[i]
            asyncCall.executeOn(executorService)
        }

        return isRunning
    }

    // Synchronous call: only records the RealCall in runningSyncCalls.
    @Synchronized internal fun executed(call: RealCall) {
        runningSyncCalls.add(call)
    }

    // Completion hook for async calls; also decrements the per-host counter.
    internal fun finished(call: AsyncCall) {
        call.callsPerHost.decrementAndGet()
        finished(runningAsyncCalls, call)
    }

    // Completion hook for synchronous calls.
    internal fun finished(call: RealCall) {
        finished(runningSyncCalls, call)
    }

    private fun <T> finished(calls: Deque<T>, call: T) {
        val idleCallback: Runnable?
        synchronized(this) {
            if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
            idleCallback = this.idleCallback
        }

        val isRunning = promoteAndExecute()

        // Nothing is running any more: notify the idle callback if one is set.
        if (!isRunning && idleCallback != null) {
            idleCallback.run()
        }
    }

    /** Returns a snapshot of the calls currently awaiting execution. */
    @Synchronized fun queuedCalls(): List<Call> {
        return Collections.unmodifiableList(readyAsyncCalls.map { it.call })
    }

    /** Returns a snapshot of the calls currently being executed. */
    @Synchronized fun runningCalls(): List<Call> {
        return Collections.unmodifiableList(runningSyncCalls + runningAsyncCalls.map { it.call })
    }

    @Synchronized fun queuedCallsCount(): Int = readyAsyncCalls.size

    @Synchronized fun runningCallsCount(): Int = runningAsyncCalls.size + runningSyncCalls.size

    @JvmName("-deprecated_executorService")
    @Deprecated(
        message = "moved to val",
        replaceWith = ReplaceWith(expression = "executorService"),
        level = DeprecationLevel.ERROR)
    fun executorService(): ExecutorService = executorService
}
/**
 * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
 * if the executor has been shut down by reporting the call as failed.
 */
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()

    var success = false
    try {
        // Hand this call to the thread pool.
        executorService.execute(this)
        success = true
    } catch (e: RejectedExecutionException) {
        // The executor refused the task: report it to the callback as an I/O failure.
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
    } finally {
        if (!success) {
            client.dispatcher.finished(this) // This call is no longer running!
        }
    }
}
/**
 * Assembles the interceptor list, runs the original request through it and returns the response.
 *
 * @throws IOException if the chain fails, or with message "Canceled" if the call was canceled.
 */
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors. Order matters: application interceptors first,
    // the call-server interceptor last.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
        interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    // The chain starts at index 0 with no exchange.
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw noMoreExchanges(e) as Throwable
    } finally {
        // Make sure noMoreExchanges runs exactly once on every path.
        if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
        }
    }
}
/**
 * A concrete interceptor chain that carries the entire interceptor chain: all application
 * interceptors, the OkHttp core, all network interceptors, and finally the network caller.
 *
 * If the chain is for an application interceptor then [exchange] must be null. Otherwise it is for
 * a network interceptor and [exchange] must be non-null.
 */
class RealInterceptorChain(internal val call: RealCall,private val interceptors: List<Interceptor>,private val index: Int,internal val exchange: Exchange?,internal val request: Request,internal val connectTimeoutMillis: Int,internal val readTimeoutMillis: Int,internal val writeTimeoutMillis: Int
) : Interceptor.Chain {
通过注释我们可以知道,RealInterceptorChain 实现了 Interceptor.Chain,它承载了完整的拦截器链:所有应用拦截器、OkHttp 核心、所有网络拦截器,以及最终的网络调用器
如果chain作为应用拦截器使用时,exchange必须为空;作为网络拦截器使用时,exchange必须非空
通过chain.proceed获取response
// Pass the original request into the interceptor chain and receive the resulting response.
val response = chain.proceed(originalRequest)
/**
 * Invokes the interceptor at [index] with a copy of this chain advanced by one position,
 * and returns that interceptor's response.
 *
 * @throws IllegalStateException if a network interceptor changes host or port, calls proceed()
 *     other than exactly once, or if an interceptor returns a response with no body.
 */
@Throws(IOException::class)
override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    // A non-null exchange means this chain is serving a network interceptor.
    if (exchange != null) {
        check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
        check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
    }

    // Call the next interceptor in the chain: `next` is the chain the current interceptor
    // will use to reach the one after it.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
        check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
        }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
}
可以看到,当前拦截器的 Response 依赖于下一个拦截器 intercept 返回的 Response,因此会沿着这条拦截器链依次调用每一个拦截器,最后返回 Response
/**
 * Creates a new [RealInterceptorChain] for the same call and interceptor list.
 * Every parameter defaults to this chain's current value, so callers override only what changes.
 */
internal fun copy(
    index: Int = this.index,
    exchange: Exchange? = this.exchange,
    request: Request = this.request,
    connectTimeoutMillis: Int = this.connectTimeoutMillis,
    readTimeoutMillis: Int = this.readTimeoutMillis,
    writeTimeoutMillis: Int = this.writeTimeoutMillis
) = RealInterceptorChain(call, interceptors, index, exchange, request, connectTimeoutMillis,
    readTimeoutMillis, writeTimeoutMillis)
通过构造传参创建RealInterceptorChain对象
/** Single-method interface; the actual behavior is provided by each implementing class. */
fun interface Interceptor {
    @Throws(IOException::class)
    fun intercept(chain: Chain): Response

    // ... (remaining members elided in this excerpt)
}
通过代码我们可见,Interceptor是一个接口,具体的实现通过其实现类来提供
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

    /**
     * Proceeds with the request, retrying after recoverable failures and issuing follow-up
     * requests (as computed by followUpRequest) until a final response is available.
     *
     * @throws IOException if the call is canceled or a failure cannot be recovered.
     * @throws ProtocolException if more than MAX_FOLLOW_UPS follow-ups are needed.
     */
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = chain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()
        while (true) {
            call.enterNetworkInterceptorExchange(request, newExchangeFinder)

            var response: Response
            var closeActiveExchange = true
            try {
                if (call.isCanceled()) {
                    throw IOException("Canceled")
                }

                try {
                    // Hand the request to the rest of the chain.
                    response = realChain.proceed(request)
                    newExchangeFinder = true
                } catch (e: RouteException) {
                    // The attempt to connect via a route failed. The request will not have been sent.
                    // Try to recover; rethrow the first failure if recovery is not possible.
                    if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                        throw e.firstConnectException.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e.firstConnectException
                    }
                    newExchangeFinder = false
                    continue
                } catch (e: IOException) {
                    // An attempt to communicate with a server failed. The request may have been sent.
                    if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
                        throw e.withSuppressed(recoveredFailures)
                    } else {
                        recoveredFailures += e
                    }
                    newExchangeFinder = false
                    continue
                }

                // Attach the prior response if it exists. Such responses never have a body.
                if (priorResponse != null) {
                    response = response.newBuilder()
                        .priorResponse(priorResponse.newBuilder()
                            .body(null)
                            .build())
                        .build()
                }

                val exchange = call.interceptorScopedExchange
                // Key step: derive the follow-up request (retry, auth, redirect) from the response.
                val followUp = followUpRequest(response, exchange)

                // No follow-up needed: this response is final.
                if (followUp == null) {
                    if (exchange != null && exchange.isDuplex) {
                        call.timeoutEarlyExit()
                    }
                    closeActiveExchange = false
                    return response
                }

                // A one-shot body cannot be sent again, so return the response as-is.
                val followUpBody = followUp.body
                if (followUpBody != null && followUpBody.isOneShot()) {
                    closeActiveExchange = false
                    return response
                }

                response.body?.closeQuietly()

                if (++followUpCount > MAX_FOLLOW_UPS) {
                    throw ProtocolException("Too many follow-up requests: $followUpCount")
                }

                // Loop again with the follow-up request, remembering this response as the prior one.
                request = followUp
                priorResponse = response
            } finally {
                call.exitNetworkInterceptorExchange(closeActiveExchange)
            }
        }
    }
/**
 * Decides, from the status code of [userResponse], which request (if any) should be sent next:
 * re-authentication, a redirect, or a retry. Returns null when no follow-up applies.
 */
@Throws(IOException::class)
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val responseCode = userResponse.code

    val method = userResponse.request.method
    when (responseCode) {
        HTTP_PROXY_AUTH -> {
            val selectedProxy = route!!.proxy
            if (selectedProxy.type() != Proxy.Type.HTTP) {
                throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")
            }
            // 407: ask the proxy authenticator for an authenticated request.
            return client.proxyAuthenticator.authenticate(route, userResponse)
        }

        // 401: ask the authenticator for an authenticated request.
        HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)

        HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
            // Redirect.
            return buildRedirectRequest(userResponse, method)
        }

        HTTP_CLIENT_TIMEOUT -> {
            // 408's are rare in practice, but some servers like HAProxy use this response code. The
            // spec says that we may repeat the request without modifications. Modern browsers also
            // repeat the request (even non-idempotent ones.)
            if (!client.retryOnConnectionFailure) {
                // The application layer has directed us not to retry the request.
                return null
            }

            val requestBody = userResponse.request.body
            if (requestBody != null && requestBody.isOneShot()) {
                return null
            }
            val priorResponse = userResponse.priorResponse
            if (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {
                // We attempted to retry and got another timeout. Give up.
                return null
            }

            if (retryAfter(userResponse, 0) > 0) {
                return null
            }

            return userResponse.request
        }

        HTTP_UNAVAILABLE -> {
            val priorResponse = userResponse.priorResponse
            if (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {
                // We attempted to retry and got another timeout. Give up.
                return null
            }

            if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
                // specifically received an instruction to retry without delay
                return userResponse.request
            }

            return null
        }

        HTTP_MISDIRECTED_REQUEST -> {
            // OkHttp can coalesce HTTP/2 connections even if the domain names are different. See
            // RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then
            // we can retry on a different connection.
            val requestBody = userResponse.request.body
            if (requestBody != null && requestBody.isOneShot()) {
                return null
            }

            if (exchange == null || !exchange.isCoalescedConnection) {
                return null
            }

            exchange.connection.noCoalescedConnections()
            return userResponse.request
        }

        else -> return null
    }
}
通过源码我们可以看到,该方法根据返回的不同状态码,进行了不同的重试和重定向操作
3) buildRedirectRequest
/**
 * Builds the request that follows the redirect in [userResponse], or returns null when the
 * redirect must not be followed (redirects disabled, no "Location" header, unresolvable URL,
 * or a scheme change while SSL redirects are disabled).
 */
private fun buildRedirectRequest(userResponse: Response, method: String): Request? {
    // Does the client allow redirects?
    if (!client.followRedirects) return null

    val location = userResponse.header("Location") ?: return null
    // Don't follow redirects to unsupported protocols.
    val url = userResponse.request.url.resolve(location) ?: return null

    // If configured, don't follow redirects between SSL and non-SSL.
    val sameScheme = url.scheme == userResponse.request.url.scheme
    if (!sameScheme && !client.followSslRedirects) return null

    // Most redirects don't include a request body.
    val requestBuilder = userResponse.request.newBuilder()
    if (HttpMethod.permitsRequestBody(method)) {
        val responseCode = userResponse.code
        val maintainBody = HttpMethod.redirectsWithBody(method) ||
            responseCode == HTTP_PERM_REDIRECT ||
            responseCode == HTTP_TEMP_REDIRECT
        if (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {
            requestBuilder.method("GET", null)
        } else {
            val requestBody = if (maintainBody) userResponse.request.body else null
            requestBuilder.method(method, requestBody)
        }
        // When the body is dropped, drop the headers that describe it as well.
        if (!maintainBody) {
            requestBuilder.removeHeader("Transfer-Encoding")
            requestBuilder.removeHeader("Content-Length")
            requestBuilder.removeHeader("Content-Type")
        }
    }

    // When redirecting across hosts, drop all authentication headers. This
    // is potentially annoying to the application layer since they have no
    // way to retain them.
    if (!userResponse.request.url.canReuseConnectionFor(url)) {
        requestBuilder.removeHeader("Authorization")
    }

    return requestBuilder.url(url).build()
}
首先根据用户的 request 构建网络请求,然后发起网络调用,最后根据网络响应构建返回给用户的 response
/**
 * Bridges from application code to network code. First it builds a network request from a user
 * request. Then it proceeds to call the network. Finally it builds a user response from the network
 * response.
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val userRequest = chain.request()
        val requestBuilder = userRequest.newBuilder()

        // Derive the body-related headers from the request body, when there is one.
        val body = userRequest.body
        if (body != null) {
            val contentType = body.contentType()
            if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString())
            }

            // A known length is sent as Content-Length; an unknown length (-1) uses chunked encoding.
            val contentLength = body.contentLength()
            if (contentLength != -1L) {
                requestBuilder.header("Content-Length", contentLength.toString())
                requestBuilder.removeHeader("Transfer-Encoding")
            } else {
                requestBuilder.header("Transfer-Encoding", "chunked")
                requestBuilder.removeHeader("Content-Length")
            }
        }

        if (userRequest.header("Host") == null) {
            requestBuilder.header("Host", userRequest.url.toHostHeader())
        }

        if (userRequest.header("Connection") == null) {
            requestBuilder.header("Connection", "Keep-Alive")
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        var transparentGzip = false
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true
            requestBuilder.header("Accept-Encoding", "gzip")
        }

        // Add the cookies the jar holds for this URL, if any.
        val cookies = cookieJar.loadForRequest(userRequest.url)
        if (cookies.isNotEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies))
        }

        if (userRequest.header("User-Agent") == null) {
            requestBuilder.header("User-Agent", userAgent)
        }

        val networkResponse = chain.proceed(requestBuilder.build())

        cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

        val responseBuilder = networkResponse.newBuilder()
            .request(userRequest)

        if (transparentGzip &&
            "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
            networkResponse.promisesBody()) {
            val responseBody = networkResponse.body
            if (responseBody != null) {
                // We asked for gzip ourselves, so read the body through a GzipSource and strip
                // the headers that described the encoded form.
                val gzipSource = GzipSource(responseBody.source())
                val strippedHeaders = networkResponse.headers.newBuilder()
                    .removeAll("Content-Encoding")
                    .removeAll("Content-Length")
                    .build()
                responseBuilder.headers(strippedHeaders)
                val contentType = networkResponse.header("Content-Type")
                responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
            }
        }

        return responseBuilder.build()
    }

    /** Returns a 'Cookie' HTTP request header with all cookies, like `a=b; c=d`. */
    private fun cookieHeader(cookies: List<Cookie>): String = buildString {
        cookies.forEachIndexed { index, cookie ->
            if (index > 0) append("; ")
            append(cookie.name).append('=').append(cookie.value)
        }
    }
}
用于从缓存中为 request 提供响应,并将 response 写入缓存
/** Serves requests from the cache and writes responses to the cache. */
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val call = chain.call()
        // Look up a cached response for this request, if a cache is configured.
        val cacheCandidate = cache?.get(chain.request())

        val now = System.currentTimeMillis()

        // The strategy decides whether to use the network, the cache, or both.
        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
        val networkRequest = strategy.networkRequest
        val cacheResponse = strategy.cacheResponse

        cache?.trackResponse(strategy)
        val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

        if (cacheCandidate != null && cacheResponse == null) {
            // The cache candidate wasn't applicable. Close it.
            cacheCandidate.body?.closeQuietly()
        }

        // If we're forbidden from using the network and the cache is insufficient, fail
        // with a synthetic 504 response.
        if (networkRequest == null && cacheResponse == null) {
            return Response.Builder()
                .request(chain.request())
                .protocol(Protocol.HTTP_1_1)
                .code(HTTP_GATEWAY_TIMEOUT)
                .message("Unsatisfiable Request (only-if-cached)")
                .body(EMPTY_RESPONSE)
                .sentRequestAtMillis(-1L)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build().also {
                    listener.satisfactionFailure(call, it)
                }
        }

        // If we don't need the network, we're done: answer from the cache.
        if (networkRequest == null) {
            return cacheResponse!!.newBuilder()
                .cacheResponse(stripBody(cacheResponse))
                .build().also {
                    listener.cacheHit(call, it)
                }
        }

        if (cacheResponse != null) {
            listener.cacheConditionalHit(call, cacheResponse)
        } else if (cache != null) {
            listener.cacheMiss(call)
        }

        var networkResponse: Response? = null
        try {
            networkResponse = chain.proceed(networkRequest)
        } finally {
            // If we're crashing on I/O or otherwise, don't leak the cache body.
            if (networkResponse == null && cacheCandidate != null) {
                cacheCandidate.body?.closeQuietly()
            }
        }

        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
            if (networkResponse?.code == HTTP_NOT_MODIFIED) {
                // Not modified: merge the headers of both responses and take the timestamps
                // from the network response.
                val response = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers, networkResponse.headers))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build()

                networkResponse.body!!.close()

                // Update the cache after combining headers but before stripping the
                // Content-Encoding header (as performed by initContentStream()).
                cache!!.trackConditionalCacheHit()
                cache.update(cacheResponse, response)
                return response.also {
                    listener.cacheHit(call, it)
                }
            } else {
                // The cached response will not be used; close it.
                cacheResponse.body?.closeQuietly()
            }
        }

        val response = networkResponse!!.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        if (cache != null) {
            if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
                // Offer this request to the cache.
                val cacheRequest = cache.put(response)
                return cacheWritingResponse(cacheRequest, response).also {
                    if (cacheResponse != null) {
                        // This will log a conditional cache miss only.
                        listener.cacheMiss(call)
                    }
                }
            }

            if (HttpMethod.invalidatesCache(networkRequest.method)) {
                try {
                    cache.remove(networkRequest)
                } catch (_: IOException) {
                    // The cache cannot be written.
                }
            }
        }

        return response
    }
/**
 * Returns a new source that writes bytes to [cacheRequest] as they are read by the source
 * consumer. This is careful to discard bytes left over when the stream is closed; otherwise we
 * may never exhaust the source stream and therefore not complete the cached response.
 */
@Throws(IOException::class)
private fun cacheWritingResponse(cacheRequest: CacheRequest?, response: Response): Response {
    // Some apps return a null body; for compatibility we treat that like a null cache request.
    if (cacheRequest == null) return response
    val cacheBodyUnbuffered = cacheRequest.body()

    val source = response.body!!.source()
    val cacheBody = cacheBodyUnbuffered.buffer()

    val cacheWritingSource = object : Source {
        private var cacheRequestClosed = false

        @Throws(IOException::class)
        override fun read(sink: Buffer, byteCount: Long): Long {
            val bytesRead: Long
            try {
                bytesRead = source.read(sink, byteCount)
            } catch (e: IOException) {
                if (!cacheRequestClosed) {
                    cacheRequestClosed = true
                    cacheRequest.abort() // Failed to write a complete cache response.
                }
                throw e
            }

            if (bytesRead == -1L) {
                if (!cacheRequestClosed) {
                    cacheRequestClosed = true
                    cacheBody.close() // The cache response is complete!
                }
                return -1
            }

            // Mirror the bytes just appended to the sink into the cache body.
            sink.copyTo(cacheBody.buffer, sink.size - bytesRead, bytesRead)
            cacheBody.emitCompleteSegments()
            return bytesRead
        }

        override fun timeout() = source.timeout()

        @Throws(IOException::class)
        override fun close() {
            if (!cacheRequestClosed &&
                !discard(ExchangeCodec.DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
                cacheRequestClosed = true
                cacheRequest.abort()
            }
            source.close()
        }
    }

    // Return the same response with its body replaced by the cache-writing source.
    val contentType = response.header("Content-Type")
    val contentLength = response.body.contentLength()
    return response.newBuilder()
        .body(RealResponseBody(contentType, contentLength, cacheWritingSource.buffer()))
        .build()
}
ConnectInterceptor 用于向目标服务器开启一个连接,并继续执行下一个拦截器;该连接可能用于获取返回的 response,也可能用于通过条件 GET 校验缓存的 response
/**
 * Opens a connection to the target server and proceeds to the next interceptor. The network might
 * be used for the returned response, or to validate a cached response with a conditional GET.
 */
object ConnectInterceptor : Interceptor {
    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        // Obtain the exchange that will carry this request and its response.
        val exchange = realChain.call.initExchange(chain)
        // Continue with a copy of the chain that carries the exchange.
        val connectedChain = realChain.copy(exchange = exchange)
        return connectedChain.proceed(realChain.request)
    }
}
最后一个拦截器,用于通过网络请求服务器
/** This is the last interceptor in the chain. It makes a network call to the server. */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

    @Throws(IOException::class)
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.exchange!!
        val request = realChain.request
        val requestBody = request.body
        val sentRequestMillis = System.currentTimeMillis()

        exchange.writeRequestHeaders(request)

        var invokeStartEvent = true
        var responseBuilder: Response.Builder? = null
        if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
            // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
            // Continue" response before transmitting the request body. If we don't get that, return
            // what we did get (such as a 4xx response) without ever transmitting the request body.
            if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
                exchange.flushRequest()
                responseBuilder = exchange.readResponseHeaders(expectContinue = true)
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }
            if (responseBuilder == null) {
                if (requestBody.isDuplex()) {
                    // Prepare a duplex body so that the application can send a request body later.
                    exchange.flushRequest()
                    val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
                    requestBody.writeTo(bufferedRequestBody)
                } else {
                    // Write the request body if the "Expect: 100-continue" expectation was met.
                    val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
                    requestBody.writeTo(bufferedRequestBody)
                    bufferedRequestBody.close()
                }
            } else {
                exchange.noRequestBody()
                if (!exchange.connection.isMultiplexed) {
                    // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
                    // from being reused. Otherwise we're still obligated to transmit the request body to
                    // leave the connection in a consistent state.
                    exchange.noNewExchangesOnConnection()
                }
            }
        } else {
            exchange.noRequestBody()
        }

        // Duplex bodies stay open; every other request is finished here.
        if (requestBody == null || !requestBody.isDuplex()) {
            exchange.finishRequest()
        }
        // Read the response headers unless the 100-continue path already did.
        if (responseBuilder == null) {
            responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
            if (invokeStartEvent) {
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }
        }
        var response = responseBuilder
            .request(request)
            .handshake(exchange.connection.handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build()
        var code = response.code
        if (code == 100) {
            // Server sent a 100-continue even though we did not request one. Try again to read the actual
            // response status.
            responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
            if (invokeStartEvent) {
                exchange.responseHeadersStart()
            }
            response = responseBuilder
                .request(request)
                .handshake(exchange.connection.handshake())
                .sentRequestAtMillis(sentRequestMillis)
                .receivedResponseAtMillis(System.currentTimeMillis())
                .build()
            code = response.code
        }

        exchange.responseHeadersEnd(response)

        response = if (forWebSocket && code == 101) {
            // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
            response.newBuilder()
                .body(EMPTY_RESPONSE)
                .build()
        } else {
            response.newBuilder()
                .body(exchange.openResponseBody(response))
                .build()
        }
        // Either side asked to close: no further exchanges on this connection.
        if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
            "close".equals(response.header("Connection"), ignoreCase = true)) {
            exchange.noNewExchangesOnConnection()
        }
        if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
            throw ProtocolException(
                "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
        }
        return response
    }
}
本文发布于:2024-01-29 02:18:11,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170646589612020.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |