写点什么

手撕环形队列系列三:多生产者并行写入

发布于: 刚刚

本文是手撕环形队列系列的第三篇,之前的文章链接如下:

《手撕环形队列》

《手撕环形队列系列二:无锁实现高并发》


之前系列文章介绍的环形队列,已经能够支持多生产者和多消费者并发操作,数据操作模式为:

生产者准备好数据,然后 push()方法放入环形队列中;

消费者准备好接收的缓冲区,然后调用 pop()方法将数据写入接收缓冲区中。


这种模式下,生产者准备数据过程中,需要分配内存。把数据写入环形队列后,这个内存就不需要了,得释放掉。这样,会导致生产者频繁进行内存分配和释放的动作。


因此,对性能要求更苛刻的系统,希望生产者能直接使用环形队列中的内存,直接写入,这样就不需要生产者再分配内存了。这种思路下,环形队列和生产者之间的交互关系变为:


  1. 生产者调用 ring_queue.request_push()方法,获得一个可以写入的 token;

  2. 生产者通过 token 获取内存地址,向这个地址进行数据写入, 把数据都写好;

  3. 生产者调用 token.finish()方法, 通知环形队列已经完成写入。


多个生产者可以并行操作,用图示意如下:


上图中,环形队列中的元素,用三种不同颜色来区分:

可写入区域:可以写入的位置区域;

正在写入区域:已经分配出去,producer 正在进行写入的区域

写入完成区域:producer 写入完成,并且环形队列已经更新其状态的区域。


多个 producer(p1, p2, p3) 并发在环形队列尾部写入,它们的速度不一定相同,因此并发写入区域会形成正在写入区域、写入完成区域互相交错的情况

这种情况下,消费者读取时,把 tail 作为上限,并且要判断区域是已写入完成状态,才可以读取进行消费


支持多生产者并行提交的环形队列,C 语言实现代码如下。

本文仅展示了多生产者并行写入,消费者依然是串行消费。(依照本文的思路,小伙伴们可以继续扩展,让多个消费者也能并行消费。)

// ring_queue.h#ifndef RING_QUEUE_H#define RING_QUEUE_H
typedef struct ring_queue_t { char* pbuf; char* pstate; int item_size; int capacity;
volatile int write_flag; volatile int read_flag;
volatile int head; volatile int tail; volatile int same_cycle;} ring_queue_t;
typedef struct ring_queue_token_t { ring_queue_t* pqueue; void* pitem; int idx;} ring_queue_token_t;
int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity);void ring_queue_destroy(ring_queue_t* pqueue);
int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken);void* ring_queue_token_item(ring_queue_token_t* ptoken);int ring_queue_token_finish(ring_queue_token_t* ptoken);
int ring_queue_pop(ring_queue_t* pqueue, void* pitem);
int ring_queue_is_empty(ring_queue_t* pqueue);int ring_queue_is_full(ring_queue_t* pqueue);
#endif
复制代码


// ring_queue.c#include "ring_queue.h"
#include <stdlib.h>#include <string.h>
#define CAS(ptr, old, new) __sync_bool_compare_and_swap(ptr, old, new)
enum ItemState { ITEM_BLANK, ITEM_WRITING, ITEM_WRITED,};
int ring_queue_init(ring_queue_t* pqueue, int item_size, int capacity) { memset(pqueue, 0, sizeof(*pqueue)); pqueue->pbuf = (char*)malloc(item_size * capacity); pqueue->pstate = (char*)malloc(capacity); if (!pqueue->pbuf || !pqueue->pstate) { return -1; }
pqueue->item_size = item_size; pqueue->capacity = capacity; pqueue->same_cycle = 1; memset(pqueue->pstate, ITEM_BLANK, capacity); return 0;}
void ring_queue_destroy(ring_queue_t* pqueue) { free(pqueue->pbuf); free(pqueue->pstate); memset(pqueue, 0, sizeof(*pqueue));}
int ring_queue_request_push(ring_queue_t* pqueue, ring_queue_token_t* ptoken) { // try to lock write flag while (1) { if (ring_queue_is_full(pqueue)) { return -1; }
if (CAS(&pqueue->write_flag, 0, 1)) { // set write flag successfully break; } }
// generate token int idx = pqueue->tail; pqueue->pstate[idx] = ITEM_WRITING; ptoken->pqueue = pqueue; ptoken->idx = idx; ptoken->pitem = pqueue->pbuf + (idx * pqueue->item_size); idx = (idx + 1) % pqueue->capacity; if (0 == idx) { // a new cycle pqueue->same_cycle = 0; // tail is not the same cycle with head } pqueue->tail = idx;
// unlock write flag CAS(&pqueue->write_flag, 1, 0);
return 0;}
void* ring_queue_token_item(ring_queue_token_t* ptoken) { return ptoken->pitem;}
int ring_queue_token_finish(ring_queue_token_t* ptoken) { // set item commit state ring_queue_t* pq = ptoken->pqueue; pq->pstate[ptoken->idx] = ITEM_WRITED;
return 0;}
int ring_queue_pop(ring_queue_t* pqueue, void* pitem) { // try to set read flag while (1) { if (ring_queue_is_empty(pqueue)) { return -1; }
if (CAS(&pqueue->read_flag, 0, 1)) { // set read flag successfully break; } }
int iret = -2; if (ITEM_WRITED == pqueue->pstate[pqueue->head]) { // read data memcpy(pitem, pqueue->pbuf + pqueue->head * pqueue->item_size, pqueue->item_size); pqueue->head = (pqueue->head + 1) % pqueue->capacity; if (0 == pqueue->head) { pqueue->same_cycle = 1; // head is now the same cycle with tail } iret = 0; // succ }
// reset read flag CAS(&pqueue->read_flag, 1, 0);
return iret;}

int ring_queue_is_empty(ring_queue_t* pqueue) { return (pqueue->head == pqueue->tail) && pqueue->same_cycle;}
int ring_queue_is_full(ring_queue_t* pqueue) { return (pqueue->head == pqueue->tail) && !pqueue->same_cycle;}
复制代码


写个简单的程序,测试一下(不是多线程的,仅为了基本功能演示用):

// test_ring_queue.c#include "ring_queue.h"#include <stdio.h>
static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val);static void test_pop(ring_queue_t* pq);
int main() { ring_queue_t queue, *pq = &queue; int iret = ring_queue_init(pq, sizeof(int), 3); iret = ring_queue_is_empty(pq); printf("ring_queue is%s empty!\n", iret ? "" : " not");
ring_queue_token_t tk1, tk2, tk3, tk4; iret = ring_queue_request_push(pq, &tk1); iret = ring_queue_request_push(pq, &tk2); iret = ring_queue_request_push(pq, &tk3); iret = ring_queue_request_push(pq, &tk4);
int val = 1; test_push(pq, &tk1, val++); test_push(pq, &tk2, val++); test_push(pq, &tk3, val++);
iret = ring_queue_is_full(pq); printf("ring_queue is%s full!\n", iret ? "" : " not");
test_pop(pq);
iret = ring_queue_request_push(pq, &tk4); test_push(pq, &tk4, val++);
test_pop(pq); test_pop(pq); test_pop(pq); test_pop(pq);
ring_queue_destroy(pq); return 0;}
static void test_push(ring_queue_t* pq, ring_queue_token_t* ptk, int val) { int* p = (int*)ring_queue_token_item(ptk); *p = val; int iret = ring_queue_token_finish(ptk); if (0 == iret) { printf("ring_queue_push succ, val = %d\n", val); } else { printf("ring_queue_push failed! iret = %d\n", iret); }}
static void test_pop(ring_queue_t* pq) { int val = -1; int iret = ring_queue_pop(pq, &val); if (0 == iret) { printf("ring_queue_pop succ, val = %d\n", val); } else { printf("ring_queue_pop failed! iret = %d\n", iret); }}
复制代码


编译后,运行测试程序,输出如下:

$ ./test_ring_queuering_queue is empty!ring_queue_push succ, val = 1ring_queue_push succ, val = 2ring_queue_push succ, val = 3ring_queue is full!ring_queue_pop succ, val = 1ring_queue_push succ, val = 4ring_queue_pop succ, val = 2ring_queue_pop succ, val = 3ring_queue_pop succ, val = 4ring_queue_pop failed! iret = -1
复制代码


我的微信号是 实力程序员,欢迎大家转发至朋友圈,分享给更多的朋友。

发布于: 刚刚阅读数: 2
用户头像

实力程序员,用实力说话! 2021.05.24 加入

超过20年一线产品研发和技术管理的实力程序员

评论

发布
暂无评论
手撕环形队列系列三:多生产者并行写入