来自公众号:新世界杂货铺
阅读建议
这是 HTTP2.0 系列的第二篇,所以笔者推荐阅读顺序如下:
Go中的HTTP请求之——HTTP1.1请求流程分析
Go发起HTTP2.0请求流程分析(前篇)
本篇主要分为三个部分:数据帧,流控制器以及通过分析源码逐步了解流控制。
本有意将这三个部分拆成三篇文章,但它们之间又有联系,所以最后依旧决定放在一篇文章里面。由于内容较多,笔者认为分三次分别阅读三个部分较佳。
数据帧
HTTP2 通信的最小单位是数据帧,每一个帧都包含两部分:帧头和*Payload*。不同数据流的帧可以交错发送(同一个数据流的帧必须顺序发送),然后再根据每个帧头的数据流标识符重新组装。
由于 Payload 中为有效数据,故仅对帧头进行分析描述。
帧头
帧头总长度为 9 个字节,并包含四个部分,分别是:
Payload 的长度,占用三个字节。
数据帧类型,占用一个字节。
数据帧标识符,占用一个字节。
数据流 ID,占用四个字节。
用图表示如下:
数据帧的格式和各部分的含义已经清楚了, 那么我们看看代码中怎么读取一个帧头:
func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {
_, err := io.ReadFull(r, buf[:http2frameHeaderLen])
if err != nil {
return http2FrameHeader{}, err
}
return http2FrameHeader{
Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
Type: http2FrameType(buf[3]),
Flags: http2Flags(buf[4]),
StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),
valid: true,
}, nil
}
复制代码
在上面的代码中http2frameHeaderLen
是一个常量,其值为 9。
从 io.Reader 中读取 9 个字节后,将前三个字节和后四个字节均转为uint32
的类型,从而得到 Payload 长度和数据流 ID。另外需要理解的是帧头的前三个字节和后四个字节存储格式为大端(大小端笔者就不在这里解释了,请尚不了解的读者自行百度)。
数据帧类型
根据http://http2.github.io/http2-spec/#rfc.section.11.2描述,数据帧类型总共有 10 个。在 go 源码中均有体现:
const (
http2FrameData http2FrameType = 0x0
http2FrameHeaders http2FrameType = 0x1
http2FramePriority http2FrameType = 0x2
http2FrameRSTStream http2FrameType = 0x3
http2FrameSettings http2FrameType = 0x4
http2FramePushPromise http2FrameType = 0x5
http2FramePing http2FrameType = 0x6
http2FrameGoAway http2FrameType = 0x7
http2FrameWindowUpdate http2FrameType = 0x8
http2FrameContinuation http2FrameType = 0x9
)
复制代码
http2FrameData
:主要用于发送请求 body 和接收响应的数据帧。
http2FrameHeaders
:主要用于发送请求 header 和接收响应 header 的数据帧。
http2FrameSettings
:主要用于 client 和 server 交流设置相关的数据帧。
http2FrameWindowUpdate
:主要用于流控制的数据帧。
其他数据帧类型因为本文不涉及,故不做描述。
数据帧标识符
由于数据帧标识符种类较多,笔者在这里仅介绍其中部分标识符,先看源码:
const (
// Data Frame
http2FlagDataEndStream http2Flags = 0x1
// Headers Frame
http2FlagHeadersEndStream http2Flags = 0x1
// Settings Frame
http2FlagSettingsAck http2Flags = 0x1
// 此处省略定义其他数据帧标识符的代码
)
复制代码
http2FlagDataEndStream
:在前篇中提到,调用(*http2ClientConn).newStream
方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream
的作用。
当 client 收到有响应 body 的响应时(HEAD 请求无响应 body,301,302 等响应也无响应 body),一直读到http2FrameData
数据帧的标识符为http2FlagDataEndStream
则意味着本次请求结束可以关闭当前数据流。
http2FlagHeadersEndStream
:如果读到的http2FrameHeaders
数据帧有此标识符也意味着本次请求结束。
http2FlagSettingsAck
:该标示符意味着对方确认收到http2FrameSettings
数据帧。
流控制器
流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go 中 HTTP2 通过http2flow
结构体进行流控制:
type http2flow struct {
// n is the number of DATA bytes we're allowed to send.
// A flow is kept both on a conn and a per-stream.
n int32
// conn points to the shared connection-level flow that is
// shared by all streams on that conn. It is nil for the flow
// that's on the conn directly.
conn *http2flow
}
复制代码
字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。
(*http2flow).available
此方法返回当前流控制可发送的最大字节数:
func (f *http2flow) available() int32 {
n := f.n
if f.conn != nil && f.conn.n < n {
n = f.conn.n
}
return n
}
复制代码
(*http2flow).take
此方法用于消耗当前流控制器的可发送字节数:
func (f *http2flow) take(n int32) {
if n > f.available() {
panic("internal error: took too much")
}
f.n -= n
if f.conn != nil {
f.conn.n -= n
}
}
复制代码
通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic
,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n
。
(*http2flow).add
有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:
func (f *http2flow) add(n int32) bool {
sum := f.n + n
if (sum > n) == (f.n > 0) {
f.n = sum
return true
}
return false
}
复制代码
上面的代码唯一需要注意的地方是,当 sum 超过 int32 正数最大值(2^31-1)时会返回 false。
回顾:在前篇中提到的(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法均通过(*http2flow).add
初始化可发送数据窗口大小。
有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。
(*http2ClientConn).readLoop
前篇分析(*http2Transport).newClientConn
时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop
开始。
func (cc *http2ClientConn) readLoop() {
rl := &http2clientConnReadLoop{cc: cc}
defer rl.cleanup()
cc.readerErr = rl.run()
if ce, ok := cc.readerErr.(http2ConnectionError); ok {
cc.wmu.Lock()
cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)
cc.wmu.Unlock()
}
}
复制代码
由上可知,readLoop 的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run
方法里。
func (rl *http2clientConnReadLoop) run() error {
cc := rl.cc
rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse
gotReply := false // ever saw a HEADERS reply
gotSettings := false
for {
f, err := cc.fr.ReadFrame()
// 此处省略代码
maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) {
case *http2MetaHeadersFrame:
err = rl.processHeaders(f)
maybeIdle = true
gotReply = true
case *http2DataFrame:
err = rl.processData(f)
maybeIdle = true
case *http2GoAwayFrame:
err = rl.processGoAway(f)
maybeIdle = true
case *http2RSTStreamFrame:
err = rl.processResetStream(f)
maybeIdle = true
case *http2SettingsFrame:
err = rl.processSettings(f)
case *http2PushPromiseFrame:
err = rl.processPushPromise(f)
case *http2WindowUpdateFrame:
err = rl.processWindowUpdate(f)
case *http2PingFrame:
err = rl.processPing(f)
default:
cc.logf("Transport: unhandled response frame type %T", f)
}
if err != nil {
if http2VerboseLogs {
cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err)
}
return err
}
if rl.closeWhenIdle && gotReply && maybeIdle {
cc.closeIfIdle()
}
}
}
复制代码
由上可知,(*http2clientConnReadLoop).run
的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。
cc.fr.ReadFrame()
会根据前面介绍的数据帧格式读出数据帧。
前篇中提到使用了一个支持 h2 协议的图片进行分析,本篇继续复用该图片对(*http2clientConnReadLoop).run
方法进行 debug。
收到 http2FrameSettings 数据帧
读循环会最先读到http2FrameSettings
数据帧。读到该数据帧后会调用(*http2clientConnReadLoop).processSettings
方法。(*http2clientConnReadLoop).processSettings
主要包含 3 个逻辑。
1、判断是否是http2FrameSettings
的 ack 信息,如果是直接返回,否则继续后面的步骤。
if f.IsAck() {
if cc.wantSettingsAck {
cc.wantSettingsAck = false
return nil
}
return http2ConnectionError(http2ErrCodeProtocol)
}
复制代码
2、处理不同http2FrameSettings
的数据帧,并根据 server 传递的信息,修改maxConcurrentStreams
等的值。
err := f.ForeachSetting(func(s http2Setting) error {
switch s.ID {
case http2SettingMaxFrameSize:
cc.maxFrameSize = s.Val
case http2SettingMaxConcurrentStreams:
cc.maxConcurrentStreams = s.Val
case http2SettingMaxHeaderListSize:
cc.peerMaxHeaderListSize = uint64(s.Val)
case http2SettingInitialWindowSize:
if s.Val > math.MaxInt32 {
return http2ConnectionError(http2ErrCodeFlowControl)
}
delta := int32(s.Val) - int32(cc.initialWindowSize)
for _, cs := range cc.streams {
cs.flow.add(delta)
}
cc.cond.Broadcast()
cc.initialWindowSize = s.Val
default:
// TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.
cc.vlogf("Unhandled Setting: %v", s)
}
return nil
})
复制代码
当收到 ID 为http2SettingInitialWindowSize
的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize
(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)为s.Val
。
3、发送http2FrameSettings
的 ack 信息给 server。
cc.wmu.Lock()
defer cc.wmu.Unlock()
cc.fr.WriteSettingsAck()
cc.bw.Flush()
return cc.werr
复制代码
收到 http2WindowUpdateFrame 数据帧
在笔者 debug 的过程中,处理完http2FrameSettings
数据帧后,紧接着就收到了http2WindowUpdateFrame
数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate
方法:
func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
if f.StreamID != 0 && cs == nil {
return nil
}
cc.mu.Lock()
defer cc.mu.Unlock()
fl := &cc.flow
if cs != nil {
fl = &cs.flow
}
if !fl.add(int32(f.Increment)) {
return http2ConnectionError(http2ErrCodeFlowControl)
}
cc.cond.Broadcast()
return nil
}
复制代码
上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果 http2WindowUpdateFrame 帧中的 StreamID 为 0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。
注意:在 debug 的过程,收到http2WindowUpdateFrame
数据帧后,又收到一次http2FrameSettings
,且该数据帧标识符为http2FlagSettingsAck
。
笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn 方法,也向 server 发送了 http2FrameSettings 数据帧和 http2WindowUpdateFrame 数据帧。
另外,在处理http2FrameSettings
和http2WindowUpdateFrame
过程中,均出现了cc.cond.Broadcast()
调用,该调用主要用于唤醒因为以下两种情况而Wait
的请求:
因当前连接处理的数据流已经达到maxConcurrentStreams
的上限(详见前篇中(*http2ClientConn).awaitOpenSlotForRequest
方法分析)。
因发送数据流已达可发送数据窗口上限而等待可发送数据窗口更新的请求(后续会介绍)。
收到 http2MetaHeadersFrame 数据帧
收到此数据帧意味着某一个请求已经开始接收响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders
:
func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, false)
// 此处省略代码
res, err := rl.handleResponse(cs, f)
if err != nil {
// 此处省略代码
cs.resc <- http2resAndError{err: err}
return nil // return nil from process* funcs to keep conn alive
}
if res == nil {
// (nil, nil) special case. See handleResponse docs.
return nil
}
cs.resTrailer = &res.Trailer
cs.resc <- http2resAndError{res: res}
return nil
}
复制代码
首先我们先看cs.resc <- http2resAndError{res: res}
这一行代码,向数据流写入http2resAndError
即本次请求的响应。在(*http2ClientConn).roundTrip
方法中有这样一行代码readLoopResCh := cs.resc
。
回顾:前篇(*http2ClientConn).roundTrip
方法的第 7 点和本部分关联起来就可以形成一个完整的请求链。
接下来我们对rl.handleResponse
方法展开分析。
(*http2clientConnReadLoop).handleResponse
(*http2clientConnReadLoop).handleResponse
的主要作用是构建一个Response
变量,下面对该函数的关键步骤进行描述。
1、构建一个Response
变量。
header := make(Header)
res := &Response{
Proto: "HTTP/2.0",
ProtoMajor: 2,
Header: header,
StatusCode: statusCode,
Status: status + " " + StatusText(statusCode),
}
复制代码
2、构建 header(本篇不对 header 进行展开分析)。
for _, hf := range f.RegularFields() {
key := CanonicalHeaderKey(hf.Name)
if key == "Trailer" {
t := res.Trailer
if t == nil {
t = make(Header)
res.Trailer = t
}
http2foreachHeaderElement(hf.Value, func(v string) {
t[CanonicalHeaderKey(v)] = nil
})
} else {
header[key] = append(header[key], hf.Value)
}
}
复制代码
3、处理响应 body 的 ContentLength。
streamEnded := f.StreamEnded()
isHead := cs.req.Method == "HEAD"
if !streamEnded || isHead {
res.ContentLength = -1
if clens := res.Header["Content-Length"]; len(clens) == 1 {
if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {
res.ContentLength = clen64
} else {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
} else if len(clens) > 1 {
// TODO: care? unlike http/1, it won't mess up our framing, so it's
// more safe smuggling-wise to ignore.
}
}
复制代码
由上可知,当前数据流没有结束或者是 HEAD 请求才读取 ContentLength。如果 header 中的 ContentLength 不合法则 res.ContentLength 的值为 -1。
4、构建res.Body
。
cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}
cs.bytesRemain = res.ContentLength
res.Body = http2transportResponseBody{cs}
go cs.awaitRequestCancel(cs.req)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
res.Header.Del("Content-Encoding")
res.Header.Del("Content-Length")
res.ContentLength = -1
res.Body = &http2gzipReader{body: res.Body}
res.Uncompressed = true
}
复制代码
根据Content-Encoding
的编码方式,会构建两种不同的 Body:
非 gzip 编码时,构造的 res.Body 类型为http2transportResponseBody
。
gzip 编码时,构造的 res.Body 类型为http2gzipReader
。
收到 http2DataFrame 数据帧
收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData
。
因为 server 无法及时知道数据流在 client 端的状态,所以 server 可能会向 client 中一个已经不存在的数据流发送数据:
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
data := f.Data()
if cs == nil {
cc.mu.Lock()
neverSent := cc.nextStreamID
cc.mu.Unlock()
// 此处省略代码
if f.Length > 0 {
cc.mu.Lock()
cc.inflow.add(int32(f.Length))
cc.mu.Unlock()
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(f.Length))
cc.bw.Flush()
cc.wmu.Unlock()
}
return nil
}
复制代码
接收到的数据帧在 client 没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length
,并且通过http2FrameWindowUpdate
数据帧告知 server 将当前连接的可写窗口大小增加f.Length
。
如果 client 有对应的数据流且f.Length
大于 0:
1、如果是 head 请求结束当前数据流并返回。
if cs.req.Method == "HEAD" && len(data) > 0 {
cc.logf("protocol error: received DATA on a HEAD request")
rl.endStreamError(cs, http2StreamError{
StreamID: f.StreamID,
Code: http2ErrCodeProtocol,
})
return nil
}
复制代码
2、检查当前数据流能否处理f.Length
长度的数据。
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
cs.inflow.take(int32(f.Length))
} else {
cc.mu.Unlock()
return http2ConnectionError(http2ErrCodeFlowControl)
}
复制代码
由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take
减小当前数据流可接受窗口大小。
3、当前数据流被重置或者被关闭即cs.didReset
为 true 时又或者数据帧有填充数据时需要调整流控制窗口。
var refund int
if pad := int(f.Length) - len(data); pad > 0 {
refund += pad
}
// Return len(data) now if the stream is already closed,
// since data will never be read.
didReset := cs.didReset
if didReset {
refund += len(data)
}
if refund > 0 {
cc.inflow.add(int32(refund))
cc.wmu.Lock()
cc.fr.WriteWindowUpdate(0, uint32(refund))
if !didReset {
cs.inflow.add(int32(refund))
cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
}
cc.bw.Flush()
cc.wmu.Unlock()
}
cc.mu.Unlock()
复制代码
最后,根据计算的 refund 增加当前连接或者当前数据流的可接受窗口大小,并且同时告知 server 增加当前连接或者当前数据流的可写窗口大小。
4、数据长度大于 0 且数据流正常则将数据写入数据流缓冲区。
if len(data) > 0 && !didReset {
if _, err := cs.bufPipe.Write(data); err != nil {
rl.endStreamError(cs, err)
return err
}
}
复制代码
回顾:前面的(*http2clientConnReadLoop).handleResponse
方法中有这样一行代码res.Body = http2transportResponseBody{cs}
,所以在业务开发时能够通过 Response 读取到数据流中的缓冲数据。
(http2transportResponseBody).Read
在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。
因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。
1、读取响应数据后计算当前连接需要增加的可接受窗口大小。
cc.mu.Lock()
defer cc.mu.Unlock()
var connAdd, streamAdd int32
// Check the conn-level first, before the stream-level.
if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {
connAdd = http2transportDefaultConnFlow - v
cc.inflow.add(connAdd)
}
复制代码
如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow
(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()
。
回顾:http2transportDefaultConnFlow
在前篇(*http2Transport).NewClientConn
方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame
数据帧告知 server 当前连接可发送窗口大小增加http2transportDefaultConnFlow
。
2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。
if err == nil { // No need to refresh if the stream is over or failed.
// Consider any buffered body data (read from the conn but not
// consumed by the client) when computing flow control for this
// stream.
v := int(cs.inflow.available()) + cs.bufPipe.Len()
if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {
streamAdd = int32(http2transportDefaultStreamFlow - v)
cs.inflow.add(streamAdd)
}
}
复制代码
如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh
(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v
。
回顾:http2transportDefaultStreamFlow
在前篇(*http2Transport).NewClientConn
方法和(*http2ClientConn).newStream
方法中均有提到。
连接刚建立时,发送http2FrameSettings
数据帧,告知 server 每个数据流的可发送窗口大小为http2transportDefaultStreamFlow
。
在newStream
时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow
。
3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame
数据帧告知 server。
if connAdd != 0 || streamAdd != 0 {
cc.wmu.Lock()
defer cc.wmu.Unlock()
if connAdd != 0 {
cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))
}
if streamAdd != 0 {
cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))
}
cc.bw.Flush()
}
复制代码
以上就是 server 向 client 发送数据的流控制逻辑。
(*http2clientStream).writeRequestBody
前篇中(*http2ClientConn).roundTrip
未对(*http2clientStream).writeRequestBody
进行分析,下面我们看看该方法的源码:
func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {
cc := cs.cc
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
// 此处省略代码
req := cs.req
hasTrailers := req.Trailer != nil
remainLen := http2actualContentLength(req)
hasContentLen := remainLen != -1
var sawEOF bool
for !sawEOF {
n, err := body.Read(buf[:len(buf)-1])
// 此处省略代码
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(len(remain))
switch {
case err == http2errStopReqBodyWrite:
return err
case err == http2errStopReqBodyWriteAndCancel:
cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil)
return err
case err != nil:
return err
}
cc.wmu.Lock()
data := remain[:allowed]
remain = remain[allowed:]
sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
err = cc.bw.Flush()
}
cc.wmu.Unlock()
}
if err != nil {
return err
}
}
// 此处省略代码
return err
}
复制代码
上面的逻辑可简单总结为:不停的读取请求 body 然后将读取的内容通过 cc.fr.WriteData
转为http2FrameData
数据帧发送给 server,直到请求 body 读完为止。其中和流控制有关的方法是awaitFlowControl
,下面我们对该方法进行分析。
(*http2clientStream).awaitFlowControl
此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。
func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, http2errClientConnClosed
}
if cs.stopReqBody != nil {
return 0, cs.stopReqBody
}
if err := cs.checkResetOrDone(); err != nil {
return 0, err
}
if a := cs.flow.available(); a > 0 {
take := a
if int(take) > maxBytes {
take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
}
cs.flow.take(take)
return take, nil
}
cc.cond.Wait()
}
}
复制代码
根据源码可以知道,数据流被关闭或者停止发送请求 body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:
当前数据流可写窗口剩余可写数据大于 0,则计算可写字节数,并将当前数据流可写窗口大小消耗take
。
当前数据流可写窗口剩余可写数据小于等于 0,则会一直等待直到被唤醒并进入下一次检查。
上面的第二种情况在收到 http2WindowUpdateFrame 数据帧这一节中提到过。
server 读取当前数据流的数据后会向 client 对应数据流发送http2WindowUpdateFrame
数据帧,client 收到该数据帧后会增大对应数据流可写窗口,并执行cc.cond.Broadcast()
唤醒因发送数据已达流控制上限而等待的数据流继续发送数据。
以上就是 client 向 server 发送数据的流控制逻辑。
总结
帧头长度为 9 个字节,并包含四个部分:Payload 的长度、帧类型、帧标识符和数据流 ID。
流控制可分为两个步骤:
预告
前篇和中篇已经完成,下一期将对 http2.0 标头压缩进行分析。
最后,衷心希望本文能够对各位读者有一定的帮助。
注:写本文时, 笔者所用 go 版本为: go1.14.2
评论