libtdepim

weaver.cpp
1 /* -*- C++ -*-
2 
3  This file implements the Weaver, Job and Thread classes.
4 
5  $ Author: Mirko Boehm $
6  $ Copyright: (C) 2004, Mirko Boehm $
7  $ Contact: mirko@kde.org
8  http://www.kde.org
9  http://www.hackerbuero.org $
10  $ License: LGPL with the following explicit clarification:
11  This code may be linked against any version of the TQt toolkit
12  from Troll Tech, Norway. $
13 
14 */
15 
16 extern "C" {
17 #include <signal.h>
18 }
19 
20 #include <tqevent.h>
21 #include <tqapplication.h>
22 
23 #include "weaver.h"
24 
25 namespace KPIM {
26 namespace ThreadWeaver {
27 
28  bool Debug = true;
29  int DebugLevel = 2;
30 
31  Job::Job (TQObject* parent, const char* name)
32  : TQObject (parent, name),
33  m_finished (false),
34  m_mutex (new TQMutex (true) ),
35  m_thread (0)
36  {
37  }
38 
40  {
41  }
42 
43  void Job::lock()
44  {
45  m_mutex->lock();
46  }
47 
48  void Job::unlock()
49  {
50  m_mutex->unlock();
51  }
52 
53  void Job::execute(Thread *th)
54  {
55  m_mutex->lock();
56  m_thread = th;
57  m_mutex->unlock();
58 
59  run ();
60 
61  m_mutex->lock();
62  setFinished (true);
63  m_thread = 0;
64  m_mutex->unlock();
65  }
66 
68  {
69  TQMutexLocker l (m_mutex);
70  return m_thread;
71  }
72 
73  bool Job::isFinished() const
74  {
75  TQMutexLocker l (m_mutex);
76  return m_finished;
77  }
78 
79  void Job::setFinished(bool status)
80  {
81  TQMutexLocker l (m_mutex);
82  m_finished = status;
83  }
84 
86  {
87  switch ( e->action() )
88  {
89  case Event::JobStarted:
90  emit ( started() );
91  break;
92  case Event::JobFinished:
93  emit ( done() );
94  break;
95  case Event::JobSPR:
96  emit ( SPR () );
97  m_wc->wakeOne ();
98  break;
99  case Event::JobAPR:
100  emit ( APR () );
101  // no wake here !
102  break;
103  default:
104  break;
105  }
106  }
107 
109  {
110  m_mutex->lock ();
111  m_wc = new TQWaitCondition;
112  m_mutex->unlock ();
113 
114  thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
115  m_wc->wait ();
116 
117  m_mutex->lock ();
118  delete m_wc;
119  m_wc = 0;
120  m_mutex->unlock ();
121  }
122 
124  {
125  m_mutex->lock ();
126  m_wc = new TQWaitCondition;
127  m_mutex->unlock ();
128 
130  m_wc->wait ();
131  }
132 
133  void Job::wakeAPR ()
134  {
135  TQMutexLocker l(m_mutex);
136  if ( m_wc!=0 )
137  {
138  m_wc->wakeOne ();
139  delete m_wc;
140  m_wc = 0;
141  }
142  }
143 
144  const int Event::Type = TQEvent::User + 1000;
145 
146  Event::Event ( Action action, Thread *thread, Job *job)
147  : TQCustomEvent ( type () ),
148  m_action (action),
149  m_thread (thread),
150  m_job (job)
151  {
152  }
153 
154  int Event::type ()
155  {
156  return Type;
157  }
158 
160  {
161  if ( m_thread != 0)
162  {
163  return m_thread;
164  } else {
165  return 0;
166  }
167  }
168 
169  Job* Event::job () const
170  {
171  return m_job;
172  }
173 
175  {
176  return m_action;
177  }
178 
179  unsigned int Thread::sm_Id;
180 
182  : TQThread (),
183  m_parent ( parent ),
184  m_id ( makeId() )
185  {
186  }
187 
189  {
190  }
191 
192  unsigned int Thread::makeId()
193  {
194  static TQMutex mutex;
195  TQMutexLocker l (&mutex);
196 
197  return ++sm_Id;
198  }
199 
200  unsigned int Thread::id() const
201  {
202  return m_id;
203  }
204 
205  void Thread::run()
206  {
207  Job *job = 0;
208 
210 
211  while (true)
212  {
213  debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
214 
215  job = m_parent->applyForWork ( this, job );
216 
217  if (job == 0)
218  {
219  break;
220  } else {
221  post ( Event::JobStarted, job );
222  job->execute (this);
223  post ( Event::JobFinished, job );
224  }
225  }
226 
227  post ( Event::ThreadExiting );
228  }
229 
231  {
232  m_parent->post ( a, this, j);
233  }
234 
235  void Thread::msleep(unsigned long msec)
236  {
237  TQThread::msleep(msec);
238  }
239 
240  Weaver::Weaver(TQObject* parent, const char* name,
241  int inventoryMin, int inventoryMax)
242  : TQObject(parent, name),
243  m_active(0),
244  m_inventoryMin(inventoryMin),
245  m_inventoryMax(inventoryMax),
246  m_shuttingDown(false),
247  m_running (false),
248  m_suspend (false),
249  m_mutex ( new TQMutex(true) )
250  {
251  lock();
252 
253  for ( int count = 0; count < m_inventoryMin; ++count)
254  {
255  Thread *th = new Thread(this);
256  m_inventory.append(th);
257  // this will idle the thread, waiting for a job
258  th->start();
259 
260  emit (threadCreated (th) );
261  }
262 
263  unlock();
264  }
265 
266  Weaver::~Weaver()
267  {
268  lock();
269 
270  debug ( 1, "Weaver dtor: destroying inventory.\n" );
271 
272  m_shuttingDown = true;
273 
274  unlock();
275 
276  m_jobAvailable.wakeAll();
277 
278  // problem: Some threads might not be asleep yet, just finding
279  // out if a job is available. Those threads will suspend
280  // waiting for their next job (a rare case, but not impossible).
281  // Therefore, if we encounter a thread that has not exited, we
282  // have to wake it again (which we do in the following for
283  // loop).
284 
285  for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
286  {
287  if ( !th->finished() )
288  {
289  m_jobAvailable.wakeAll();
290  th->wait();
291  }
292 
293  emit (threadDestroyed (th) );
294  delete th;
295 
296  }
297 
298  m_inventory.clear();
299 
300  delete m_mutex;
301 
302  debug ( 1, "Weaver dtor: done\n" );
303 
304  }
305 
307  {
308  debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
309  ( m_mutex->locked() ? "locked" : "not locked" ) );
310  m_mutex->lock();
311  }
312 
314  {
315  m_mutex->unlock();
316 
317  debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
318  ( m_mutex->locked() ? "locked" : "not locked" ) );
319  }
320 
321  int Weaver::threads () const
322  {
323  TQMutexLocker l (m_mutex);
324  return m_inventory.count ();
325  }
326 
327  void Weaver::enqueue(Job* job)
328  {
329  lock();
330 
331  m_assignments.append(job);
332  m_running = true;
333 
334  unlock();
335 
336  assignJobs();
337  }
338 
339  void Weaver::enqueue (TQPtrList <Job> jobs)
340  {
341  lock();
342 
343  for ( Job * job = jobs.first(); job; job = jobs.next() )
344  {
345  m_assignments.append (job);
346  }
347 
348  unlock();
349 
350  assignJobs();
351  }
352 
353  bool Weaver::dequeue ( Job* job )
354  {
355  TQMutexLocker l (m_mutex);
356  return m_assignments.remove (job);
357  }
358 
360  {
361  TQMutexLocker l (m_mutex);
362  m_assignments.clear();
363  }
364 
365  void Weaver::suspend (bool state)
366  {
367  lock();
368 
369  if (state)
370  {
371  // no need to wake any threads here
372  m_suspend = true;
373  if ( m_active == 0 && isEmpty() )
374  { // instead of waking up threads:
376  }
377  } else {
378  m_suspend = false;
379  // make sure we emit suspended () even if all threads are sleeping:
380  assignJobs ();
381  debug (2, "Weaver::suspend: queueing resumed.\n" );
382  }
383 
384  unlock();
385  }
386 
388  {
389  m_jobAvailable.wakeAll();
390  }
391 
392  bool Weaver::event (TQEvent *e )
393  {
394  if ( e->type() >= TQEvent::User )
395  {
396 
397  if ( e->type() == Event::type() )
398  {
399  Event *event = (Event*) e;
400 
401  switch (event->action() )
402  {
403  case Event::JobFinished:
404  if ( event->job() !=0 )
405  {
406  emit (jobDone (event->job() ) );
407  }
408  break;
409  case Event::Finished:
410  emit ( finished() );
411  break;
412  case Event::Suspended:
413  emit ( suspended() );
414  break;
415  case Event::ThreadSuspended:
416  if (!m_shuttingDown )
417  {
418  emit (threadSuspended ( event->thread() ) );
419  }
420  break;
421  case Event::ThreadBusy:
422  if (!m_shuttingDown )
423  {
424  emit (threadBusy (event->thread() ) );
425  }
426  break;
427  default:
428  break;
429  }
430 
431  if ( event->job() !=0 )
432  {
433  event->job()->processEvent (event);
434  }
435  } else {
436  debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
437  }
438  return true;
439  } else {
440  // others - please make sure we are a TQObject!
441  return TQObject::event ( e );
442  }
443  }
444 
446  {
447  Event *e = new Event ( a, t, j);
448  TQApplication::postEvent (this, e);
449  }
450 
451  bool Weaver::isEmpty() const
452  {
453  TQMutexLocker l (m_mutex);
454  return m_assignments.count()==0;
455  }
456 
458  {
459  Job *rc = 0;
460  bool lastjob = false;
461  bool suspended = false;
462 
463  while (true)
464  {
465  lock();
466 
467  if (previous != 0)
468  { // cleanup and send events:
469  --m_active;
470 
471  debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
472  "%i active jobs left.\n",
473  queueLength(), m_active );
474 
475  if ( m_active == 0 && isEmpty() )
476  {
477  lastjob = true;
478  m_running = false;
479  post (Event::Finished);
480  debug ( 3, "Weaver::applyForWork: last job.\n" );
481  }
482 
483  if (m_active == 0 && m_suspend == true)
484  {
485  suspended = true;
487  debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
488  }
489 
490  m_jobFinished.wakeOne();
491  }
492 
493  previous = 0;
494 
495  if (m_shuttingDown == true)
496  {
497  unlock();
498 
499  return 0;
500  } else {
501  if ( !isEmpty() && m_suspend == false )
502  {
503  rc = m_assignments.getFirst();
504  m_assignments.removeFirst ();
505  ++m_active;
506 
507  debug ( 3, "Weaver::applyForWork: job assigned, "
508  "%i jobs in queue (%i active).\n",
509  m_assignments.count(), m_active );
510  unlock();
511 
512  post (Event::ThreadBusy, th);
513 
514  return rc;
515  } else {
516  unlock();
517 
518  post (Event::ThreadSuspended, th);
519  m_jobAvailable.wait();
520  }
521  }
522  }
523  }
524 
526  {
527  TQMutexLocker l (m_mutex);
528  return m_assignments.count();
529  }
530 
531  bool Weaver::isIdle () const
532  {
533  TQMutexLocker l (m_mutex);
534  return isEmpty() && m_active == 0;
535  }
536 
538  {
539  while ( !isIdle() )
540  {
541  debug (2, "Weaver::finish: not done, waiting.\n" );
542  m_jobFinished.wait();
543  }
544  debug (1, "Weaver::finish: done.\n\n\n" );
545  }
546 
547 }
548 }
549 
550 #include "weaver.moc"
void started()
This signal is emitted when a thread starts to process a job.
virtual void execute(Thread *)
Perform the job.
Definition: weaver.cpp:53
void unlock()
Unlock.
Definition: weaver.cpp:313
A class to represent the events threads generate and send to the Weaver object.
Definition: weaver.h:100
virtual ~Job()
Destructor.
Definition: weaver.cpp:39
virtual void finish()
Get notified when a thread has finished a job.
Definition: weaver.cpp:537
A weaver is the manager of worker threads (Thread objects) to which it assigns jobs from it&#39;s queue...
Definition: weaver.h:297
Job(TQObject *parent=0, const char *name=0)
Construct a Job object.
Definition: weaver.cpp:31
void unlock()
Unlock this Job&#39;s mutex.
Definition: weaver.cpp:48
virtual void setFinished(bool status)
Call with status = true to mark this job as done.
Definition: weaver.cpp:79
void done()
This signal is emitted when a job has been finished.
virtual bool isFinished() const
Returns true if the jobs&#39;s execute method finished.
Definition: weaver.cpp:73
All jobs in the queue are done.
Definition: weaver.h:106
void lock()
Lock this Job&#39;s mutex.
Definition: weaver.cpp:43
virtual void dequeue()
Remove all queued jobs.
Definition: weaver.cpp:359
void triggerAPR()
Trigger an APR.
Definition: weaver.cpp:123
void SPR()
This signal is emitted when the job needs some operation done by the main thread (usually the creator...
bool isEmpty() const
Is the queue empty?
Definition: weaver.cpp:451
Action action() const
The action.
Definition: weaver.cpp:174
Thread(Weaver *parent)
Create a thread.
Definition: weaver.cpp:181
~Thread()
The destructor.
Definition: weaver.cpp:188
Thread queueing halted.
Definition: weaver.h:107
void post(Event::Action, Thread *=0, Job *=0)
Post an event that is handled by this object, but in the main (GUI) thread.
Definition: weaver.cpp:445
int threads() const
Returns the current number of threads in the inventory.
Definition: weaver.cpp:321
virtual void processEvent(Event *)
Process events related to this job (created by the processing thread or the weaver or whoever)...
Definition: weaver.cpp:85
Synchronous Process Request.
Definition: weaver.h:114
Thread * thread() const
The ID of the sender thread.
Definition: weaver.cpp:159
virtual void suspend(bool state)
Suspend job execution if state = true, otherwise resume job execution if it was suspended.
Definition: weaver.cpp:365
virtual void run()=0
The method that actually performs the job.
void wakeAPR()
Wake the thread after an APR has been processed.
Definition: weaver.cpp:133
int queueLength()
Returns the number of pending jobs.
Definition: weaver.cpp:525
virtual Job * applyForWork(Thread *thread, Job *previous)
Assign a job to the calling thread.
Definition: weaver.cpp:457
void lock()
Lock the mutex for this weaver.
Definition: weaver.cpp:306
void triggerSPR()
Trigger a SPR.
Definition: weaver.cpp:108
unsigned int id() const
Returns the thread id.
Definition: weaver.cpp:200
TDEPIM classes for drag and drop of mails.
void post(Event::Action, Job *=0)
Post an event, will be received and processed by the Weaver.
Definition: weaver.cpp:230
virtual void enqueue(Job *)
Add a job to be executed.
Definition: weaver.cpp:327
bool isIdle() const
Is the weaver idle? The weaver is idle if no jobs are queued and no jobs are processed by the threads...
Definition: weaver.cpp:531
bool event(TQEvent *)
Check incoming events for user defined ones.
Definition: weaver.cpp:392
Thread * thread()
Return the thread that executes this job.
Definition: weaver.cpp:67
void APR()
Perform an Asynchronous Process Request.
The class Thread is used to represent the worker threads in the weaver&#39;s inventory.
Definition: weaver.h:250
A Job is a simple abstraction of an action that is to be executed in a thread context.
Definition: weaver.h:164
void run()
Overloaded to execute the assigned job.
Definition: weaver.cpp:205
static int type()
Return the (custom defined) event type.
Definition: weaver.cpp:154
void assignJobs()
Schedule enqueued jobs to be executed by idle threads.
Definition: weaver.cpp:387
Job * job() const
The associated job.
Definition: weaver.cpp:169