#include "loghandle.h"
#include "wwwi/errno.h"
#include "wwwi/exception.h"
#include "wwwi/mutexholder.h"
#include "wwwi/rwlocker.h"
#include "wwwi/semaphoreholder.h"
using WWWI::Exception;
using WWWI::GetErrorString;
using WWWI::MutexHolder;
using WWWI::RWLocker;
using WWWI::SemaphoreHolder;
namespace {
template <class Worker, class Initializer>
void *DTPTrampoline(void *i_vpTP) {
typedef DynamicThreadPool<Worker,Initializer> DTPWI;
DTPWI *dtp;
int iRC;
try {
dtp = (DTPWI*)(i_vpTP);
iRC = dtp->Thread();
} catch (MindEx &exr) {
LogHandle lh(LF_MISC,LY_FAILURE);
lh << "DTPTrampoline: fatal " << exr.name() << ": "
<< exr.what() << endl;
} catch (exception &exr) {
LogHandle lh(LF_MISC,LY_FAILURE);
lh << "DTPTrampoline: fatal exception: " << exr.what() << endl;
} catch (WWWI::Exception &exr) {
LogHandle lh(LF_MISC,LY_FAILURE);
lh << "DTPTrampoline: library exception: " << exr.m_strMessage << endl;
}
return (void*)iRC;
}
}
template <class Worker, class Initializer>
DynamicThreadPool<Worker,Initializer>::DynamicThreadPool(unsigned i_uMinIdle, unsigned i_uMaxIdle, Initializer i_in) : m_seWaiting(0) {
unsigned u;
LogHandle lh(LF_THREADS,LY_DEBUG);
m_uMinIdle = i_uMinIdle;
m_uMaxIdle = i_uMaxIdle;
m_in = i_in;
m_bShuttingDown = false;
m_uThreads = 0;
for(u=0; u<i_uMinIdle; u++) this->NewThread();
}
template <class Worker, class Initializer>
void DynamicThreadPool<Worker,Initializer>::NewThread() {
MutexHolder mh(&m_mxNew);
LogHandle lh(LF_THREADS,LY_DEBUG);
pthread_attr_t pa;
pthread_t pt;
int iErr;
if (m_bShuttingDown==true) return;
pthread_attr_init(&pa);
pthread_attr_setstacksize(&pa,65536);
m_uThreads++;
lh << "threads increase to " << m_uThreads << endl; lh();
iErr = pthread_create(&pt,&pa,DTPTrampoline<Worker,Initializer>,this);
if (iErr!=0) {
throw Exception("DynamicThreadPool::NewThread: pthread_create() failed: %s",GetErrorString(iErr));
}
}
template <class Worker, class Initializer>
void DynamicThreadPool<Worker,Initializer>::PleaseShutdown() {
m_bShuttingDown = true;
}
template <class Worker, class Initializer>
int DynamicThreadPool<Worker,Initializer>::Thread() {
SemaphoreHolder sh(&m_seWaiting,false);
LogHandle lh(LF_THREADS,LY_DEBUG);
RWLocker rk(&m_rwThreads,false);
Worker wk(m_in);
unsigned uIdle;
lh << "threads Go!"; lh();
sh.Post();
uIdle = m_seWaiting.GetValue();
while(m_bShuttingDown==false) {
if (uIdle>m_uMaxIdle) {
lh << "threads idle " << uIdle << " exceed maximum " << m_uMaxIdle << endl;
goto dtptStop;
}
if (wk.Wait()==true) {
sh.Wait();
wk.Work();
sh.Post();
}
uIdle = m_seWaiting.GetValue();
if ((uIdle<m_uMinIdle)&&(m_bShuttingDown==false)) {
lh << "threads idle " << uIdle << " below minimum " << m_uMinIdle << " starting" << endl;
this->NewThread();
}
}
dtptStop:
MutexHolder mh(&m_mxNew);
m_uThreads--;
lh << "threads decrease to " << m_uThreads; lh();
return 0;
}
template <class Worker, class Initializer>
DynamicThreadPool<Worker,Initializer>::~DynamicThreadPool() {
LogHandle lh(LF_THREADS,LY_DEBUG);
lh << "Worker threads shutting down." << endl; lh();
this->PleaseShutdown();
RWLocker rk(&m_rwThreads,true);
}