Threads Module User's Guide : PART II Concurrency Packages : Chapter 3 The Threading Package : Using Threads To Build Active Objects
Using Threads To Build Active Objects
You can encapsulate asynchronous operations within an active object. An active object creates one or more threads to asynchronously execute service requests made by its clients.
This is useful because IOUs (described in “The IOU Classes”) are not always closed and redeemed when threads exit. Although the RWTThreadIOUFunction class closes the IOU just before exiting, it is possible to create threads that can close an IOU at any time during their processing.
Using Runnables
Using an active object solves the join problem by creating any necessary threads within the active object’s constructor and joining with them in its destructor. This pattern is similar to the “resource acquisition is initialization” idiom employed by the Synchronization package guard objects (see “Using Guards”).
Any of the threaded runnable classes, which are part of the Threading package, can construct an active object. The final choice, though, is dependent on the type of service that the object provides.
Runnables are covered in detail in “The Runnable Object Classes.”
Example
Example 22 shows how an asynchronous function call can be converted to a simple active object.
Example 22 – Encapsulating asynchronous operations in an active object
#include <rw/thread/RWTThreadIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/functor/rwBind.h>
 
using namespace std;
 
class AsyncService; // Forward reference for inlines!
 
class AsyncService {
private:
RWTThreadIOUFunction<int> thread_;
protected:
int syncService() // 1
{
int status = 0;
// Do something useful that produces a status
// value indicating success or failure
rwSleep(1000); // Waste some time...
return status;
}
 
public:
AsyncService() { // 2
thread_ = RWTThreadIOUFunction<int>::make(rwBind(
&AsyncService::syncService, this));
thread_.start();
}
~AsyncService() { // 3
thread_.join();
}
RWTIOUResult<int> operator()() const // 4
{
return thread_.result();
}
};
 
void
main()
{
cout << "Start the request" << endl;
AsyncService request; // 5
RWTIOUResult<int> iou = request(); // 6
cout << "Redeemed value of IOU: " << iou << endl; // 7
} // 8
//1 Defines the actual service that is to be executed in a separate thread.
//2 Defines a constructor that creates a thread to perform the operation.
//3 Defines a destructor that joins the thread.
//4 Includes an accessor for retrieving the IOU result of the operation.
//5 Constructs a named object to start the operation.
//6 Retrieves the future result of the operation as an IOU.
//7 Redeems the result, blocking (if necessary) until the operation has completed.
//8 Destroys the service object at end of scope, and automatically joins with the thread that was created.
Other Solutions to the Join Problem
The approach in the previous example solves the join problem, but the implementation is confusing because:
It relies on a constructor to initiate the operation, instead of using a regular function call.
It requires a separate class for each asynchronous operation.
A better way to solve the problem is to move the initiation of the asynchronous operation to a public member function of the class. In this case, the operation is started by calling the member function, which creates a threaded runnable to perform the actual operation. A handle to that runnable could be stored within the class so that a join can be performed when the object is destroyed.
Other problems are created, however, if the service is invoked a second time:
If the function does not wait on the previous operation, but instead creates another thread, then it needs to store a handle to the second thread object.
If the function stores the handle in the internal handle member, the reference to the first runnable is lost.
To avoid these new problems:
Keep any thread creation in the active object constructor.
Add member functions that delegate any service requests to those thread or threads that are created.
Using Runnable Servers
The RWTThreadIOUFunction used in the example is generally not suitable for an active object implementation that must accept any number of asynchronous requests from a client interface.
It is better to use one of the runnable server classes, RWRunnableServer or RWServerPool, to provide the internal thread or threads required by the active object. These classes are suitable because they are designed to continuously accept and execute other runnable objects. This allows you to package individual operations as synchronous runnables that can be passed to the internal server for execution. Runnable servers are covered in detail in “The Server Classes.”
In Example 23, individual operations are executed asynchronously relative to the thread that requests them, but simultaneous requests cannot be processed concurrently. This limitation is a consequence of choosing the single-threaded RWRunnableServer class for the internal server thread.
If an active object design can benefit from increased concurrency, then the multithreaded RWServerPool class should be used instead.
Example 23 – Using a single-threaded runnable server class
#include <rw/thread/RWRunnableServer.h>
#include <rw/thread/RWTRunnableIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/functor/rwBind.h>
#include <iostream>
 
using namespace std;
 
class ServiceProvider; // Forward reference for inlines!
 
class ServiceProvider {
private:
RWRunnableServer server_;
protected:
int syncService() // 1
{
int status = 0;
// Do something useful that produces a status
// value indicating success or failure
rwSleep(1000); // Waste some time...
return status;
}
public:
ServiceProvider() { // 2
server_ = RWRunnableServer::make();
server_.start();
}
~ServiceProvider() { // 3
server_.stop();
server_.join();
}
RWTIOUResult<int> asyncService() { // 4
RWTRunnableIOUFunction<int> runnable;
runnable = RWTRunnableIOUFunction<int>::make(rwBind(
&ServiceProvider::syncService,
this));
server_.enqueue(runnable);
return runnable.result();
}
};
 
void
main()
{
ServiceProvider provider; // 5
cout << "Calling asynchronous service..." << endl;
RWTIOUResult<int> iou = provider.asyncService(); // 6
cout << "Redeemed value of IOU: " << iou << endl; // 7
} // 8
//1 Defines the actual service to be performed.
//2 Defines a constructor that creates and starts the internal server.
//3 Defines a destructor that stops the internal server and waits for it to exit.
//4 Defines a member function to initiate an asynchronous operation by constructing a synchronous runnable and passing it to the internal server. The IOU representing the result of the operation is returned to the caller.
//5 Constructs an instance of the service provider.
//6 Calls the asynchronous service and save the IOU result.
//7 Redeems the IOU result, blocking if necessary, until the result is made available.
//8 The service provider object is destroyed at block scope, but the destructor does not return until its internal server thread has shutdown and exited.