《从内核视角看 Linux:环形缓冲区 + 线程池的生产消费模型实现》

【一】环形生产消费模型介绍
“环形”生产消费模型:队列采⽤数组模拟,⽤模运算来模拟环状特性,例如:

《从内核视角看 Linux:环形缓冲区 + 线程池的生产消费模型实现》

特点:

(1)缓存大小初始后是固定的

(2)也符合数据“先进先出”的特点

(3)通过首尾下标来访问数据。Head:只要有空位置就可以一直存放数据

Tail:只要有数据就可以一直获取数据

而位置是否有数据我们可以根据信号量(引用计数,本质为临界资源数量)判断!

(此模型较于普通的“生产消费模型”可以根据信号量做到并发执行,信号量代表资源数量)

为什么信号量不用担心两个线程同时获取一个信号量?

核心原因是sem_wait操作的原子性—— 它将 “检查信号量值” 和 “修改信号量值” 封装为一个不可分割的步骤,确保任何时刻只有一个线程能成功获取资源

例如:核心思想就是“连续”,不给其它线程可乘之机

原子操作的关键是 “没有中间状态”:

信号量值要么是 1(没被拿),要么是 0(被拿了),不会出现 “0.5” 这种中间值;
线程要么拿到资源(信号量值变 0),要么没拿到(阻塞),不会出现 “两个线程都认为自己拿到了” 的情况。
这就是为什么哪怕两个线程 “同时” 抢最后一个信号量,也绝对不会同时拿到 —— 原子性把 “争夺” 变成了 “排队”,先到先得

【二】信号量使用
(1)创建信号量对象
原型:

sem_t 对象变量名;

(2)初始化信号量
原型:

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

sem:指向信号量对象的指针(需提前声明,如sem_t empty_sem;)

pshared:0 表示信号量用于线程间同步(环形模型必选);非 0 表示用于进程间同步(不常用)

value:信号量初始值(表示可用资源的初始数量)

作用:初始化一个信号量

(3)信号量等待
原型:

int sem_wait(sem_t *sem);

参数:指向目标信号量对象的指针

作用:尝试获取信号量资源(将信号量值减 1)

若当前信号量值为 0,线程会阻塞等待,直到有其他线程释放资源(信号量值 > 0)

(4)信号量释放
原型:

int sem_post(sem_t *sem);

参数:指向目标信号量对象的指针

作用:释放信号量资源(将信号量值加 1),并唤醒一个阻塞在该信号量上的线程(若有)

(5)信号量销毁
原型:

int sem_destroy(sem_t *sem);

参数:指向目标信号量对象的指针

作用:销毁已初始化的无名信号量,释放其占用的资源

(6)获取当前信号量
原型:

int sem_getvalue(sem_t *sem, int *sval);

参数:获取信号量的当前值

第一个参数:要获取的信号量对象指针

第二个参数:用于存储信号量当前值

作用:

【三】模型实现
(1)理论框架
生产和消费是根据信号量来判断的,可以同步,所以需要两个信号量初始化

(生产:初始资源为MAX)

(消费:初始资源为0)

在单生产消费的基础上,同样只能单个线程生产与消费,但是可以并发执行,所以需要两个互斥锁

所以类的设计:

//最大容量
#define MAX 5

template<class T>
class cycle_p_c
{
public:
cycle_p_c()
{
//开缓存空间
_buffer.resize(MAX);
//锁
pthread_mutex_init(&p_mutex,NULL);
pthread_mutex_init(&c_mutex,NULL);
//信号量
sem_init(&p_message,0,MAX);
sem_init(&c_message,0,0);
//下标
head,tail=0;
}
//生产
void push_back(const T& date)
{

}
//消费
void pop()
{

}
private:
//数据缓存
std::vector<T> _buffer;
//信号变量*2
sem_t p_message;
sem_t c_message;
//两个下标
size_t head;
size_t tail;
//两个互斥锁(确保每次只有一个生产和一个消费并发执行)
pthread_mutex_t p_mutex;
pthread_mutex_t c_mutex;
};

(2)生产实现
先判断信号量,如果生产的信号量充足(原子性),就可以直接去进入生产(否则就自动等待):

生产需要保证互斥性,否则可能导致超出资源范围,根据模运算来获取资源下标

//生产
void push_back(const T& date)
{
//看信号量是否足够(生产信号量-1)
sem_wait(&p_message);
//此时说明已经申请到信号量
pthread_mutex_lock(&p_mutex);

//生产
head%=MAX;
_buffer[head++]=date;
//消费信号量+1
sem_post(&c_message);

std::cout<<"我生产了:"<<date<<std::endl;

pthread_mutex_unlock(&p_mutex);
}

(3)消费实现
先判断信号量,如果消费的信号量不为0(原子性),就可以直接去进入消费(否则就自动等待)

消费需要保证互斥性,否则可能导致超出资源范围,根据模运算来获取资源下标

//消费
void pop()
{
//看信号量是否足够(消费信号量-1)
sem_wait(&c_message);
//此时说明已经申请到信号量
pthread_mutex_lock(&c_mutex);

//消费
std::cout<<"我消费了:"<<_buffer[tail]<<"...."<<std::endl;
_buffer[tail++]=0;
//生产信号量+1
sem_post(&p_message);

pthread_mutex_unlock(&c_mutex);
}

【四】线程池
线程池我们采用上面的“生产消费模型”或者“环形生产消费模型”来实现都可以,这里以后者为例:

我的设计思路:

主要还是环形生产消费模型的实现,其次在pop()的时候只需要将任务拿出来交给执行函数就可以了,因为是一个线程一次pop(),所以执行任务是不需要加锁的,当然如果你需要打印的话就需要加锁,且所有打印应该是同一把锁,否则打印无法准确观察任务的执行

(1)理论框架
//最大容量
#define MAX 6
//随机数范围
constexpr int min_t=1;
constexpr int max_t=100;

struct Date
{
Date() = default;
Date(int date1,int date2)
:_date1(date1),_date2(date2)
{}

int _date1;
int _date2;
};

//获取随机数
int data_t()
{
// 1. 使用 std::thread_local 声明一个线程局部的 Mersenne Twister 生成器
// std::mt19937 是一个高质量的伪随机数生成器
thread_local static std::mt19937 generator;

// 2. 懒初始化:检查生成器是否已经被种子化
// (一个未被种子化的 std::mt19937 会产生固定的默认序列)
thread_local static bool is_initialized = false;
if (!is_initialized) {
// 3. 使用 std::random_device 来获取一个真随机种子
std::random_device rd;

// 为了确保万无一失,我们可以将多个种子源组合起来
// 这里我们结合了真随机数和线程ID的哈希值
std::seed_seq seq{
rd(),
static_cast<unsigned int>(std::hash<std::thread::id>()(std::this_thread::get_id()))
};

// 4. 用组合后的种子序列来初始化当前线程的生成器
generator.seed(seq);

is_initialized = true;
}

// 5. 定义一个分布,用于生成 [min_t, max_t] 范围内的整数
std::uniform_int_distribution<int> distribution(min_t, max_t);

// 6. 使用生成器和分布来产生最终的随机数
return distribution(generator);
}

//任务执行方法
int calculate(struct Date date)
{
return date._date1+date._date2;
}

//线程池
template<class T>
class cycle_p_c
{
public:
cycle_p_c()
{
//缓存空间
_buffer.resize(MAX);
//锁
pthread_mutex_init(&p_mutex,NULL);
pthread_mutex_init(&c_mutex,NULL);
pthread_mutex_init(&tasks,NULL);
//信号量
sem_init(&p_message,0,MAX);
sem_init(&c_message,0,0);
//下标
tail=0;
head=0;
}
~cycle_p_c()
{
pthread_mutex_destroy(&p_mutex);
pthread_mutex_destroy(&c_mutex);
pthread_mutex_destroy(&tasks);
sem_destroy(&p_message);
sem_destroy(&c_message);
}

//生产
void push_back(const T& date)
{

}
//消费
T pop()
{

}
//处理任务
void Handle_tasks(struct Date* ptr)
{

}
private:
//数据缓存
std::vector<T> _buffer;
//信号变量*3
sem_t p_message;
sem_t c_message;
//两个下标
size_t head;
size_t tail;
//两个互斥锁(确保每次只有一个生产和一个消费并发执行)
pthread_mutex_t p_mutex;
pthread_mutex_t c_mutex;
//任务拿取锁
pthread_mutex_t tasks;
};

(2)生产实现
先看生产信号量(原子性,不需要加锁)是否充足,然后开启互斥锁,开始生产,给消费信号

//生产
void push_back(const T& date)
{
//看信号量是否足够(生产信号量-1)
sem_wait(&p_message);
//此时说明已经申请到信号量
pthread_mutex_lock(&p_mutex);

//生产
head%=MAX;
_buffer[head++]=date;
//消费信号量+1
sem_post(&c_message);
std::cout<<"我创建任务了哦..."<<std::endl;

pthread_mutex_unlock(&p_mutex);
}

(3)消费实现
看消费的信号是否充(原子性),随后开启锁,开始消费,返回任务,用于后面的任务执行函数

//消费
T pop()
{
//看信号量是否足够(消费信号量-1)
sem_wait(&c_message);
//此时说明已经申请到信号量
pthread_mutex_lock(&c_mutex);

//消费
struct Date date=_buffer[tail++];
tail%=MAX;
//生产信号量+1
sem_post(&p_message);

pthread_mutex_unlock(&c_mutex);
return date;
}

(4)任务执行
如果需要打印任务来进行就需要加锁,确保每个任务的执行打印是独一无二的,否则会乱序

//处理任务
void Handle_tasks(struct Date* ptr)
{
pthread_mutex_lock(&tasks);

std::cout<<"date1="<<(ptr->_date1)<<" date2="<<(ptr->_date2)<<" "<<"date1+date2=="
<<calculate(*ptr)<<std::endl;

pthread_mutex_unlock(&tasks);
}

(5)main函数
#include"thread_pool.h"

typedef cycle_p_c<Date> cycle;

//派发任务
void* haddle_p(void* arg)
{
cycle *ptr=(cycle*)arg;

//创建任务
int date1=data_t();
int date2=data_t();
struct Date date(date1,date2);
sleep(1);

ptr->push_back(date);

return NULL;
}
//处理任务
void* haddle_c(void* arg)
{
cycle *ptr=(cycle*)arg;

//先获取任务
struct Date date=ptr->pop();
//执行任务
ptr->Handle_tasks(&date);

return NULL;
}

int main()
{

cycle *ptr=new cycle;
while(1)
{
//任务生成
pthread_t pthread_p;
pthread_create(&pthread_p,NULL,haddle_p,ptr);

//任务执行线程
pthread_t pthread_c;
pthread_create(&pthread_c,NULL,haddle_c,ptr);

sleep(1);
}

return 0;
}

(6)效果展示
如图出现“一次创建”“两次打印任务”是因为从内核来看该线程的时间片到了:线程完成对信号量的操作,此时有了消费信号,但无法执行打印,而生产和消费是并发执行的,此时就轮到消费了,生产需要重新进入运行队列重新执行,因此将打印提前,在线程的时间片结束前执行打印即可解决!

《从内核视角看 Linux:环形缓冲区 + 线程池的生产消费模型实现》

【五】单例模式
(1)什么是单例模式
某些类,只应该具有⼀个对象(实例),就称之为单例。这点我们将构造私,通过静态方式即可解决!

(为什么需要静态的?静态成员在进程形成时就已经形成,而普通类需要main函数栈帧形成)

而静态方式可以选择直接实例化对象,也可以先只形成对象指针(用的时候再实例化)

很明显,选择后者更佳,二者虽都是实例化一个对象,但是后者可以一定时刻减少内存负担!

(2)如何完成单例模式
总结:构造私有(不能delete,否则会调用系统默认的)+delete拷贝构造与赋值+锁+静态成 员内部形成唯一的对象指针并返回

(3)例如:
#include<iostream>
#include<unistd.h>
#include<pthread.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

template<class T>
class Ceshi
{
public:

// 禁止拷贝构造和赋值操作
Ceshi(const Ceshi&) = delete;
Ceshi& operator=(const Ceshi&) = delete;

static Ceshi<T>* handle()
{
if (instance == nullptr)
{
pthread_mutex_lock(&mutex);
if (instance == nullptr)
{
instance = new Ceshi<T>;
}
pthread_mutex_unlock(&mutex);
}
return instance;
}

void text()
{
std::cout<<"Hello World"<<std::endl;
}

private:
Ceshi()
{}
static Ceshi<T>* instance;
};

// 模板类静态成员变量类外初始化
template<class T>
Ceshi<T>* Ceshi<T>::instance = nullptr;

————————————————
版权声明:本文为CSDN博主「雾非雾の迷惘」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Dovis5884/article/details/154694661

阅读剩余
THE END