写点什么

Go 发起 HTTP2.0 请求流程分析 (前篇)

用户头像
Gopher指北
关注
发布于: 2020 年 10 月 12 日
Go发起HTTP2.0请求流程分析(前篇)

来自公众号:新世界杂货铺

前言


Go中的HTTP请求之——HTTP1.1请求流程分析之后,中间断断续续,历时近一月,终于才敢开始码字写下本文。


阅读建议


HTTP2.0 在建立 TCP 连接和安全的 TLS 传输通道与 HTTP1.1 的流程基本一致。所以笔者建议没有看过Go中的HTTP请求之——HTTP1.1请求流程分析这篇文章的先去补一下课,本文会基于前一篇文章仅介绍和 HTTP2.0 相关的逻辑。

(*Transport).roundTrip


(*Transport).roundTrip方法会调用t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)初始化TLSClientConfig以及h2transport,而这两者都和 HTTP2.0 有着紧密的联系。


TLSClientConfig: 初始化 client 支持的 http 协议, 并在 tls 握手时告知 server。


h2transport: 如果本次请求是 http2,那么 h2transport 会接管连接,请求和响应的处理逻辑。


下面看看源码:


func (t *Transport) onceSetNextProtoDefaults() {	// ...此处省略代码...	t2, err := http2configureTransport(t)	if err != nil {		log.Printf("Error enabling Transport HTTP/2 support: %v", err)		return	}	t.h2transport = t2
// ...此处省略代码...}func http2configureTransport(t1 *Transport) (*http2Transport, error) { connPool := new(http2clientConnPool) t2 := &http2Transport{ ConnPool: http2noDialClientConnPool{connPool}, t1: t1, } connPool.t = t2 if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil { return nil, err } if t1.TLSClientConfig == nil { t1.TLSClientConfig = new(tls.Config) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") { t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...) } if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") } upgradeFn := func(authority string, c *tls.Conn) RoundTripper { addr := http2authorityAddr("https", authority) if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { go c.Close() return http2erringRoundTripper{err} } else if !used { // Turns out we don't need this c. // For example, two goroutines made requests to the same host // at the same time, both kicking off TCP dials. (since protocol // was unknown) go c.Close() } return t2 } if m := t1.TLSNextProto; len(m) == 0 { t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{ "h2": upgradeFn, } } else { m["h2"] = upgradeFn } return t2, nil}
复制代码

笔者将上述的源码简单拆解为以下几个步骤:


  1. 新建一个http2clientConnPool并复制给 t2,以后 http2 的请求会优先从该连接池中获取连接。

  2. 初始化TLSClientConfig,并将支持的h2http1.1协议添加到TLSClientConfig.NextProtos中。

  3. 定义一个h2upgradeFn存储到t1.TLSNextProto里。


鉴于前一篇文章对新建连接前的步骤有了较为详细的介绍,所以这里直接看和 server 建立连接的部分源码,即(*Transport).dialConn方法:


func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {	// ...此处省略代码...	if cm.scheme() == "https" && t.hasCustomTLSDialer() {		// ...此处省略代码...	} else {		conn, err := t.dial(ctx, "tcp", cm.addr())		if err != nil {			return nil, wrapErr(err)		}		pconn.conn = conn		if cm.scheme() == "https" {			var firstTLSHost string			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {				return nil, wrapErr(err)			}			if err = pconn.addTLS(firstTLSHost, trace); err != nil {				return nil, wrapErr(err)			}		}	}
// Proxy setup. // ...此处省略代码...
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil } }
// ...此处省略代码...}
复制代码

笔者对上述的源码描述如下:

  1. 调用t.dial(ctx, "tcp", cm.addr())创建 TCP 连接。

  2. 如果是 https 的请求, 则对请求建立安全的 tls 传输通道。

  3. 检查 tls 的握手状态,如果和 server 协商的NegotiatedProtocol协议不为空,且 client 的t.TLSNextProto有该协议,则返回 alt 不为空的持久连接(HTTP1.1 不会进入 if 条件里)。


笔者对上述的第三点进行展开。经笔者在本地 debug 验证,当 client 和 server 都支持 http2 时,s.NegotiatedProtocol的值为h2s.NegotiatedProtocolIsMutual的值为true


在上面分析http2configureTransport函数时,我们知道TLSNextProto注册了一个 key 为h2的函数,所以调用next实际就是调用前面的upgradeFn函数。


upgradeFn会调用connPool.addConnIfNeeded向 http2 的连接池添加一个 tls 传输通道,并最终返回前面已经创建好的t2http2Transport


func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {	p.mu.Lock()	// ...此处省略代码...	// 主要用于判断是否有必要像连接池添加新的连接	// 判断连接池中是否已有同host连接,如果有且该链接能够处理新的请求则直接返回	call, dup := p.addConnCalls[key]	if !dup {		// ...此处省略代码...		call = &http2addConnCall{			p:    p,			done: make(chan struct{}),		}		p.addConnCalls[key] = call		go call.run(t, key, c)	}	p.mu.Unlock()
<-call.done if call.err != nil { return false, call.err } return !dup, nil}func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { cc, err := t.NewClientConn(tc)
p := c.p p.mu.Lock() if err != nil { c.err = err } else { p.addConnLocked(key, cc) } delete(p.addConnCalls, key) p.mu.Unlock() close(c.done)}
复制代码

分析上述的源码我们能够得到两点结论:

  1. 执行完upgradeFn之后,(*Transport).dialConn 返回的持久化连接中 alt 字段已经不是 nil 了。

  2. t.NewClientConn(tc)新建出来的连接会保存在 http2 的连接池即http2clientConnPool中,下一小结将对 NewClientConn 展开分析。


最后我们回到(*Transport).roundTrip 方法并分析其中的关键源码:


func (t *Transport) roundTrip(req *Request) (*Response, error) {	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)	// ...此处省略代码...	for {		select {		case <-ctx.Done():			req.closeBody()			return nil, ctx.Err()		default:		}
// ...此处省略代码... pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err }
var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { return resp, nil }
// ...此处省略代码... }}
复制代码

结合前面的分析,pconn.alt在 server 和 client 都支持 http2 协议的情况下是不为 nil 的。所以,http2 的请求会走pconn.alt.RoundTrip(req)分支,也就是说 http2 的请求流程就被http2Transport接管啦。

(*http2Transport).NewClientConn


(*http2Transport).NewClientConn 内部会调用t.newClientConn(c, t.disableKeepAlives())


因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤分析并分块儿贴出源码。


1、初始化一个http2ClientConn


cc := &http2ClientConn{	t:                     t,	tconn:                 c,	readerDone:            make(chan struct{}),	nextStreamID:          1,	maxFrameSize:          16 << 10,           // spec default	initialWindowSize:     65535,              // spec default	maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.	peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.	streams:               make(map[uint32]*http2clientStream),	singleUse:             singleUse,	wantSettingsAck:       true,	pings:                 make(map[[8]byte]chan struct{}),}
复制代码

上面的源码新建了一个默认的 http2ClientConn。


initialWindowSize:初始化窗口大小为 65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。


maxConcurrentStreams:表示每个连接上允许最多有多少个数据流同时传输数据。


streams:当前连接上的数据流。


singleUse: 控制 http2 的连接是否允许多个数据流共享,其值由t.disableKeepAlives()控制。


2、创建一个条件锁并且新建 Writer&Reader。


cc.cond = sync.NewCond(&cc.mu)cc.flow.add(int32(http2initialWindowSize))cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})cc.br = bufio.NewReader(c)
复制代码

新建 Writer&Reader 没什么好说的,需要注意的是cc.flow.add(int32(http2initialWindowSize))


cc.flow.add将当前连接的可写流控制窗口大小设置为http2initialWindowSize,即 65535。


3、新建一个读写数据帧的 Framer。


cc.fr = http2NewFramer(cc.bw, cc.br)cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
复制代码

4、向 server 发送开场白,并发送一些初始化数据帧。


initialSettings := []http2Setting{	{ID: http2SettingEnablePush, Val: 0},	{ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},}if max := t.maxHeaderListSize(); max != 0 {	initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})}
cc.bw.Write(http2clientPreface)cc.fr.WriteSettings(initialSettings...)cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)cc.bw.Flush()
复制代码

client 向 server 发送的开场白内容如下:


const (    // client首先想server发送以PRI开头的一串字符串。    http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")var (	http2clientPreface = []byte(http2ClientPreface))
复制代码

发送完开场白后,client 向 server 发送SETTINGS数据帧。


http2SettingEnablePush: 告知 server 客户端是否开启 push 功能。


http2SettingInitialWindowSize:告知 server 客户端可接受的最大数据窗口是http2transportDefaultStreamFlow(4M)。


发送完 SETTINGS 数据帧后,发送 WINDOW_UPDATE 数据帧, 因为第一个参数为 0 即 streamID 为 0,则是告知 server 此连接可接受的最大数据窗口为http2transportDefaultConnFlow(1G)。


发送完 WINDOW_UPDATE 数据帧后,将 client 的可读流控制窗口大小设置为http2transportDefaultConnFlow + http2initialWindowSize


5、开启读循环并返回


go cc.readLoop()
复制代码

(*http2Transport).RoundTrip


(*http2Transport).RoundTrip 只是一个入口函数,它会调用(\http2Transport). RoundTripOpt 方法。


(*http2Transport). RoundTripOpt 有两个步骤比较关键:


t.connPool().GetClientConn(req, addr): 在 http2 的连接池里面获取一个可用连接,其中连接池的类型为http2noDialClientConnPool,参考http2configureTransport函数。


cc.roundTrip(req): 通过获取到的可用连接发送请求并返回响应。

(http2noDialClientConnPool).GetClientConn


根据实际的 debug 结果(http2noDialClientConnPool).GetClientConn 最终会调用(*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)


通过(http2noDialClientConnPool).GetClientConn 获取连接时传递给(*http2clientConnPool).getClientConn 方法的第三个参数始终为false,该参数为 false 时代表着即使无法正常获取可用连接,也不在这个环节重新发起拨号流程。


在(*http2clientConnPool).getClientConn 中会遍历同地址的连接,并判断连接的状态从而获取一个可以处理请求的连接。


for _, cc := range p.conns[addr] {	if st := cc.idleState(); st.canTakeNewRequest {		if p.shouldTraceGetConn(st) {			http2traceGetConn(req, addr)		}		p.mu.Unlock()		return cc, nil	}}
复制代码

cc.idleState()判断当前连接池中的连接能否处理新的请求:


1、当前连接是否能被多个请求共享,如果仅单个请求使用且已经有一个数据流,则当前连接不能处理新的请求。


if cc.singleUse && cc.nextStreamID > 1 {	return}
复制代码


2、以下几点均为 true 时,才代表当前连接能够处理新的请求:

* 连接状态正常,即未关闭并且不处于正在关闭的状态。

* 当前连接正在处理的数据流小于maxConcurrentStreams

下一个要处理的数据流 + 当前连接处于等待状态的请求 2 < math.MaxInt32。

* 当前连接没有长时间处于空闲状态(主要通过cc.tooIdleLocked()判断)。


st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&		!cc.tooIdleLocked()
复制代码


当从链接池成功获取到一个可以处理请求的连接,就可以和 server 进行数据交互,即(*http2ClientConn).roundTrip流程。

(*http2ClientConn).roundTrip


1、在真正开始处理请求前,还要进行 header 检查,http2 对 http1.1 的某些 header 是不支持的,笔者就不对这个逻辑进行分析了,直接上源码:


func http2checkConnHeaders(req *Request) error {	if v := req.Header.Get("Upgrade"); v != "" {		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])	}	if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)	}	if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {		return fmt.Errorf("http2: invalid Connection request header: %q", vv)	}	return nil}func http2commaSeparatedTrailers(req *Request) (string, error) {	keys := make([]string, 0, len(req.Trailer))	for k := range req.Trailer {		k = CanonicalHeaderKey(k)		switch k {		case "Transfer-Encoding", "Trailer", "Content-Length":			return "", &http2badStringError{"invalid Trailer key", k}		}		keys = append(keys, k)	}	if len(keys) > 0 {		sort.Strings(keys)		return strings.Join(keys, ","), nil	}	return "", nil}
复制代码

2、调用(*http2ClientConn).awaitOpenSlotForRequest,一直等到当前连接处理的数据流小于maxConcurrentStreams, 如果此函数返回错误,则本次请求失败。


2.1、double check 当前连接可用。


if cc.closed || !cc.canTakeNewRequestLocked() {	if waitingForConn != nil {		close(waitingForConn)	}	return http2errClientConnUnusable}
复制代码

2.2、如果当前连接处理的数据流小于maxConcurrentStreams则直接返回 nil。笔者相信大部分逻辑走到这儿就返回了。


if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {	if waitingForConn != nil {		close(waitingForConn)	}	return nil}
复制代码

2.3、如果当前连接处理的数据流确实已经达到上限,则开始进入等待流程。


if waitingForConn == nil {	waitingForConn = make(chan struct{})	go func() {		if err := http2awaitRequestCancel(req, waitingForConn); err != nil {			cc.mu.Lock()			waitingForConnErr = err			cc.cond.Broadcast()			cc.mu.Unlock()		}	}()}cc.pendingRequests++cc.cond.Wait()cc.pendingRequests--
复制代码

通过上面的逻辑知道,当前连接处理的数据流达到上限后有两种情况,一是等待请求被取消,二是等待其他请求结束。如果有其他数据流结束并唤醒当前等待的请求,则重复 2.1、2.2 和 2.3 的步骤。


3、调用cc.newStream()在连接上创建一个数据流(创建数据流是线程安全的,因为源码中在调用awaitOpenSlotForRequest之前先加锁,直到写入请求的 header 之后才释放锁)。


func (cc *http2ClientConn) newStream() *http2clientStream {	cs := &http2clientStream{		cc:        cc,		ID:        cc.nextStreamID,		resc:      make(chan http2resAndError, 1),		peerReset: make(chan struct{}),		done:      make(chan struct{}),	}	cs.flow.add(int32(cc.initialWindowSize))	cs.flow.setConnFlow(&cc.flow)	cs.inflow.add(http2transportDefaultStreamFlow)	cs.inflow.setConnFlow(&cc.inflow)	cc.nextStreamID += 2	cc.streams[cs.ID] = cs	return cs}
复制代码

笔者对上述代码简单描述如下:

  • 新建一个http2clientStream,数据流 ID 为cc.nextStreamID,新建数据流后,cc.nextStreamID +=2

  • 数据流通过http2resAndError管道接收请求的响应。

  • 初始化当前数据流的可写流控制窗口大小为cc.initialWindowSize,并保存连接的可写流控制指针。

  • 初始化当前数据流的可读流控制窗口大小为http2transportDefaultStreamFlow,并保存连接的可读流控制指针。

  • 最后将新建的数据流注册到当前连接中。


4、调用cc.t.getBodyWriterState(cs, body)会返回一个http2bodyWriterState结构体。通过该结构体可以知道请求 body 是否发送成功。


func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {	s.cs = cs	if body == nil {		return	}	resc := make(chan error, 1)	s.resc = resc	s.fn = func() {		cs.cc.mu.Lock()		cs.startedWrite = true		cs.cc.mu.Unlock()		resc <- cs.writeRequestBody(body, cs.req.Body)	}	s.delay = t.expectContinueTimeout()	if s.delay == 0 ||		!httpguts.HeaderValuesContainsToken(			cs.req.Header["Expect"],			"100-continue") {		return	}	// 此处省略代码,因为绝大部分请求都不会设置100-continue的标头	return}
复制代码

s.fn: 标记当前数据流开始写入数据,并且将请求 body 的发送结果写入s.resc管道(本文暂不对writeRequestBody展开分析,下篇文章会对其进行分析)。


5、因为是多个请求共享一个连接,那么向连接写入数据帧时需要加锁,比如加锁写入请求头。


cc.wmu.Lock()endStream := !hasBody && !hasTrailerswerr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)cc.wmu.Unlock()
复制代码


6、如果有请求 body,则开始写入请求 body,没有请求 body 则设置响应 header 的超时时间(有请求 body 时,响应 header 的超时时间需要在请求 body 写完之后设置)。


if hasBody {	bodyWriter.scheduleBodyWrite()} else {	http2traceWroteRequest(cs.trace, nil)	if d := cc.responseHeaderTimeout(); d != 0 {		timer := time.NewTimer(d)		defer timer.Stop()		respHeaderTimer = timer.C	}}
复制代码

scheduleBodyWrite的内容如下:


func (s http2bodyWriterState) scheduleBodyWrite() {	if s.timer == nil {		// We're not doing a delayed write (see		// getBodyWriterState), so just start the writing		// goroutine immediately.		go s.fn()		return	}	http2traceWait100Continue(s.cs.trace)	if s.timer.Stop() {		s.timer.Reset(s.delay)	}}
复制代码

因为笔者的请求 header 中没有携带100-continue标头,所以在前面的getBodyWriterState函数中初始化的 s.timer 为 nil 即调用scheduleBodyWrite会立即开始发送请求 body。


7、轮询管道获取响应结果。


在看轮询源码之前,先看一个简单的函数:


handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {	res := re.res	if re.err != nil || res.StatusCode > 299 {		bodyWriter.cancel()		cs.abortRequestBodyWrite(http2errStopReqBodyWrite)	}	if re.err != nil {		cc.forgetStreamID(cs.ID)		return nil, cs.getStartedWrite(), re.err	}	res.Request = req	res.TLS = cc.tlsState	return res, false, nil}
复制代码

该函数主要就是判断读到的响应是否正常,并根据响应的结果构造(*http2ClientConn).roundTrip的返回值。


了解了handleReadLoopResponse之后,下面就看看轮询的逻辑:


for {	select {	case re := <-readLoopResCh:		return handleReadLoopResponse(re)	// 此处省略代码(包含请求取消,请求超时等管道的轮询)	case err := <-bodyWriter.resc:		// Prefer the read loop's response, if available. Issue 16102.		select {		case re := <-readLoopResCh:			return handleReadLoopResponse(re)		default:		}		if err != nil {			cc.forgetStreamID(cs.ID)			return nil, cs.getStartedWrite(), err		}		bodyWritten = true		if d := cc.responseHeaderTimeout(); d != 0 {			timer := time.NewTimer(d)			defer timer.Stop()			respHeaderTimer = timer.C		}	}}
复制代码

笔者仅对上面的第二种情况即请求 body 发送完成进行描述:

  • 能否读到响应,如果能够读取响应则直接返回。

  • 判断请求 body 是否发送成功,如果发送失败,直接返回。

  • 如果请求 body 发送成功,则设置响应 header 的超时时间。

总结


本文主要描述了两个方面的内容:

  1. 确认 client 和 server 都支持 http2 协议,并构建一个 http2 的连接,同时开启该连接的读循环。

  2. 通过 http2 连接池获取一个 http2 连接,并发送请求和读取响应。

预告


鉴于 HTTTP2.0 的内容较多,且文章篇幅过长时不易阅读,笔者将后续要分析的内容拆为两个部分:


  1. 描述数据帧和流控制以及读循环读到响应并发送给readLoopResCh管道。

  2. http2.0 标头压缩逻辑。


最后,衷心希望本文能够对各位读者有一定的帮助。


:

1. 写本文时, 笔者所用 go 版本为: go1.14.2。

2. 本文对 h2c 的情况不予以考虑。

3. 因为笔者分析的是请求流程,所以没有在本地搭建 server,而是使用了一个支持 http2 连接的图片一步步的 debug。eg: https://dss0.bdstatic.com/5aV1bjqh_Q23odCf/static/superman/img/topnav/baiduyun@2x-e0be79e69e.png


参考

https://developers.google.com/web/fundamentals/performance/http2?hl=zh-cn


发布于: 2020 年 10 月 12 日阅读数: 122
用户头像

Gopher指北

关注

还未添加个人签名 2020.09.15 加入

欢迎关注公众号:Gopher指北

评论

发布
暂无评论
Go发起HTTP2.0请求流程分析(前篇)