写点什么

nsq 源码阅读之 diskqueue

用户头像
werben
关注
发布于: 2021 年 03 月 23 日

有兴趣可以看看这篇文章

https://www.cnblogs.com/zhangboyu/p/7457070.html

一、队列存储

队列的特征是先入先出,也就是写入是从后面写入,读取是从前面读取

我们平时写的队列一般是放到内存里面,比如一个大的动态数组

这里如果队列中的数据很大,diskqueue 则是将这个动态数组拆成了好多个文件来存储队列中的数据


如果队列是放在内存数组中,那么队列只需要记录两个属性,一个头的位置,一个是尾的位置,

队列大小 depth = 头位置 - 尾位置


但是由于 diskqueue 是将数组保存在多个文件中

所以 diskqueue 就会有五个属性: 头所在的文件,头在文件中的位置,尾所在的文件,尾在文件中的位置,还有就是 depth 标识头和尾中间的数据数量

这五个数据作为 diskqueue 的元数据单独保存在一个文件里面。

所以 New 一个 diskqueue 的时候先要这几个元数据读取出来


func New(name string, dataPath string, maxBytesPerFile int64,	minMsgSize int32, maxMsgSize int32,	syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {	d := diskQueue{		//名称		name:              name,		//文件保存路径		dataPath:          dataPath,		//每个文件大小最大值,超过要重新开启一个文件		maxBytesPerFile:   maxBytesPerFile,		//写入消息最小大小		minMsgSize:        minMsgSize,		//写入消息最大大小		maxMsgSize:        maxMsgSize,		readChan:          make(chan []byte),		writeChan:         make(chan []byte),		writeResponseChan: make(chan error),		emptyChan:         make(chan int),		emptyResponseChan: make(chan error),		exitChan:          make(chan int),		exitSyncChan:      make(chan int),		syncEvery:         syncEvery,		syncTimeout:       syncTimeout,		logf:              logf,	}
// 读取队列数据 err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err) }
go d.ioLoop() return &d}
// 读取队列数据func (d *diskQueue) retrieveMetaData() error { var f *os.File var err error
fileName := d.metaDataFileName() f, err = os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { return err } defer f.Close()
//队列写入和读取位置中间有多少条数据,也就是队列的大小 var depth int64 //读取队列核心数据 //当前读取文件是哪个,读取位置是哪里 //当前写入的文件是哪个,写入文件位置 _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &depth, &d.readFileNum, &d.readPos, &d.writeFileNum, &d.writePos) if err != nil { return err } atomic.StoreInt64(&d.depth, depth)
//下一个读取文件 d.nextReadFileNum = d.readFileNum //下一个读取位置 d.nextReadPos = d.readPos
return nil}
复制代码


二、写入队列


func (d *diskQueue) writeOne(data []byte) error {	var err error
// 当前写入文件是否打开,没有则打开当前写入文件 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err }
d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)
//如果当前写入位置大于0,则将文件位置移动到写入位置点 if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } }
dataLen := int32(len(data))
//判断消息大小是否合法 if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize) }
//将缓冲区清空 d.writeBuf.Reset() //将消息大小写入缓冲区 err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err }
//将消息写入缓冲区 _, err = d.writeBuf.Write(data) if err != nil { return err }
// only write to the file once //将缓冲区关联到文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err }
//计算总大小 totalBytes := int64(4 + dataLen) d.writePos += totalBytes //队列消息数量+1 atomic.AddInt64(&d.depth, 1)
//如果写入位置大于了文件最大大小 if d.writePos >= d.maxBytesPerFile { //将当前写入文件+1 d.writeFileNum++ //当前写入位置重置为0 d.writePos = 0
// sync every time we start writing to a new file //将缓存数据写入到磁盘 err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) }
if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } }
return err}
复制代码


三、读取队列

func (d *diskQueue) readOne() ([]byte, error) {	var err error	var msgSize int32
// 当前读取文件是否打开,没有则打开当前文件 if d.readFile == nil { // 打开读取的文件(也就是当前队列头所在的文件),如果文件大小到达上线maxBytesPerFile,readFileNum加一 curFileName := d.fileName(d.readFileNum) d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600) if err != nil { return nil, err }
d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)
//当前队列头在当前读取文件的位置 if d.readPos > 0 { _, err = d.readFile.Seek(d.readPos, 0) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } }
d.reader = bufio.NewReader(d.readFile) }
// 先读取出消息的大小 err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err }
//判断消息大小是否合法 if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil return nil, fmt.Errorf("invalid message read size (%d)", msgSize) }
//读取消息 readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() d.readFile = nil return nil, err }
totalBytes := int64(4 + msgSize)
//将下一个要读取的位置往后移 d.nextReadPos = d.readPos + totalBytes d.nextReadFileNum = d.readFileNum
//判断下一个读取的位置是不是超过了文件大小 if d.nextReadPos > d.maxBytesPerFile { if d.readFile != nil { d.readFile.Close() d.readFile = nil } //如果超过了,则当前队列头文件要往后移,且读取位置设置为0 d.nextReadFileNum++ d.nextReadPos = 0 }
return readBuf, nil}
复制代码


// 刷新缓存到磁盘func (d *diskQueue) sync() error {	if d.writeFile != nil {		// 将缓冲区的数据从内存中拷贝刷新到硬盘中保存		err := d.writeFile.Sync()		if err != nil {			d.writeFile.Close()			d.writeFile = nil			return err		}	}
//保存元数据 err := d.persistMetaData() if err != nil { return err }
d.needSync = false return nil}
复制代码


四、ioLoop 循环


这个函数是一个“守护”协程,

暴露的 d.writeChan 和 d.readChan

如果外部有网 writeChan 里写数据在这里处理

同时,这里的消息也会通过 d.readChan 将消息不断的从队列中往外推


func (d *diskQueue) ioLoop() {	var dataRead []byte	var err error	var count int64	var r chan []byte
//开启一个timer,syncTimeout在系统配置里,默认是2秒一次 syncTicker := time.NewTicker(d.syncTimeout)
for { // SyncEvery是系统配置,默认值是2500 // 也就是如果现在这段代码中的count的值如果到了2500,则将缓存中的数据保存到磁盘 if count == d.syncEvery { d.needSync = true }
// needSync这个字段如果为true,则将缓存中的数据保存到磁盘 if d.needSync { err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } //同步缓存到磁盘成功,将count重置为0 count = 0 }
//如果队列头和尾中间还有数据,则从头部读取数据 if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } //这里读取的数据放到readChan这个通道里 r = d.readChan } else { r = nil }
select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read //看上面的英文注释,如果r为空,则这个分支会被跳过,这是golang的一个特性 //将读取的消息,丢到d.readChan里面,r.readChan向外部暴露 case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed //moveForward()会删除已经没用的file,这个文件中的数据已经全部被读取了,不在队列头和尾之间了 d.moveForward() case <-d.emptyChan: d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: //如果d.writeChan有写入数据,则将消息数据写入到队列 count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: //这里相当于两秒钟同步一次缓存到磁盘 if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: goto exit } }
exit: d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name) syncTicker.Stop() d.exitSyncChan <- 1}
复制代码


发布于: 2021 年 03 月 23 日阅读数: 12
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
nsq源码阅读之diskqueue