写点什么

ARTS-week1

用户头像
书生
关注
发布于: 2020 年 06 月 05 日
ARTS-week1

1. Algorithm



Huffman压缩编码

学习自数据结构与算法之美——王争



解决的问题

更节省空间的存储方式



铺垫

假设一个文件包含1000个字符,每个字符1byte(8bits),那么这个文件大小为1000*8=8000bits



假设这个文件中只包含六种字符:a, b, c, d, e, f.



那么我们为了节省空间,可以用3bit来表示这六个字符,即:



a(000)、b(001)、c(010)、d(011)、e(100)、f(101)



此时由于每个字符只占3个bit,所以文件大小为3000bits。节省了一大半空间!



Huffman编码

huffman不仅会考察文本中有多少种字符,还会考察每种字符出现的频率。根据频率的不同,选则不同长度的编码。



根据贪心的思想,我们会将出现频率高的字符用长度较短的编码,出现频率低的字符用长度较长的编码。



由于编码长度不同,为了避免歧义,huffman要求各个字符的编码之间,不会出现一个编码是另一个编码前缀的情况。



假设6个字符出现的频率由高到低依次为a、b、c、d、e、f,我们可以编码为:





经过huffman编码后的文件大小为:1910bits!



如何根据频率来对字符进行编码?

我们把每个字符看作一个节点,并且辅带着把频率放到优先级队列中。我们从队列中取出频率最小的两个节点 A、B,然后新建一个节点 C,把频率设置为两个节点的频率之和,并把这个新节点 C 作为节点 A、B 的父节点。最后再把 C 节点放入到优先级队列中。重复这个过程,直到队列中没有数据。





现在,我们给每一条边加上画一个权值,指向左子节点的边我们统统标记为 0,指向右子节点的边,我们统统标记为 1,那从根节点到叶节点的路径就是叶节点对应字符的霍夫曼编码。





2. Review



由于工作需要,所以查看了一些百万长连接相关的技术文章,这是最有价值的一篇,所以大概翻译了下。



原文:A Million WebSockets and Go



场景

三百万的websocket长连接



正常做法

分析

channel struct



// Packet represents application level data.
type Packet struct {
...
}
// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}

上面实现了一个websocket channel



每个连接都需要两个goroutine。每个goroutine大概会占用2k到8k的内存,不同操作系统表现不同



我们以每个goroutine占用4KB内存计算,3百万个连接会使用6百万个goroutine占用24GB内存



。这里只是goroutine占用的内存,channel的结构以及其他如ch.send中的结构都没有计算在内。



I/O goroutines



看一下reader的实现:



func (c *Channel) reader() {
// We make a buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}

这里我们使用bufio.Reader来减少read()的系统调用次数,并在buf的大小限制内尽可能多的读取。



我们先不考虑对数据的解析与处理,这和我们的优化无关。



注意这里的buf,它默认的大小是4KB,这就意味着12GB的内存占用。writer同样如此:



func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}

readerwriter一共占用了24GB内存



HTTP



我们已经实现了一个简单的channel,现在我们需要一个websocket连接来让他工作。



import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})

这是一个正常的使用websocket的方式。



这里的http.ResponseWritehttp.Request会通过bufio.Readerbufio.Writer进行内存分配,每个是4KB。



不管是哪个websocket工具库,响应成功后都会调用responseWriter.Hijack(),然后都会接收到一个带着TCP连接的I/O缓存。



有时候我们会通过调用net/http.putBufio{Reader,Writer}来将缓存放回到net/httpsync.Pool



这里我们又占用了24GB内存



所以,光是连接我们就占用了72GB



优化

我们回顾一下一个用户的连接都做了什么。再将协议转换到websocket后,客户端发送了一个数据包,然后它就很有可能不在发送任何东西,而它的生命周期可能是几秒或者几天。



所以在大多数时间里,channel.reader()channel.writer()只是在等待数据,然而他们的等待就消耗了我们4kb内存。



netpoll

channel.reader()的实现中,在bufio.Reader.Read()中的conn.Reader()一直在等待接收新数据。如果这个连接里有数据,GO的运行时会唤醒goroutine并让它读取下个数据包,在这之后,这个goroutine又进入沉睡。让我们来看看GO的运行时在什么情况下会唤醒goroutine



让我们看下conn.Read()的实现。在net.netFD.Read()中:



// net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}



GO使用了非阻塞模式的sockets,EAGAIN表示在socket里没有数据了但不会阻塞在空的socket上,OS会把控制权返回给用户进程。



它首先对文件描述符进行read()的系统调用,如果读取会返回EAGAIN error,运行时会调用 pollDesc.waitRead()



// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}

如果我们深入了解,就能看到netpolllinux下是通过epoll实现的,而在BSD下是通过kqueue实现的。我们的连接为什么不使用相同的方式呢:只在socket有需要读取的数据时,才分配缓存空间并启动读取数据的goroutine



摆脱goroutines

假设我们用netpoll模式实现了GO,我们就可以在订阅连接中的可读数据时,避免使用channel.reader()中的goroutine与其中的缓存。



ch := NewChannel(conn)
// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go Receive(ch)
})
// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}

实现channel.writer()也是非常简单的,我们只需要在要发送数据包时运行goroutine并分配缓存。



func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}



注意当系统调用write()时,操作系统会返回EAGAIN,这里我们没有处理这种情况。在这种情况下,我们依靠Go运行时,因为这种服务器实际上很少见。如果需要的话,我们能够以相同的方式处理它。



ch.send读取数据包后,writer就会结束它的操作,并且释放goroutine栈以及发送占用的缓存。



我们通过这种方式,在readerwriter中能够减少48GB的内存占用!



控制资源

大量的连接数量带来的不仅仅是大量的内存消耗。在开发服务端时,我们会不停的遇到竞态条件和死锁,随之而来的是所谓的自我分布式阻断攻击(self-DDOS)。这种情况下,客户端会尝试重新连接服务器而把情况弄得更糟。



举个例子,由于某些原因,我们突然不能处理ping/pong信息,客户端就会认为连接中断并无法接收到任何数据,于是客户端会每个N秒进行一次重连,而不是等待服务端通知。



一种好的办法是被锁定的或者负载过重的服务端停止接收新的连接,并且通过负载均衡器(如nginx)把请求分发到其他服务上。



更进一步,不管服务器的负载,如果所有的客户端突然想要发送一个数据包,那么之前节省的48GB内存就会被再次使用,那我们就再次回到了最初的状态。



Goroutine pool



我们应该通过使用goroutine pool来限制同时处理的数据包的数量。



这里是一个简单的实现:



package gopool
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}



现在我们的netpool看起来是这样:



pool := gopool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
Receive(ch)
})
})



所以现在我们读取数据包不仅仅取决于socket上的可读数据,还取决于能够在goroutine pool中能够使用的goroutine数量。



现在我们来修改Send()



pool := gopool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}



不同于go ch.writer(),我们想要重复使用goroutine进行发送数据。因此,对于一个有N个goroutinepool,我们能够保证最多有N个请求同时处理,当第N+1个请求时,不会分配N+1个缓存。goroutine pool也允许我们限制一个新连接的Accept()Upgrade()来避免DDoS的大多数情况



零拷贝升级

客户端通过一个HTTP升级请求来转换WebSocket协议,如:



GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket



我们需要HTTP请求,并且仅使用它的头部来转换websocket协议。这些信息都存储在http.Request中,所以我们可以通过放弃使用标准的net/http服务并在处理HTTP请求的时候避免无用的内存分配和拷贝。



举个例子,http.Request包含了很多类型的头部,标准库会将这些信息无条件的复制到Header字段中,可以想象有多少多余的数据,例如一个很大的cookie的头部



不幸的是,在我们找到的所有库中只有支持对net/http的升级。此外,没有一个库可以让我们来对上面的读和写进行优化。为了这些优化工作,我们必须有一套底层的API来操作websocket。为了重用缓存,我们需要类似下面的协议函数:



func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error



如果我们有一个这样的API的库,我们就能像下边这样从连接里读取数据包(写数据包类似)



// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}



长话短说,是时候来构造我们自己的库了



github.com/gobwas/ws



ws库没有将协议的操作逻辑暴露给用户。所有的读和写方法都接收通用的io.Readerio.Writer接口,所以我们可以选择用缓存或者其他I/O的包装。



除了从net/http库升级请求外,ws支持零拷贝升级,升级请求的处理和转换到websocket协议不会使用多余的内存分配或者复制。ws.Upgrade()接受io.ReadWriternet.Conn是心啊了这个接口)。在另一方面,我们能够使用标准的net.Listen(),然后立即将从ln.Accept接收的连接传递给ws.Upgrade()去处理。这个库能够支持在未来的应用中复制任何请求的数据(例如,使用cookie来确认session



下面是升级请求过程的性能测试:标准的net/http与使用了零拷贝升级的net.Listen



BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op



转换到ws的零拷贝升级,我们能够再节省24GB内存——在net/http处理请求时为I/O缓存分配的。



回顾

总结一下我们优化的地方



  • goroutine与其上的昂贵的缓存。方案netpollepollkqueue);重用缓存。

  • goroutine与其上的昂贵的缓存。方案:只有在必要时才开启缓存;重用缓存。

  • 大量的连接访问,导致netpool不能工作。方案:在限制的数量内重用goroutine

  • net/http不是最快的方式去处理websocket的升级。方案:基于TCP的连接使用零拷贝升级。



下边是我们的代码



import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}



结论

过早的优化是万恶之源——Donald Knuth



当然,上边的优化是有意义的,但不适用于所有场景。例如:如果空闲资源(内存、CPU)与在线的连接数比例很大时,这些优化时没有意义的。然而,你仍能收益于了解怎样去提高性能。



3. Tip



最近学了很多东西,这里介绍一些小的方面。



我们在数据库分库分表或者API网关负载均衡或者缓存存入集群时,常常使用两种算法。



1. 哈希取模

这种算法能保证节点的数据分布较均匀。但是在扩容/缩容时,对节点的影响很大。



2. 一致性哈希算法

一致性哈希算法的精髓在于,计算结果不是落在一个点上,而是在一条线上,于是节点对应的是一个范围值,落在这个范围的数据会存放在这个节点上。



所以当我们扩容/缩容时,只会影响一个节点。



当然这种情况会导致节点上的数据分布不均,为解决这个问题,引入了虚拟节点



这里只是简单介绍,感兴趣的小伙伴可以自行查阅。



4. Share



分析自己平时的一些生活感悟。



由于住所和公司距离较近,所以平时都是走路上下班。最近天气越来越热,所以我每次晚上回家都能出一身汗。



有一天刚下班,我突然想到,晚上的天气明明不是很热,为什么我每次回家都会出一身汗?



于是我开始注意我的呼吸,注意我的身体反应。当我身体感觉有点燥热的时候,我就放慢脚步,慢慢走,其实不热的。当我看到旁边的人快速走过时,潜意识也想加快脚步,于是我感觉到了身体开始有些发燥;当我去想一些工作的事情时,我感觉到身体有些发燥、脚步也有些加快。然而每当我感到身体有变化时,我就告诉我自己,慢一点。



最终,我到了家,发现仍出了些汗,但是比平时少很多。



当我们身处这个时代的洪流中时,我们似乎总是“身不由己”,我们急切的想要得到些东西,急切的想要证明些东西。我们跟随着别人的脚步,看那些有名的书,学那些更潮的技术。然而,我们似乎更应该停下来,想想我们真正想要的是什么。



慢一些,再慢一些。我们本来就是独一无二的,所以更应该倾听自己内心的感受。



儒家讲:正心、诚意、修身、齐家、治国、平天下。



不要急着去成功,人生还很长。



做人先修心。



希望我们都能找到自己的节奏,在自己的节奏中走向远方。共勉~



用户头像

书生

关注

还未添加个人签名 2018.08.07 加入

还未添加个人简介

评论

发布
暂无评论
ARTS-week1