写点什么

Go 发起 HTTP2.0 请求流程分析 (中篇)——数据帧 & 流控制

用户头像
Gopher指北
关注
发布于: 2020 年 10 月 19 日
Go发起HTTP2.0请求流程分析(中篇)——数据帧&流控制

来自公众号:新世界杂货铺

阅读建议


这是 HTTP2.0 系列的第二篇,所以笔者推荐阅读顺序如下:


  1. Go中的HTTP请求之——HTTP1.1请求流程分析

  2. Go发起HTTP2.0请求流程分析(前篇)


本篇主要分为三个部分:数据帧,流控制器以及通过分析源码逐步了解流控制。


本有意将这三个部分拆成三篇文章,但它们之间又有联系,所以最后依旧决定放在一篇文章里面。由于内容较多,笔者认为分三次分别阅读三个部分较佳。


数据帧


HTTP2 通信的最小单位是数据帧,每一个帧都包含两部分:帧头和*Payload*。不同数据流的帧可以交错发送(同一个数据流的帧必须顺序发送),然后再根据每个帧头的数据流标识符重新组装。

由于 Payload 中为有效数据,故仅对帧头进行分析描述。

帧头

帧头总长度为 9 个字节,并包含四个部分,分别是:


  1. Payload 的长度,占用三个字节。

  2. 数据帧类型,占用一个字节。

  3. 数据帧标识符,占用一个字节。

  4. 数据流 ID,占用四个字节。


用图表示如下:



数据帧的格式和各部分的含义已经清楚了, 那么我们看看代码中怎么读取一个帧头:


func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) {	_, err := io.ReadFull(r, buf[:http2frameHeaderLen])	if err != nil {		return http2FrameHeader{}, err	}	return http2FrameHeader{		Length:   (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),		Type:     http2FrameType(buf[3]),		Flags:    http2Flags(buf[4]),		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),		valid:    true,	}, nil}
复制代码


在上面的代码中http2frameHeaderLen是一个常量,其值为 9。


从 io.Reader 中读取 9 个字节后,将前三个字节和后四个字节均转为uint32的类型,从而得到 Payload 长度和数据流 ID。另外需要理解的是帧头的前三个字节和后四个字节存储格式为大端(大小端笔者就不在这里解释了,请尚不了解的读者自行百度)。

数据帧类型

根据http://http2.github.io/http2-spec/#rfc.section.11.2描述,数据帧类型总共有 10 个。在 go 源码中均有体现:


const (	http2FrameData         http2FrameType = 0x0	http2FrameHeaders      http2FrameType = 0x1	http2FramePriority     http2FrameType = 0x2	http2FrameRSTStream    http2FrameType = 0x3	http2FrameSettings     http2FrameType = 0x4	http2FramePushPromise  http2FrameType = 0x5	http2FramePing         http2FrameType = 0x6	http2FrameGoAway       http2FrameType = 0x7	http2FrameWindowUpdate http2FrameType = 0x8	http2FrameContinuation http2FrameType = 0x9)
复制代码


http2FrameData:主要用于发送请求 body 和接收响应的数据帧。


http2FrameHeaders:主要用于发送请求 header 和接收响应 header 的数据帧。


http2FrameSettings:主要用于 client 和 server 交流设置相关的数据帧。


http2FrameWindowUpdate:主要用于流控制的数据帧。


其他数据帧类型因为本文不涉及,故不做描述。

数据帧标识符

由于数据帧标识符种类较多,笔者在这里仅介绍其中部分标识符,先看源码:


const (	// Data Frame	http2FlagDataEndStream http2Flags = 0x1    // Headers Frame	http2FlagHeadersEndStream  http2Flags = 0x1    // Settings Frame	http2FlagSettingsAck http2Flags = 0x1	// 此处省略定义其他数据帧标识符的代码)
复制代码


http2FlagDataEndStream:在前篇中提到,调用(*http2ClientConn).newStream方法会创建一个数据流,那这个数据流什么时候结束呢,这就是http2FlagDataEndStream的作用。

当 client 收到有响应 body 的响应时(HEAD 请求无响应 body,301,302 等响应也无响应 body),一直读到http2FrameData数据帧的标识符为http2FlagDataEndStream则意味着本次请求结束可以关闭当前数据流。

http2FlagHeadersEndStream:如果读到的http2FrameHeaders数据帧有此标识符也意味着本次请求结束。

http2FlagSettingsAck:该标示符意味着对方确认收到http2FrameSettings数据帧。


流控制器

流控制是一种阻止发送方向接收方发送大量数据的机制,以免超出后者的需求或处理能力。Go 中 HTTP2 通过http2flow结构体进行流控制:


type http2flow struct {	// n is the number of DATA bytes we're allowed to send.	// A flow is kept both on a conn and a per-stream.	n int32
// conn points to the shared connection-level flow that is // shared by all streams on that conn. It is nil for the flow // that's on the conn directly. conn *http2flow}
复制代码

字段含义英文注释已经描述的很清楚了,所以笔者不再翻译。下面看一下和流控制有关的方法。

(*http2flow).available

此方法返回当前流控制可发送的最大字节数:


func (f *http2flow) available() int32 {	n := f.n	if f.conn != nil && f.conn.n < n {		n = f.conn.n	}	return n}
复制代码


  • 如果f.conn为 nil 则意味着此控制器的控制级别为连接,那么可发送的最大字节数就是f.n

  • 如果f.conn不为 nil 则意味着此控制器的控制级别为数据流,且当前数据流可发送的最大字节数不能超过当前连接可发送的最大字节数。

(*http2flow).take

此方法用于消耗当前流控制器的可发送字节数:


func (f *http2flow) take(n int32) {	if n > f.available() {		panic("internal error: took too much")	}	f.n -= n	if f.conn != nil {		f.conn.n -= n	}}
复制代码


通过实际需要传递一个参数,告知当前流控制器想要发送的数据大小。如果发送的大小超过流控制器允许的大小,则panic,如果未超过流控制器允许的大小,则将当前数据流和当前连接的可发送字节数-n


(*http2flow).add

有消耗就有新增,此方法用于增加流控制器可发送的最大字节数:


func (f *http2flow) add(n int32) bool {	sum := f.n + n	if (sum > n) == (f.n > 0) {		f.n = sum		return true	}	return false}
复制代码


上面的代码唯一需要注意的地方是,当 sum 超过 int32 正数最大值(2^31-1)时会返回 false。

回顾:在前篇中提到的(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法均通过(*http2flow).add初始化可发送数据窗口大小。

有了帧和流控制器的基本概念,下面我们结合源码来分析总结流控制的具体实现。

(*http2ClientConn).readLoop

前篇分析(*http2Transport).newClientConn时止步于读循环,那么今天我们就从(*http2ClientConn).readLoop开始。


func (cc *http2ClientConn) readLoop() {	rl := &http2clientConnReadLoop{cc: cc}	defer rl.cleanup()	cc.readerErr = rl.run()	if ce, ok := cc.readerErr.(http2ConnectionError); ok {		cc.wmu.Lock()		cc.fr.WriteGoAway(0, http2ErrCode(ce), nil)		cc.wmu.Unlock()	}}
复制代码


由上可知,readLoop 的逻辑比较简单,其核心逻辑在(*http2clientConnReadLoop).run方法里。


func (rl *http2clientConnReadLoop) run() error {	cc := rl.cc	rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse	gotReply := false // ever saw a HEADERS reply	gotSettings := false	for {		f, err := cc.fr.ReadFrame()    // 此处省略代码		maybeIdle := false // whether frame might transition us to idle
switch f := f.(type) { case *http2MetaHeadersFrame: err = rl.processHeaders(f) maybeIdle = true gotReply = true case *http2DataFrame: err = rl.processData(f) maybeIdle = true case *http2GoAwayFrame: err = rl.processGoAway(f) maybeIdle = true case *http2RSTStreamFrame: err = rl.processResetStream(f) maybeIdle = true case *http2SettingsFrame: err = rl.processSettings(f) case *http2PushPromiseFrame: err = rl.processPushPromise(f) case *http2WindowUpdateFrame: err = rl.processWindowUpdate(f) case *http2PingFrame: err = rl.processPing(f) default: cc.logf("Transport: unhandled response frame type %T", f) } if err != nil { if http2VerboseLogs { cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err) } return err } if rl.closeWhenIdle && gotReply && maybeIdle { cc.closeIfIdle() } }}
复制代码


由上可知,(*http2clientConnReadLoop).run的核心逻辑是读取数据帧然后对不同的数据帧进行不同的处理。

cc.fr.ReadFrame()会根据前面介绍的数据帧格式读出数据帧。

前篇中提到使用了一个支持 h2 协议的图片进行分析,本篇继续复用该图片对(*http2clientConnReadLoop).run方法进行 debug。

收到 http2FrameSettings 数据帧

读循环会最先读到http2FrameSettings数据帧。读到该数据帧后会调用(*http2clientConnReadLoop).processSettings方法。(*http2clientConnReadLoop).processSettings主要包含 3 个逻辑。


1、判断是否是http2FrameSettings的 ack 信息,如果是直接返回,否则继续后面的步骤。


if f.IsAck() {  if cc.wantSettingsAck {    cc.wantSettingsAck = false    return nil  }  return http2ConnectionError(http2ErrCodeProtocol)}
复制代码


2、处理不同http2FrameSettings的数据帧,并根据 server 传递的信息,修改maxConcurrentStreams等的值。


err := f.ForeachSetting(func(s http2Setting) error {  switch s.ID {    case http2SettingMaxFrameSize:    cc.maxFrameSize = s.Val    case http2SettingMaxConcurrentStreams:    cc.maxConcurrentStreams = s.Val    case http2SettingMaxHeaderListSize:    cc.peerMaxHeaderListSize = uint64(s.Val)    case http2SettingInitialWindowSize:    if s.Val > math.MaxInt32 {      return http2ConnectionError(http2ErrCodeFlowControl)    }    delta := int32(s.Val) - int32(cc.initialWindowSize)    for _, cs := range cc.streams {      cs.flow.add(delta)    }    cc.cond.Broadcast()    cc.initialWindowSize = s.Val    default:    // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably.    cc.vlogf("Unhandled Setting: %v", s)  }  return nil})
复制代码


当收到 ID 为http2SettingInitialWindowSize的帧时,会调整当前连接中所有数据流的可发送数据窗口大小,并修改当前连接的initialWindowSize(每个新创建的数据流均会使用该值初始化可发送数据窗口大小)为s.Val


3、发送http2FrameSettings的 ack 信息给 server。


	cc.wmu.Lock()	defer cc.wmu.Unlock()
cc.fr.WriteSettingsAck() cc.bw.Flush() return cc.werr
复制代码

收到 http2WindowUpdateFrame 数据帧


在笔者 debug 的过程中,处理完http2FrameSettings数据帧后,紧接着就收到了http2WindowUpdateFrame数据帧。收到该数据帧后会调用(*http2clientConnReadLoop).processWindowUpdate方法:


func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error {	cc := rl.cc	cs := cc.streamByID(f.StreamID, false)	if f.StreamID != 0 && cs == nil {		return nil	}
cc.mu.Lock() defer cc.mu.Unlock()
fl := &cc.flow if cs != nil { fl = &cs.flow } if !fl.add(int32(f.Increment)) { return http2ConnectionError(http2ErrCodeFlowControl) } cc.cond.Broadcast() return nil}
复制代码


上面的逻辑主要用于更新当前连接和数据流的可发送数据窗口大小。如果 http2WindowUpdateFrame 帧中的 StreamID 为 0,则更新当前连接的可发送数据窗口大小,否则更新对应数据流可发送数据窗口大小。


注意:在 debug 的过程,收到http2WindowUpdateFrame数据帧后,又收到一次http2FrameSettings,且该数据帧标识符为http2FlagSettingsAck


笔者在这里特意提醒,这是因为前篇中提到的(*http2Transport).NewClientConn 方法,也向 server 发送了 http2FrameSettings 数据帧和 http2WindowUpdateFrame 数据帧。


另外,在处理http2FrameSettingshttp2WindowUpdateFrame过程中,均出现了cc.cond.Broadcast()调用,该调用主要用于唤醒因为以下两种情况而Wait的请求:


  1. 因当前连接处理的数据流已经达到maxConcurrentStreams的上限(详见前篇中(*http2ClientConn).awaitOpenSlotForRequest方法分析)。

  2. 因发送数据流已达可发送数据窗口上限而等待可发送数据窗口更新的请求(后续会介绍)。


收到 http2MetaHeadersFrame 数据帧

收到此数据帧意味着某一个请求已经开始接收响应数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processHeaders


func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error {	cc := rl.cc	cs := cc.streamByID(f.StreamID, false)	// 此处省略代码	res, err := rl.handleResponse(cs, f)	if err != nil {		// 此处省略代码		cs.resc <- http2resAndError{err: err}		return nil // return nil from process* funcs to keep conn alive	}	if res == nil {		// (nil, nil) special case. See handleResponse docs.		return nil	}	cs.resTrailer = &res.Trailer	cs.resc <- http2resAndError{res: res}	return nil}
复制代码


首先我们先看cs.resc <- http2resAndError{res: res}这一行代码,向数据流写入http2resAndError即本次请求的响应。在(*http2ClientConn).roundTrip方法中有这样一行代码readLoopResCh := cs.resc


回顾:前篇(*http2ClientConn).roundTrip方法的第 7 点和本部分关联起来就可以形成一个完整的请求链。

接下来我们对rl.handleResponse方法展开分析。

(*http2clientConnReadLoop).handleResponse

(*http2clientConnReadLoop).handleResponse的主要作用是构建一个Response变量,下面对该函数的关键步骤进行描述。


1、构建一个Response变量。


header := make(Header)res := &Response{  Proto:      "HTTP/2.0",  ProtoMajor: 2,  Header:     header,  StatusCode: statusCode,  Status:     status + " " + StatusText(statusCode),}
复制代码


2、构建 header(本篇不对 header 进行展开分析)。


for _, hf := range f.RegularFields() {  key := CanonicalHeaderKey(hf.Name)  if key == "Trailer" {    t := res.Trailer    if t == nil {      t = make(Header)      res.Trailer = t    }    http2foreachHeaderElement(hf.Value, func(v string) {      t[CanonicalHeaderKey(v)] = nil    })  } else {    header[key] = append(header[key], hf.Value)  }}
复制代码


3、处理响应 body 的 ContentLength。


streamEnded := f.StreamEnded()isHead := cs.req.Method == "HEAD"if !streamEnded || isHead {  res.ContentLength = -1  if clens := res.Header["Content-Length"]; len(clens) == 1 {    if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil {      res.ContentLength = clen64    } else {      // TODO: care? unlike http/1, it won't mess up our framing, so it's      // more safe smuggling-wise to ignore.    }  } else if len(clens) > 1 {    // TODO: care? unlike http/1, it won't mess up our framing, so it's    // more safe smuggling-wise to ignore.  }}
复制代码


由上可知,当前数据流没有结束或者是 HEAD 请求才读取 ContentLength。如果 header 中的 ContentLength 不合法则 res.ContentLength 的值为 -1


4、构建res.Body


cs.bufPipe = http2pipe{b: &http2dataBuffer{expected: res.ContentLength}}cs.bytesRemain = res.ContentLengthres.Body = http2transportResponseBody{cs}go cs.awaitRequestCancel(cs.req)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { res.Header.Del("Content-Encoding") res.Header.Del("Content-Length") res.ContentLength = -1 res.Body = &http2gzipReader{body: res.Body} res.Uncompressed = true}
复制代码


根据Content-Encoding的编码方式,会构建两种不同的 Body:


  1. 非 gzip 编码时,构造的 res.Body 类型为http2transportResponseBody

  2. gzip 编码时,构造的 res.Body 类型为http2gzipReader


收到 http2DataFrame 数据帧

收到此数据帧意味着我们开始接收真实的响应,即平常开发中需要处理的业务数据。此数据帧对应的处理函数为(*http2clientConnReadLoop).processData

因为 server 无法及时知道数据流在 client 端的状态,所以 server 可能会向 client 中一个已经不存在的数据流发送数据:


cc := rl.cccs := cc.streamByID(f.StreamID, f.StreamEnded())data := f.Data()if cs == nil {  cc.mu.Lock()  neverSent := cc.nextStreamID  cc.mu.Unlock() // 此处省略代码  if f.Length > 0 {    cc.mu.Lock()    cc.inflow.add(int32(f.Length))    cc.mu.Unlock()
cc.wmu.Lock() cc.fr.WriteWindowUpdate(0, uint32(f.Length)) cc.bw.Flush() cc.wmu.Unlock() } return nil}
复制代码


接收到的数据帧在 client 没有对应的数据流处理时,通过流控制器为当前连接可读窗口大小增加f.Length,并且通过http2FrameWindowUpdate数据帧告知 server 将当前连接的可写窗口大小增加f.Length


如果 client 有对应的数据流且f.Length大于 0:


1、如果是 head 请求结束当前数据流并返回。


if cs.req.Method == "HEAD" && len(data) > 0 {  cc.logf("protocol error: received DATA on a HEAD request")  rl.endStreamError(cs, http2StreamError{    StreamID: f.StreamID,    Code:     http2ErrCodeProtocol,  })  return nil}
复制代码


2、检查当前数据流能否处理f.Length长度的数据。


cc.mu.Lock()if cs.inflow.available() >= int32(f.Length) {  cs.inflow.take(int32(f.Length))} else {  cc.mu.Unlock()  return http2ConnectionError(http2ErrCodeFlowControl)}
复制代码


由上可知当前数据流如果能够处理该数据,通过流控制器调用cs.inflow.take减小当前数据流可接受窗口大小。


3、当前数据流被重置或者被关闭即cs.didReset为 true 时又或者数据帧有填充数据时需要调整流控制窗口。


var refund intif pad := int(f.Length) - len(data); pad > 0 {  refund += pad}// Return len(data) now if the stream is already closed,// since data will never be read.didReset := cs.didResetif didReset {  refund += len(data)}if refund > 0 {  cc.inflow.add(int32(refund))  cc.wmu.Lock()  cc.fr.WriteWindowUpdate(0, uint32(refund))  if !didReset {    cs.inflow.add(int32(refund))    cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))  }  cc.bw.Flush()  cc.wmu.Unlock()}cc.mu.Unlock()
复制代码


  • 如果数据帧有填充数据则计算需要返还的填充数据长度。

  • 如果数据流无效该数据帧的长度需要全部返还。


最后,根据计算的 refund 增加当前连接或者当前数据流的可接受窗口大小,并且同时告知 server 增加当前连接或者当前数据流的可写窗口大小。


4、数据长度大于 0 且数据流正常则将数据写入数据流缓冲区。


if len(data) > 0 && !didReset {  if _, err := cs.bufPipe.Write(data); err != nil {    rl.endStreamError(cs, err)    return err  }}
复制代码


回顾:前面的(*http2clientConnReadLoop).handleResponse方法中有这样一行代码res.Body = http2transportResponseBody{cs},所以在业务开发时能够通过 Response 读取到数据流中的缓冲数据。

(http2transportResponseBody).Read


在前面的内容里,如果数据流状态正常且数据帧没有填充数据则数据流和连接的可接收窗口会一直变小,而这部分内容就是增加数据流的可接受窗口大小。

因为篇幅和主旨的问题笔者仅分析描述该方法内和流控制有关的部分。


1、读取响应数据后计算当前连接需要增加的可接受窗口大小。


cc.mu.Lock()defer cc.mu.Unlock()var connAdd, streamAdd int32// Check the conn-level first, before the stream-level.if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 {  connAdd = http2transportDefaultConnFlow - v  cc.inflow.add(connAdd)}
复制代码


如果当前连接可接受窗口的大小已经小于http2transportDefaultConnFlow(1G)的一半,则当前连接可接收窗口大小需要增加http2transportDefaultConnFlow - cc.inflow.available()


回顾http2transportDefaultConnFlow在前篇(*http2Transport).NewClientConn方法部分有提到,且连接刚建立时会通过http2WindowUpdateFrame数据帧告知 server 当前连接可发送窗口大小增加http2transportDefaultConnFlow


2、读取响应数据后计算当前数据流需要增加的可接受窗口大小。


if err == nil { // No need to refresh if the stream is over or failed.  // Consider any buffered body data (read from the conn but not  // consumed by the client) when computing flow control for this  // stream.  v := int(cs.inflow.available()) + cs.bufPipe.Len()  if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh {    streamAdd = int32(http2transportDefaultStreamFlow - v)    cs.inflow.add(streamAdd)  }}
复制代码


如果当前数据流可接受窗口大小加上当前数据流缓冲区剩余未读数据的长度小于http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh(4M-4KB),则当前数据流可接受窗口大小需要增加http2transportDefaultStreamFlow - v


回顾http2transportDefaultStreamFlow在前篇(*http2Transport).NewClientConn方法和(*http2ClientConn).newStream方法中均有提到。


连接刚建立时,发送http2FrameSettings数据帧,告知 server 每个数据流的可发送窗口大小为http2transportDefaultStreamFlow


newStream时,数据流默认的可接收窗口大小为http2transportDefaultStreamFlow


3、将连接和数据流分别需要增加的窗口大小通过http2WindowUpdateFrame数据帧告知 server。


if connAdd != 0 || streamAdd != 0 {  cc.wmu.Lock()  defer cc.wmu.Unlock()  if connAdd != 0 {    cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd))  }  if streamAdd != 0 {    cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd))  }  cc.bw.Flush()}
复制代码


以上就是 server 向 client 发送数据的流控制逻辑。

(*http2clientStream).writeRequestBody


前篇中(*http2ClientConn).roundTrip未对(*http2clientStream).writeRequestBody进行分析,下面我们看看该方法的源码:


func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) {	cc := cs.cc	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM  // 此处省略代码	req := cs.req	hasTrailers := req.Trailer != nil	remainLen := http2actualContentLength(req)	hasContentLen := remainLen != -1
var sawEOF bool for !sawEOF { n, err := body.Read(buf[:len(buf)-1]) // 此处省略代码 remain := buf[:n] for len(remain) > 0 && err == nil { var allowed int32 allowed, err = cs.awaitFlowControl(len(remain)) switch { case err == http2errStopReqBodyWrite: return err case err == http2errStopReqBodyWriteAndCancel: cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) return err case err != nil: return err } cc.wmu.Lock() data := remain[:allowed] remain = remain[allowed:] sentEnd = sawEOF && len(remain) == 0 && !hasTrailers err = cc.fr.WriteData(cs.ID, sentEnd, data) if err == nil { err = cc.bw.Flush() } cc.wmu.Unlock() } if err != nil { return err } } // 此处省略代码 return err}
复制代码


上面的逻辑可简单总结为:不停的读取请求 body 然后将读取的内容通过 cc.fr.WriteData转为http2FrameData数据帧发送给 server,直到请求 body 读完为止。其中和流控制有关的方法是awaitFlowControl,下面我们对该方法进行分析。

(*http2clientStream).awaitFlowControl


此方法的主要作用是等待当前数据流可写窗口有容量能够写入数据。


func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {	cc := cs.cc	cc.mu.Lock()	defer cc.mu.Unlock()	for {		if cc.closed {			return 0, http2errClientConnClosed		}		if cs.stopReqBody != nil {			return 0, cs.stopReqBody		}		if err := cs.checkResetOrDone(); err != nil {			return 0, err		}		if a := cs.flow.available(); a > 0 {			take := a			if int(take) > maxBytes {
take = int32(maxBytes) // can't truncate int; take is int32 } if take > int32(cc.maxFrameSize) { take = int32(cc.maxFrameSize) } cs.flow.take(take) return take, nil } cc.cond.Wait() }}
复制代码


根据源码可以知道,数据流被关闭或者停止发送请求 body,则当前数据流无法写入数据。当数据流状态正常时,又分为两种情况:


  1. 当前数据流可写窗口剩余可写数据大于 0,则计算可写字节数,并将当前数据流可写窗口大小消耗take

  2. 当前数据流可写窗口剩余可写数据小于等于 0,则会一直等待直到被唤醒并进入下一次检查。


上面的第二种情况在收到 http2WindowUpdateFrame 数据帧这一节中提到过。


server 读取当前数据流的数据后会向 client 对应数据流发送http2WindowUpdateFrame数据帧,client 收到该数据帧后会增大对应数据流可写窗口,并执行cc.cond.Broadcast()唤醒因发送数据已达流控制上限而等待的数据流继续发送数据。


以上就是 client 向 server 发送数据的流控制逻辑。


总结


  1. 帧头长度为 9 个字节,并包含四个部分:Payload 的长度、帧类型、帧标识符和数据流 ID。

  2. 流控制可分为两个步骤:

  • 初始时,通过http2FrameSettings数据帧和http2WindowUpdateFrame数据帧告知对方当前连接读写窗口大小以及连接中数据流读写窗口大小。

  • 在读写数据过程中,通过发送http2WindowUpdateFrame数据帧控制另一端的写窗口大小。

预告

前篇和中篇已经完成,下一期将对 http2.0 标头压缩进行分析。


最后,衷心希望本文能够对各位读者有一定的帮助。


:写本文时, 笔者所用 go 版本为: go1.14.2


发布于: 2020 年 10 月 19 日阅读数: 609
用户头像

Gopher指北

关注

还未添加个人签名 2020.09.15 加入

欢迎关注公众号:Gopher指北

评论

发布
暂无评论
Go发起HTTP2.0请求流程分析(中篇)——数据帧&流控制