Using Threads To Build Active Objects
You can encapsulate asynchronous operations within an active object. An active object creates one or more threads to asynchronously execute service requests made by its clients.
This is useful because IOUs (described in
“The IOU Classes” ) are not always closed and redeemed when threads exit. Although the
RWTThreadIOUFunction class closes the IOU just before exiting, it is possible to create threads that can close an IOU at any time during their processing.
Using Runnables
Using an active object solves the join problem by creating any necessary threads within the active object’s constructor and joining with them in its destructor. This pattern is similar to the “resource acquisition is initialization” idiom employed by the Synchronization package guard objects (see
“Using Guards” ).
Any of the threaded runnable classes, which are part of the Threading package, can construct an active object. The final choice, though, is dependent on the type of service that the object provides.
Runnables are covered in detail in
“The Runnable Object Classes.”Example
Example 22 shows how an asynchronous function call can be converted to a simple active object.
Example 22 – Encapsulating asynchronous operations in an active object
#include <rw/thread/RWTThreadIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/functor/rwBind.h>
using namespace std;
class AsyncService; // Forward reference for inlines!
class AsyncService {
private:
RWTThreadIOUFunction<int> thread_;
protected:
int syncService() // 1
{
int status = 0;
// Do something useful that produces a status
// value indicating success or failure
rwSleep(1000); // Waste some time...
return status;
}
public:
AsyncService() { // 2
thread_ = RWTThreadIOUFunction<int>::make(rwBind(
&AsyncService::syncService, this));
thread_.start();
}
~AsyncService() { // 3
thread_.join();
}
RWTIOUResult<int> operator()() const // 4
{
return thread_.result();
}
};
void
main()
{
cout << "Start the request" << endl;
AsyncService request; // 5
RWTIOUResult<int> iou = request(); // 6
cout << "Redeemed value of IOU: " << iou << endl; // 7
} // 8
Other Solutions to the Join Problem
The approach in the previous example solves the join problem, but the implementation is confusing because:
• It relies on a constructor to initiate the operation, instead of using a regular function call.
• It requires a separate class for each asynchronous operation.
A better way to solve the problem is to move the initiation of the asynchronous operation to a public member function of the class. In this case, the operation is started by calling the member function, which creates a threaded runnable to perform the actual operation. A handle to that runnable could be stored within the class so that a join can be performed when the object is destroyed.
Other problems are created, however, if the service is invoked a second time:
• If the function does not wait on the previous operation, but instead creates another thread, then it needs to store a handle to the second thread object.
• If the function stores the handle in the internal handle member, the reference to the first runnable is lost.
To avoid these new problems:
• Keep any thread creation in the active object constructor.
• Add member functions that delegate any service requests to those thread or threads that are created.
Using Runnable Servers
The
RWTThreadIOUFunction used in the example is generally not suitable for an active object implementation that must accept any number of asynchronous requests from a client interface.
It is better to use one of the runnable server classes,
RWRunnableServer or
RWServerPool, to provide the internal thread or threads required by the active object. These classes are suitable because they are designed to continuously accept and execute other runnable objects. This allows you to package individual operations as synchronous runnables that can be passed to the internal server for execution. Runnable servers are covered in detail in
“The Server Classes.”In
Example 23 , individual operations are executed asynchronously relative to the thread that requests them, but simultaneous requests cannot be processed concurrently. This limitation is a consequence of choosing the single-threaded
RWRunnableServer class for the internal server thread.
If an active object design can benefit from increased concurrency, then the multithreaded
RWServerPool class should be used instead.
Example 23 – Using a single-threaded runnable server class
#include <rw/thread/RWRunnableServer.h>
#include <rw/thread/RWTRunnableIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/functor/rwBind.h>
#include <iostream>
using namespace std;
class ServiceProvider; // Forward reference for inlines!
class ServiceProvider {
private:
RWRunnableServer server_;
protected:
int syncService() // 1
{
int status = 0;
// Do something useful that produces a status
// value indicating success or failure
rwSleep(1000); // Waste some time...
return status;
}
public:
ServiceProvider() { // 2
server_ = RWRunnableServer::make();
server_.start();
}
~ServiceProvider() { // 3
server_.stop();
server_.join();
}
RWTIOUResult<int> asyncService() { // 4
RWTRunnableIOUFunction<int> runnable;
runnable = RWTRunnableIOUFunction<int>::make(rwBind(
&ServiceProvider::syncService,
this));
server_.enqueue(runnable);
return runnable.result();
}
};
void
main()
{
ServiceProvider provider; // 5
cout << "Calling asynchronous service..." << endl;
RWTIOUResult<int> iou = provider.asyncService(); // 6
cout << "Redeemed value of IOU: " << iou << endl; // 7
} // 8