0

Add a simple thread pool to SimpleThread.

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@1634 0039d316-1c4b-4281-b951-d872f2087c98
This commit is contained in:
deanm@google.com
2008-09-02 11:15:48 +00:00
parent 5f6eee53a4
commit a89d97fdbe
3 changed files with 183 additions and 1 deletions

@ -49,4 +49,70 @@ void DelegateSimpleThread::Run() {
delegate_ = NULL;
}
DelegateSimpleThreadPool::~DelegateSimpleThreadPool() {
DCHECK(threads_.empty());
DCHECK(delegates_.empty());
DCHECK(!dry_.IsSignaled());
}
void DelegateSimpleThreadPool::Start() {
DCHECK(threads_.empty()) << "Start() called with outstanding threads.";
for (int i = 0; i < num_threads_; ++i) {
DelegateSimpleThread* thread = new DelegateSimpleThread(this, name_prefix_);
thread->Start();
threads_.push_back(thread);
}
}
void DelegateSimpleThreadPool::JoinAll() {
DCHECK(!threads_.empty()) << "JoinAll() called with no outstanding threads.";
// Tell all our threads to quit their worker loop.
AddWork(NULL, num_threads_);
// Join and destroy all the worker threads.
for (int i = 0; i < num_threads_; ++i) {
threads_[i]->Join();
delete threads_[i];
}
threads_.clear();
DCHECK(delegates_.empty());
}
void DelegateSimpleThreadPool::AddWork(Delegate* delegate, int repeat_count) {
AutoLock locked(lock_);
for (int i = 0; i < repeat_count; ++i)
delegates_.push(delegate);
// If we were empty, signal that we have work now.
if (!dry_.IsSignaled())
dry_.Signal();
}
void DelegateSimpleThreadPool::Run() {
Delegate* work;
while (true) {
dry_.Wait();
{
AutoLock locked(lock_);
if (!dry_.IsSignaled())
continue;
DCHECK(!delegates_.empty());
work = delegates_.front();
delegates_.pop();
// Signal to any other threads that we're currently out of work.
if (delegates_.empty())
dry_.Reset();
}
// A NULL delegate pointer signals us to quit.
if (!work)
break;
work->Run();
}
}
} // namespace base

@ -41,8 +41,11 @@
#define BASE_SIMPLE_THREAD_H_
#include <string>
#include <queue>
#include <vector>
#include "base/basictypes.h"
#include "base/lock.h"
#include "base/waitable_event.h"
#include "base/platform_thread.h"
@ -70,7 +73,7 @@ class SimpleThread : public PlatformThread::Delegate {
// configuration involving the thread creation and management.
// Every thread has a name, in the form of |name_prefix|/TID, for example
// "my_thread/321". The thread will not be created until Start() is called.
SimpleThread(const std::string& name_prefix)
explicit SimpleThread(const std::string& name_prefix)
: name_prefix_(name_prefix), name_(name_prefix),
thread_(), event_(true, false), tid_(0), joined_(false) { }
SimpleThread(const std::string& name_prefix, const Options& options)
@ -136,6 +139,51 @@ class DelegateSimpleThread : public SimpleThread {
Delegate* delegate_;
};
// DelegateSimpleThreadPool allows you to start up a fixed number of threads,
// and then add jobs which will be dispatched to the threads. This is
// convenient when you have a lot of small work that you want done
// multi-threaded, but don't want to spawn a thread for each small bit of work.
//
// You just call AddWork() to add a delegate to the list of work to be done.
// JoinAll() will make sure that all outstanding work is processed, and wait
// for everything to finish. You can reuse a pool, so you can call Start()
// again after you've called JoinAll().
class DelegateSimpleThreadPool : public DelegateSimpleThread::Delegate {
public:
typedef DelegateSimpleThread::Delegate Delegate;
DelegateSimpleThreadPool(const std::string name_prefix, int num_threads)
: name_prefix_(name_prefix), num_threads_(num_threads),
dry_(true, false) { }
~DelegateSimpleThreadPool();
// Start up all of the underlying threads, and start processing work if we
// have any.
void Start();
// Make sure all outstanding work is finished, and wait for and destroy all
// of the underlying threads in the pool.
void JoinAll();
// It is safe to AddWork() any time, before or after Start().
// Delegate* should always be a valid pointer, NULL is reserved internally.
void AddWork(Delegate* work, int repeat_count);
void AddWork(Delegate* work) {
AddWork(work, 1);
}
// We implement the Delegate interface, for running our internal threads.
virtual void Run();
private:
const std::string name_prefix_;
int num_threads_;
std::vector<DelegateSimpleThread*> threads_;
std::queue<Delegate*> delegates_;
Lock lock_; // Locks delegates_
WaitableEvent dry_; // Not signaled when there is no work to do.
};
} // namespace base
#endif // BASE_SIMPLE_THREAD_H_

@ -2,6 +2,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/atomic_sequence_num.h"
#include "base/lock.h"
#include "base/simple_thread.h"
#include "base/string_util.h"
#include "base/waitable_event.h"
@ -37,6 +39,40 @@ class WaitEventRunner : public base::DelegateSimpleThread::Delegate {
base::WaitableEvent* event_;
};
class SeqRunner : public base::DelegateSimpleThread::Delegate {
public:
SeqRunner(base::AtomicSequenceNumber* seq) : seq_(seq) { }
virtual void Run() {
seq_->GetNext();
}
private:
base::AtomicSequenceNumber* seq_;
};
// We count up on a sequence number, firing on the event when we've hit our
// expected amount, otherwise we wait on the event. This will ensure that we
// have all threads outstanding until we hit our expected thread pool size.
class VerifyPoolRunner : public base::DelegateSimpleThread::Delegate {
public:
VerifyPoolRunner(base::AtomicSequenceNumber* seq,
int total, base::WaitableEvent* event)
: seq_(seq), total_(total), event_(event) { }
virtual void Run() {
if (seq_->GetNext() == total_) {
event_->Signal();
} else {
event_->Wait();
}
}
private:
base::AtomicSequenceNumber* seq_;
int total_;
base::WaitableEvent* event_;
};
} // namespace
TEST(SimpleThreadTest, CreateAndJoin) {
@ -97,3 +133,35 @@ TEST(SimpleThreadTest, NamedWithOptions) {
EXPECT_EQ(thread.name(), std::string("event_waiter/") +
IntToString(thread.tid()));
}
TEST(SimpleThreadTest, ThreadPool) {
base::AtomicSequenceNumber seq;
SeqRunner runner(&seq);
base::DelegateSimpleThreadPool pool("seq_runner", 10);
// Add work before we're running.
pool.AddWork(&runner, 300);
EXPECT_EQ(seq.GetNext(), 0);
pool.Start();
// Add work while we're running.
pool.AddWork(&runner, 300);
pool.JoinAll();
EXPECT_EQ(seq.GetNext(), 601);
// We can reuse our pool. Verify that all 10 threads can actually run in
// parallel, so this test will only pass if there are actually 10 threads.
base::AtomicSequenceNumber seq2;
base::WaitableEvent event(true, false);
// Changing 9 to 10, for example, would cause us JoinAll() to never return.
VerifyPoolRunner verifier(&seq2, 9, &event);
pool.Start();
pool.AddWork(&verifier, 10);
pool.JoinAll();
EXPECT_EQ(seq2.GetNext(), 10);
}