[linux仓库]线程池

线程池
⼀种线程使⽤模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。

应用场景
需要⼤量的线程来完成任务,且完成任务的时间⽐较短。 ⽐如WEB服务器完成⽹⻚请求这样的任务,使⽤线程池技术是⾮常合适的。因为单个任务⼩,⽽任务数量巨⼤,你可以想象⼀个热⻔⽹站的点击次数。 但对于⻓时间的任务,⽐如⼀个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间⽐线程的创建时间⼤多了。
对性能要求苛刻的应⽤,⽐如要求服务器迅速响应客户请求。
接受突发性的⼤量请求,但不⾄于使服务器因此产⽣⼤量线程的应⽤。突发性⼤量客⼾请求,在没有线程池情况下,将产⽣⼤量线程,虽然理论上⼤部分操作系统线程数⽬最⼤值不是问题,短时间内产⽣⼤量线程可能使内存到达极限,出现错误.
种类
创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执⾏任务对象中的任务接⼝
浮动线程池,其他同上

线程描述与组织
既然是一个线程池,需要创建大量的线程,让线程从任务队列中取数据来执行任务.要对线程做创建,控制,回收,因此需要对线程做管理,如何管理呢?需要能够描述线程和做组织!

好在我们之前做过了对线程的封装,其封装如下:

Thread.hpp

#define get_lwp_id() syscall(SYS_gettid)

using func_t = std::function<void(const std::string&name)>;
const std::string threadnamedefault = "None-Name";

class Thread
{
public:
Thread(func_t func, const std::string &name = threadnamedefault)
: _name(name),
_func(func),
_isrunning(false)
{
LOG(LogLevel::INFO) << _name << " create thread obj success";
}
static void *start_routine(void *args)
{
Thread *self = static_cast<Thread *>(args);
self->_isrunning = true;
self->_lwpid = get_lwp_id();
self->_func(self->_name);
pthread_exit((void *)0);
}
void Start()
{
int n = pthread_create(&_tid, nullptr, start_routine, this);
if (n == 0)
{
LOG(LogLevel::INFO) << _name << " running success";
}
}
void Stop()
{
int n = pthread_cancel(_tid); // 太简单粗暴了
(void)n;
}
// void Die()
// {
// pthread_cancel(_tid);
// }
// 检测线程结束并且回收的功能
void Join()
{
if (!_isrunning)
return;

int n = pthread_join(_tid, nullptr);
if (n == 0)
{
LOG(LogLevel::INFO) << _name << " pthread_join success";
}
}
~Thread()
{
// LOG(LogLevel::INFO) << _name << " destory thread obj success";
}

private:
bool _isrunning;
pthread_t _tid;
pid_t _lwpid;
std::string _name;
func_t _func;
};

#endif

线程管理
template<typename T>

class ThreadPool
{
private:
std::queue<T> _q; //整体使用的临界资源
};

我们使用_q队列来表示任务队列,既然存在很多线程从任务队列里取数据,那这不就是多线程并发访问共享资源吗?因此,需要对共享资源做保护啊!如何保护呢?互斥锁.可是,该加一把锁呢还是,这里以一把锁就可以解决多线程并发访问共享资源的问题.

#pragma once

#include <pthread.h>
#include <iostream>
#include <unistd.h>
#include <string>

class Mutex
{
public:
Mutex()
{
// 初始化锁
pthread_mutex_init(&_lock, nullptr);
}

void Lock()
{
// 加锁
pthread_mutex_lock(&_lock);
}

void Unlock()
{
// 解锁
pthread_mutex_unlock(&_lock);
}

pthread_mutex_t *Get()
{
return &_lock;
}

~Mutex()
{
// 毁坏锁
pthread_mutex_destroy(&_lock);
}

private:
pthread_mutex_t _lock;
};

class LockGuard
{
public:
LockGuard(Mutex *mutex) : _mutexp(mutex)
{
_mutexp->Lock();
}

~LockGuard()
{
_mutexp->Unlock();
}

private:
Mutex *_mutexp;
};

可是,单单只有锁是不够的,会存在这种问题.如果某个线程的竞争能力特别强呢?我用户都没往任务队列里放数据,你这个线程就不断加锁,检查任务队列是否有数据,在解锁,这反而会导致效率降低.那么能不能当用户往任务队列里放数据后,再通知线程们去执行这个任务呢?当然可以,如果没有放数据,此时线程们都需要进入休眠,需要使用到条件变量.

#pragma once

#include<pthread.h>
#include"Mutex.hpp"

class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond,nullptr);
}

void Wait(Mutex &mutex)
{
int n = pthread_cond_wait(&_cond,mutex.Get());
}

void NotifyOne()
{
int n = pthread_cond_signal(&_cond);
(void)n;
}

void NotifyAll()
{
int n = pthread_cond_broadcast(&_cond);
(void)n;
}

~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};

线程池
前置工作准备完毕,那么就可以开始对线程池的封装了!

template <class T>
class ThreadPool
{
private:
// 任务队列
std::queue<T> _q; // 整体使用的临界资源

// 多个线程
std::vector<Thread> _threads; // 1. 创建线程对象 2. 让线程对象启动
int _threadnum;
int _wait_thread_num;

// 保护机制
Mutex _lock;
Cond _cond;

// 其他属性
bool _is_running;
};

这里的std::vetctor<Thread> _threads是否创建了线程对象呢?

可以肯定的是肯定是没有的,相当于你只是拥有了线程这个图纸,里面含有线程的相关属性.

构造函数
// 线程池
const static int defaultthreadnum = 3; // for debug

template <class T>
class ThreadPool
{
private:
void Routine(const std::string &name)
{
while (true)
{
//...
}
}

public:
ThreadPool(int threadnum = defaultthreadnum)
: _threadnum(threadnum), _is_running(false), _wait_thread_num(0)
{
for (int i = 0; i < _threadnum; i++)
{
//...
}
LOG(LogLevel::INFO) << "thread pool obj create success";
}
};

问题1:构造函数这里的for循环时对象存在了吗?

存在了的,在初始化列表这里对对象进行初始化,进行了开辟空间,已然可以访问类内属性.

问题2:构造函数并不需要启动线程池,只负责获取资源并设置其要执行的任务。只有当ThreadPool对象调用Start成员函数时,才需要让线程池启动起来.

我们的想法是这样的:在这里将this指针与任务执行方法进行绑定,当线程池启动起来时,能够调用Thread线程对象的start函数,里面的pthread_create函数会调用start_routine函数,函数体内的

self->_func(self->_name);

会因为包装器的原因,回调到ThreadPool的任务执行方法.这种思想可以做到模块与模块之间的解耦合,可是该怎么做呢?

方法一:使用bind函数

std::string name = "thread-" + std::to_string(i + 1);

auto f = std::bind(Routine, this);

Thread t([this](const std::string &name)
{ this->Routine(name); }
, name);
_threads.push_back(std::move(t));

bind函数,将Routine方法与this指针进行绑定,再通过Lambda表达式

[this](const std::string &name){ this->Routine(name); }

与name将这两参数传递给Thread的构造函数进行构造.

方法二:使用emplace_back函数

std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back([this](const std::string &name)
{ this->Routine(name); }, name);

emplace_back函数的作用:

直接在std::vector的内存空间中传递Lambda表达式和name的两个参数构造Thread对象,避免了创建临时对象和可能的拷贝/移动操作。

线程启动、取消与回收
线程启动

void Start()
{
if (_is_running)
return;
_is_running = true;
for (auto &t : _threads)
{
t.Start();
}
LOG(LogLevel::INFO) << "thread pool running success";
}

线程取消
void Stop()
{
if (!_is_running)
return;
_is_running = false;
for (auto &t : _threads)
{
t.Stop();
}
LOG(LogLevel::INFO) << "thread pool stop success";
}

但实际上上面这种做法并不推荐,太过于简单粗暴了,考虑的因素太少了.

一个线程池要退出时,应该让线程走征程的唤醒逻辑以及退出啊!

如果被唤醒 && 任务队列没有任务 = 让线程退出

如果被唤醒 && 任务队列有任务 = 线程不能立即退出,而应该让线程把任务处理完,在退出

线程本身没有被休眠,我们应该让他把他能处理的任务全部处理完成, 在退出

可以发现第3种情况可以归到第2种情况中,线程取消如下所示,为什么是这样呢?我们来看看任务执行是怎样做的.

void Stop()
{
if (!_is_running)
return;
_is_running = false;
if (_wait_thread_num)
_cond.NotifyAll();
}

任务执行
任务队列为空并且线程池还在运行才要去休眠啊;
任务队列为空并且线程池关闭时让所有线程执行完自己的任务后退出;
任务队列不为空时,让线程们把任务拿完时,此时又轮到了情况1或情况2.
bool QueueIsEmpty()
{
return _q.empty();
}

void Routine(const std::string &name)
{
while (true)
{
// 把任务从线程获取到线程私有!临界区 -> 私有的栈
T t;
{
LockGuard lockguard(&_lock);
while (QueueIsEmpty() && _is_running)
{
_wait_thread_num++;
_cond.Wait(_lock);
_wait_thread_num--;
}
if (!_is_running && QueueIsEmpty())
{
LOG(LogLevel::INFO) << " 线程池退出 && 任务队列为空, " << name << " 退出";
break;
}
// 队列中一定有任务了!, 但是
// 1. 线程池退出 -- 消耗历史
// 2. 线程池没有退出 -- 正常工作
t = _q.front();
_q.pop();
}
t(); // 规定,未来的任务,必须这样处理!,处理任务需要再临界区内部进行吗?1 or 0
// for debug
LOG(LogLevel::DEBUG) << name << " handler task: " << t.Result2String();
}
}

回收线程
void Wait()
{
for (auto &t : _threads)
{
t.Join();
}
LOG(LogLevel::INFO) << "thread pool wait success";
}

往线程池入数据
void Enqueue(const T &t)
{
if (!_is_running)
return;
{
LockGuard lockguard(&_lock);

_q.push(t);
if (_wait_thread_num > 0)
_cond.NotifyOne();
}
}

————————————————
版权声明:本文为CSDN博主「egoist2023」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/egoist2023/article/details/151734988

阅读剩余
THE END
阿里云ECS特惠活动
阿里云ECS服务器 - 限时特惠活动

云服务器爆款直降90%

新客首单¥68起 | 人人可享99元套餐,续费同价 | u2a指定配置低至2.5折1年,立即选购享更多福利!

新客首单¥68起
人人可享99元套餐
弹性计费
7x24小时售后
立即查看活动详情
阿里云ECS服务器特惠活动