1: 
   2: 
   3: /**********************
   4: *                     *
   5: *  COMPILER INCLUDES  *
   6: *                     *
   7: **********************/
   8: 
   9: 
  10: /*********************
  11: *                    *
  12: *  PROJECT INCLUDES  *
  13: *                    *
  14: *********************/
  15: 
  16: 
  17: #include "loghandle.h" 
  18: #include "wwwi/errno.h" 
  19: #include "wwwi/exception.h" 
  20: #include "wwwi/mutexholder.h" 
  21: #include "wwwi/rwlocker.h" 
  22: #include "wwwi/semaphoreholder.h" 
  23: 
  24: 
  25: using WWWI::Exception;
  26: using WWWI::GetErrorString;
  27: using WWWI::MutexHolder;
  28: using WWWI::RWLocker;
  29: using WWWI::SemaphoreHolder;
  30: 
  31: 
  32: /***************************
  33: *                          *
  34: *  FUNCTION DTPTRAMPOLINE  *
  35: *                          *
  36: ***************************/
  37: 
  38: namespace {
  39: 
  40: template <class Worker, class Initializer> 
  41: void *DTPTrampoline(void *i_vpTP) {
  42:   typedef DynamicThreadPool<Worker,Initializer> DTPWI;
  43:   DTPWI *dtp;
  44:   int iRC;
  45: 
  46:   try {
  47:     dtp = (DTPWI*)(i_vpTP);
  48:     iRC = dtp->Thread();
  49:   } catch (MindEx &exr) {
  50:     LogHandle lh(LF_MISC,LY_FAILURE);
  51:     lh << "DTPTrampoline: fatal " << exr.name() << ": "
  52:        << exr.what() << endl;
  53:   } catch (exception &exr) {
  54:     LogHandle lh(LF_MISC,LY_FAILURE);
  55:     lh << "DTPTrampoline: fatal exception: " << exr.what() << endl;
  56:   } catch (WWWI::Exception &exr) {  
  57:     LogHandle lh(LF_MISC,LY_FAILURE);
  58:     lh << "DTPTrampoline: library exception: " << exr.m_strMessage << endl;
  59:   }
  60:   return (void*)iRC;
  61: }
  62: 
  63: }
  64: 
  65: /**********************************
  66: *                                 *
  67: *  DYNAMICTHREADPOOL CONSTRUCTOR  *
  68: *                                 *
  69: **********************************/
  70: 
  71: 
  72: template <class Worker, class Initializer> 
  73: DynamicThreadPool<Worker,Initializer>::DynamicThreadPool(unsigned i_uMinIdle, unsigned i_uMaxIdle, Initializer i_in) : m_seWaiting(0) {
  74:   unsigned u;
  75:   LogHandle lh(LF_THREADS,LY_DEBUG);
  76:  
  77:   m_uMinIdle = i_uMinIdle;
  78:   m_uMaxIdle = i_uMaxIdle;
  79:   m_in = i_in;
  80:   m_bShuttingDown = false;
  81:   m_uThreads = 0;
  82: 
  83:   for(u=0; u<i_uMinIdle; u++) this->NewThread();
  84: }
  85: 
  86: 
  87: /*********************
  88: *                    *
  89: *  METHOD NEWTHREAD  *
  90: *                    *
  91: *********************/
  92: 
  93: 
  94: template <class Worker, class Initializer>
  95: void DynamicThreadPool<Worker,Initializer>::NewThread() {
  96:   MutexHolder mh(&m_mxNew);
  97:   LogHandle lh(LF_THREADS,LY_DEBUG);
  98:   pthread_attr_t pa;
  99:   pthread_t pt;
 100:   int iErr;
 101: 
 102:   if (m_bShuttingDown==true) return;
 103:   pthread_attr_init(&pa);
 104:   pthread_attr_setstacksize(&pa,65536);
 105: 
 106:   m_uThreads++;
 107:   lh << "threads increase to " << m_uThreads << endl; lh();
 108:   iErr = pthread_create(&pt,&pa,DTPTrampoline<Worker,Initializer>,this); 
 109:   if (iErr!=0) {
 110:     throw Exception("DynamicThreadPool::NewThread: pthread_create() failed: %s",GetErrorString(iErr));
 111:   }
 112: 
 113: }
 114: 
 115: 
 116: /**************************
 117: *                         *
 118: *  METHOD PLEASESHUTDOWN  *
 119: *                         *
 120: **************************/
 121: 
 122: 
 123: template <class Worker, class Initializer>
 124: void DynamicThreadPool<Worker,Initializer>::PleaseShutdown() {
 125:   m_bShuttingDown = true;
 126: }
 127: 
 128: 
 129: /******************
 130: *                 *
 131: *  METHOD THREAD  *
 132: *                 *
 133: ******************/
 134: 
 135: 
 136: template <class Worker, class Initializer>
 137: int DynamicThreadPool<Worker,Initializer>::Thread() {
 138:   SemaphoreHolder sh(&m_seWaiting,false);
 139:   LogHandle lh(LF_THREADS,LY_DEBUG);
 140:   RWLocker rk(&m_rwThreads,false);
 141:   Worker wk(m_in);
 142:   unsigned uIdle;
 143: 
 144:   lh << "threads Go!"; lh();
 145: 
 146:   sh.Post();
 147:   uIdle = m_seWaiting.GetValue();
 148:   while(m_bShuttingDown==false) {
 149:     if (uIdle>m_uMaxIdle) {
 150:       lh << "threads idle " << uIdle << " exceed maximum " << m_uMaxIdle << endl;
 151:       goto dtptStop;
 152:     }
 153:     if (wk.Wait()==true) {
 154:       sh.Wait();
 155:       wk.Work();
 156:       sh.Post();
 157:     }
 158:     uIdle = m_seWaiting.GetValue();
 159:     if ((uIdle<m_uMinIdle)&&(m_bShuttingDown==false)) {
 160:       lh << "threads idle " << uIdle << " below minimum " << m_uMinIdle << " starting" << endl;
 161:       this->NewThread();
 162:     }
 163:   }
 164: 
 165: dtptStop:
 166:   MutexHolder mh(&m_mxNew);
 167:   m_uThreads--;
 168:   lh << "threads decrease to " << m_uThreads; lh();
 169:   return 0;
 170: }
 171: 
 172: 
 173: /*********************************
 174: *                                *
 175: *  DYNAMICTHREADPOOL DESTRUCTOR  *
 176: *                                *
 177: *********************************/
 178: 
 179: 
 180: template <class Worker, class Initializer>
 181: DynamicThreadPool<Worker,Initializer>::~DynamicThreadPool() {
 182:   LogHandle lh(LF_THREADS,LY_DEBUG);
 183:   lh << "Worker threads shutting down." << endl; lh();
 184:   this->PleaseShutdown();
 185:   RWLocker rk(&m_rwThreads,true);
 186: }
 187: 
 188: 
 189: