首页 技术 正文
技术 2022年11月18日
0 收藏 658 点赞 3,913 浏览 6450 个字

  最近打算做一个服务器端程序,每来一个客户端请求新开一个线程进行处理。在网上查了一些资料后,准备使用线程池来做这个东西。使用C++11新的库处理想线程问题比以前简单了许多,在网上找到一份线程池的实现,http://blog.csdn.net/goldenhawking/article/details/7919547    这个线程池实现中,每一个线程都维护一个任务队列,我觉得这么做不利于任务调度,有可能某个线程的待执行任务列表很长,而其他线程则在休眠。

下面是我自己实现的一个线程池,任务队列由线程池类来分配,每个线程只有拿到任务的时候才执行,其他时间是阻塞的。另外,如果没有任务需要执行,那么分配任务的线程也会阻塞,直到来了新任务。 代码在VS2013下测试通过。

本人菜鸟一枚,欢迎指出错误。

threadInstance.h

 #pragma once #include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable> class ThreadPool; //thread class
class ThreadInstance
{
public:
ThreadInstance(ThreadPool *theadPool)
:m_stop(false), m_pthread(nullptr), m_threadPool(theadPool)
{ }
virtual ~ThreadInstance()
{
if (m_pthread != nullptr)
{
m_pthread->join();
delete m_pthread;
} } void begin()
{
m_pthread = new std::thread(std::bind(&ThreadInstance::run, this));
}
void run();
void join()
{
m_stop = true;
std::lock_guard<std::mutex> lg(m_mutex_task);
m_task = nullptr;
m_cond_task_ready.notify_one();
} void set_task(std::function<void(void)> &task)
{
std::lock_guard<std::mutex> lg(m_mutex_task);
m_task = task;
} public:
//condition_variable to wait for task
std::condition_variable m_cond_task_ready;
protected:
//flag used to terminate the thread
std::atomic< bool> m_stop;
//mutex used by member m_cond_task_ready
std::mutex m_mutex_cond;
//int m_id; //task to be executed
std::function<void(void)> m_task;
// mutex to protect m_task
std::mutex m_mutex_task;
//pointer to thread
std::thread *m_pthread;
// pointer to thread pool
ThreadPool *m_threadPool;
};

threadInstance.cpp

 #include "threadInstance.h"
#include "threadPool.h" void ThreadInstance::run()
{ while (true)
{
//auto x = std::this_thread::get_id();
std::unique_lock<std::mutex> lck(m_mutex_cond);
m_cond_task_ready.wait(lck); if (m_stop)
{
break;
}
m_task(); //shared_ptr<ThreadInstance> ptr(this);
m_threadPool->append_free_thread(this);
} }

threadPool.h

 #pragma once #include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable> //semaphore class used to represent free threads and task
class Semaphore {
public:
Semaphore(int value = ) : count(value)
{} void wait(){
std::unique_lock<std::mutex> lock(m_mutex);
if (--count < ) // count is not enough ?
condition.wait(lock); // suspend and wait...
}
void signal(){
std::lock_guard<std::mutex> lock(m_mutex);
//if (++count <= 0) // have some thread suspended ?
count++;
condition.notify_one(); // notify one !
} private:
int count;
std::mutex m_mutex;
std::condition_variable condition;
}; class ThreadInstance; //the thread pool class
class ThreadPool
{
public:
ThreadPool(int nThreads);
~ThreadPool();
public:
//total threads;
size_t count(){ return m_vec_threads.size(); } //wait until all threads is terminated;
void join_all(); //append task to the thread pool
void append(std::function< void(void) > func);
//start service
void start();
//append free thread to free thread list
void append_free_thread(ThreadInstance* pfThread); protected:
//function to be execute in a separate thread
void start_thread(); public:
//NO. threads
int m_n_threads;
//flag used to stop the thread pool service
std::atomic<bool> m_stop; Semaphore m_sem_free_threads;
Semaphore m_sem_task; //list contains all the free threads
std::list<ThreadInstance*> m_list_free_threads;
//vector contains all the threads
std::vector<ThreadInstance* > m_vec_threads; //std::mutex m_mutex_list_task;
std::list<std::function<void(void)>> m_list_tasks;
};

threadPool.cpp

#include "threadPool.h"
#include "threadInstance.h"std::mutex cond_mutex;
std::condition_variable cond_incoming_task;ThreadPool::ThreadPool(int nThreads)
:m_n_threads(nThreads), m_sem_free_threads(nThreads), m_sem_task(), m_stop(false)
{
for (int i = ; i < nThreads; i++)
{
ThreadInstance* ptr=new ThreadInstance(this);
m_vec_threads.push_back(ptr);
m_list_free_threads.push_back(ptr);
}}ThreadPool::~ThreadPool()
{
for (int i = ; i != m_n_threads; ++i)
{
//m_vec_threads[i]->join();
delete m_vec_threads[i];
}
}void ThreadPool::start()
{
//to avoid blocking the main thread
std::thread t(std::bind(&ThreadPool::start_thread, this));
t.detach();
}
void ThreadPool::start_thread()
{
for (auto free_thread:m_list_free_threads)
{
free_thread->begin();
} while (true)
{
//whether there's free thread and existing task
m_sem_free_threads.wait();
m_sem_task.wait(); if (m_stop)
{
break;
} // take a free thread
ThreadInstance* ptr = m_list_free_threads.front();
m_list_free_threads.pop_front(); ptr->set_task(m_list_tasks.front());
m_list_tasks.pop_front(); // awaken a suspended thread
ptr->m_cond_task_ready.notify_one();
}
}
void ThreadPool::append(std::function< void(void) > func)
{
//std::lock_guard<std::mutex> lg(m_mutex_list_task);
m_list_tasks.push_back(func);
m_sem_task.signal();
}void ThreadPool::append_free_thread(ThreadInstance* pfThread)
{
//this function only push back thread in free thread list
// it does not need to lock the list //m_mutex_free_thread.lock();
m_list_free_threads.push_back(pfThread);
//m_mutex_free_thread.unlock();
m_sem_free_threads.signal();}void ThreadPool::join_all()
{
std::for_each(m_vec_threads.begin(), m_vec_threads.end(), [this](ThreadInstance* & item)
{
item->join();
}); m_stop = true;
m_sem_free_threads.signal();
m_sem_task.signal();}

用于测试的main函数

 #include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <list>
#include <atomic>
#include <vector>
#include <algorithm>
#include <memory>
#include <condition_variable> #include "threadPool.h"
//#include <vld.h> using namespace std;
class A
{
public:
A()
{}
~A(){}
public:
void foo(int k)
{
//sleep for a while
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % + ));
std::cout << "k = " << k << std::endl; }
}; //a function which will be executed in sub thread.
void hello()
{
//sleep for a while
std::this_thread::sleep_for(std::chrono::milliseconds());
cout << "hello \n";
} //let's test the thread.
int main()
{
srand(); ThreadPool g_threadPool();
A a; g_threadPool.append(&hello); //append object method with copy-constructor(value-assignment)
g_threadPool.append(std::bind(&A::foo, a, ));
g_threadPool.append(std::bind(&A::foo, a, ));
g_threadPool.append(std::bind(&A::foo, a, ));
g_threadPool.append(std::bind(&A::foo, a, )); //auto beg = std::chrono::high_resolution_clock().now(); g_threadPool.start(); //std::this_thread::sleep_for(std::chrono::milliseconds(5000)); g_threadPool.append(&hello);
//append object method with address assignment, will cause the objects' member increase.
g_threadPool.append(std::bind(&A::foo, &a, ));
g_threadPool.append(std::bind(&A::foo, &a, ));
g_threadPool.append(std::bind(&A::foo, &a, ));
g_threadPool.append(std::bind(&A::foo, &a, )); //std::this_thread::sleep_for(std::chrono::seconds(5));
char temp;
cin >> temp;
if (temp == 'e')
{
g_threadPool.join_all();
} //auto end = std::chrono::high_resolution_clock().now();
//auto dd = std::chrono::duration_cast<chrono::seconds>(end - beg);
//cout << dd.count() << endl; return ;
}
相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,071
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,549
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,397
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,174
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,809
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,889