5.5 The IOU Classes
The IOU template classes implement a thread synchronization mechanism called a future, as explained in
Section 5.2.1.
The concrete IOU classes are templates that are parameterized on the type of the result that is stored. This type has the following characteristics:
It cannot be a reference.
It must have a public copy constructor.
It cannot define a protected
operator new().
An IOU object possesses two distinct interfaces:
One for storing the results of an operation into an IOU’s escrow object.
One for retrieving the results from the IOU escrow.
These two interfaces are realized by the IOU handle classes
RWTIOUEscrow and
RWTIOUResult.
You can convert from one IOU handle type to the other by using copy-construction:
RWTIOUResult<int> result = ...;
RWTIOUEscrow<int> escrow = result;
5.5.1 Testing for Empty IOU Handles
An IOU handle instance can be empty. Any attempt to use an empty handle as if it were a valid IOU object produces an
RWTHRInvalidPointer exception. You can determine whether an IOU handle is empty by using the
isValid() member of the handle instance.
5.5.2 Constructing an IOU
The IOU mechanism is implemented using a handle-body architecture. The construction of an IOU handle does not result in the construction of a complete IOU object. To build a viable IOU object you must first construct an escrow object and bind that object to one or more IOU handles.
Use the static make() functions construct an IOU escrow object.
The
RWTThreadEscrowImp template class in the Threading package has a static member function,
make(), that dynamically allocates an escrow object and returns an
RWTEscrowHandle instance bound to that escrow object. The
RWTEscrowHandle instance can be assigned to, or used to initialize, an
RWTIOUResult or
RWTIOUEscrow handle instance, as in this example:
RWTIOUEscrow<int> iou = RWTThreadEscrowImp<int>::make();
If you use the
RWTRunnableIOUFunction and
RWTThreadIOUFunction classes, found in the Threading package, you can avoid escrow construction entirely by letting these classes construct the escrow object for you. You can retrieve a handle to the internal escrow instance by calling the
result() member function in these runnable classes. This function returns an
RWTIOUResult handle instance.
5.5.3 Closing an IOU
In the Interthread Communication package, the act of storing or writing the result of an operation into an IOU is called
closing the IOU. To close an IOU, you must possess an
RWTIOUEscrow handle that references the IOU escrow object.
The
RWTIOUEscrow class has two functions related to closing an IOU:
close() setException() 5.5.3.1 The close() Function
The close() function stores the final result of an operation into the IOU object. It releases any threads that have been waiting for the result.
RWTIOUEscrow also has a function operator called
operator()() and an assignment operator called
operator=() that have the same capability as the
close() function. Given that “iou” is an instance of the
RWTIOUEscrow class, the following statements are functionally identical:
iou.close(value);
iou(value);
iou = value;
5.5.3.2 The setException() Function
RWTIOUEscrow has two versions of the
setException() function. Both indicate whether or not the operation failed to produce a result due to an error condition. One version of the function accepts a reference to an instance of the base exception class,
RWTHRxmsg. The other version accepts an
RWCString message that is used internally to initialize an
RWTHRxmsg instance. In either case, these functions:
Release threads that are waiting for a result.
Cause the exception to be rethrown in those threads.
5.5.3.3 Example
Typical uses of the
close() and
setException() functions are shown in the code fragment in
Example 44.
Example 44 – Closing an IOU
void async_operation(RWTIOUEscrow<int>& iou)
{
try {
int result=0;
// Perform some operation to calculate result...
// Close the escrow
iou.close(result);
}
catch(...) {
iou.setException("Unexpected Exception");
}
}
The IOU object is intended as a “one-shot” communication mechanism. An IOU instance can only be closed once during its life-span—it is not possible to reset or reuse an IOU. Any attempt to do so produces an RWTHREscrowAlreadyClosed exception.
The
RWTIOUEscrow class also has several functions that applications can call to query the status of an IOU object:
aborted() — Returns
true if a client has requested that the operation be aborted.
closeable() — Returns
true if the IOU has not been aborted and has not yet been closed.
closed() — Returns
true if the IOU has been closed.
inError() — Returns
true if the IOU was closed using
setException().
redeemed() — Returns
true if any thread has successfully redeemed the IOU to retrieve its value.
5.5.4 Redeeming an IOU
The act of reading a result from an IOU is called
redeeming the IOU. To redeem an IOU, you must possess an
RWTIOUResult handle that references the IOU escrow object.
The
RWTIOUResult class has three functions for redeeming the IOU:
A function operator
operator()().
A conversion operator
operator Redeemable().
A
redeem() function.
These three members perform an identical function. Given that
iou is an instance of the
RWTIOUResult class, the following statements are functionally equivalent:
value = iou();
value = iou;
value = iou.redeem();
5.5.4.1 Rules about Threads and IOUs
Threads use IOUs in the following ways:
Threads attempting to redeem an IOU that has not been closed, block until another thread writes a result or exception to the IOU object.
If an IOU is closed with an exception, any attempt to redeem the IOU produces that exception:
try {
result = iou.redeem();
cout << "Operation succeeded! " << result << endl;
}
catch(RWException& msg) {
cout << "Operation failed! " << msg.why() << endl;
}
A thread can redeem an IOU as many times as it wants.
An IOU can be shared between and redeemed by any number of threads.
5.5.4.2 Aborting a Request
The RWTIOUResult::abort() member can be used to signal that the IOU client no longer needs the result represented by the IOU. This function simply sets a flag within the IOU, so that it can be polled by another thread.
5.5.4.3 Querying the Status of an IOU Object
The
RWTIOUResult class also has several functions that can be used to query the status of an IOU object:
aborted() — Returns
true if a client has requested that the operation be aborted.
inError() — Returns
true if the IOU was closed using
setException().
redeemable() — Returns
true if the IOU has been closed, either with a value or with an exception.
redeemed() — Returns
true if any thread has successfully redeemed the IOU to retrieve its value.
5.5.5 Using IOUs
Obtaining results from asynchronous operations often requires complex synchronization that is difficult to code and difficult to understand. The IOU object has an easy-to-use mechanism that simplifies this retrieval process.
5.5.5.1 Example
The following example demonstrates the use of an IOU object for retrieving the results of an operation executed in another thread.
Example 45 uses a form of threaded runnable that invokes a function and stores any return value into an IOU.
Example 45 – Using an IOU object to retrieve results from another thread
#include <rw/thread/RWRunnableSelf.h>
#include <rw/thread/RWTThreadIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <iostream>
using namespace std;
int sync_service()
{
int status = 0;
// Do something useful that produces a status
// value indicating success or failure
rwSleep(1000); // Waste some time...
return status;
}
RWTIOUResult<int> async_service()
{
RWTThreadIOUFunction<int> thread;
thread = RWTThreadIOUFunction<int>::make(sync_service); // 1
thread.start(); // 2
return thread.result(); // 3
}
void main()
{
cout << "Starting asynchronous service..." << endl;
RWTIOUResult<int> iou = async_service(); // 4
cout << "Redeemed value of IOU: " << iou << endl; // 5
}
5.5.5.2 Closing and Redeeming an IOU
In this example, the RWThreadIOUFunction class closes the IOU when the sync_service() function returns a result back to the runnable.
Threads attempting to redeem an IOU that has not been closed block until another thread writes a result or exception to that IOU object. If an IOU is closed with an exception, any attempt to redeem the IOU produces that exception.
5.5.5.3 Using Active Objects
The example in
Section 5.5.5.1 shows how an asynchronous operation can be created. Although the implementation seems straightforward, it has a potential problem—it does not have a way to join with the thread that has been created, so it has no way to insure that the thread has exited before any process eventually exits.
Using an active object solves the join problem. For more information, see
Section 3.8, “Using Threads To Build Active Objects.”5.5.6 Waiting for IOUs
In the examples in
Section 3.8.1, “Using Runnables,” Section 3.8.2, “Using Runnable Servers,” and
Section 5.5.5, “Using IOUs,” the IOU was immediately redeemed to get a result, effectively producing synchronous behavior. A more interesting solution includes launching several asynchronous operations at the same time.
5.5.6.1 Asynchronous Example
In
Example 46, four operations are started before attempting to redeem the results. Assume that:
The requests are performed in parallel.
Each of the requests can take a varying amount of time to complete.
In this hypothetical situation, the IOU results are likely to become redeemable in an order different from the order in which the corresponding operations were launched.
Example 46 – Responding to results from asynchronous operations in order of launch
ServiceProvider provider;
RWTIOUResult<int> iou1 = provider.asyncService();
RWTIOUResult<int> iou2 = provider.asyncService();
RWTIOUResult<int> iou3 = provider.asyncService();
RWTIOUResult<int> iou4 = provider.asyncService();
cout << "Redeemed value of IOU1: " << iou1 << endl;
cout << "Redeemed value of IOU2: " << iou2 << endl;
cout << "Redeemed value of IOU3: " << iou3 << endl;
cout << "Redeemed value of IOU4: " << iou4 << endl;
Redeeming the IOUs in the way shown above forces the results to be acted upon in the same order in which the operations were launched. To take maximum advantage of the concurrency of threads, the application needs to be designed to process the results of these operations in the order in which they become available.
Consider a hypothetical Web browser written using the Threading and Interthread Communication packages. A typical Web page can consist of text and images. Each of the images is identified by a Web link and can be distributed across various sites on a network. The browser, after loading a Web page containing these image links, can choose to launch separate threads to handle the retrieval of each image. In this situation, IOUs can represent the individual images being retrieved by each of the threads. If the display code in the browser were forced to redeem the IOUs in the same sequence as the retrieval operations were launched, then a lengthy transfer could unnecessarily delay rendering of the remaining portions of the page. If, however, our hypothetical browser could draw each image as it arrives, the user would see each image on the page appear as soon as it became available.
5.5.7 Trapping IOUs with RWTIOUTrap
The
RWTIOUTrap template class can retrieve the next redeemable IOU in a group of IOUs. This class uses an internal producer-consumer queue to capture
RWTIOUResult instances as they become redeemable.
5.5.7.1 Waiting for Trapped IOUs
A thread waits for trapped IOUs to become redeemable by calling the trap’s getNext() function. This function returns the next IOU in its internal queue, and waits for an IOU to arrive if the queue is found to be empty.
5.5.7.2 Using Timed Waits
An
RWTIOUTrap can also be used to implement a timed wait on an IOU result. This capability is significant because the
RWTIOUResult class does not allow a time-limit to be placed on a redemption operation. The following code demonstrates this capability:
Example 47 – Implementing a timed wait on an IOU result
#include <rw/thread/RWTThreadIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/itc/RWTIOUTrap.h>
#include <iostream>
using namespace std;
int func()
{
int result = 0;
// Wait for a while...
rwSleep(3000);
return result;
}
void main()
{
RWTIOUTrap<int> trap;
RWTThreadIOUFunction<int> thread;
thread = RWTThreadIOUFunction<int>::make(func);
thread.start();
trap.setTrap(thread.result());
RWTIOUResult<int> iou;
// Wait 1 second, complain and repeat...
while(RW_THR_COMPLETED != trap.getNext(iou,1000)) {
cout << "Timed out! Retrying..." << endl;
}
cout << "IOU closed — Operation Complete!" << endl;
}
5.5.7.3 Improving the Asynchronous Example
To wait for the next redeemable IOU, use the
RWTIOUTrap class. It uses an internal producer-consumer queue to capture
RWTIOUResult instances as they become redeemable. To trap an IOU result, the code must pass the IOU handle to the
RWTIOUTrap instance in a call to the trap’s
setTrap() function. This approach is an improvement on the way asynchronous operations were handled in
Example 46.
Once an IOU has been registered with a trap, a handle to the IOU is automatically enqueued inside the trap when the IOU is closed. An IOU can be registered with any number of traps—a separate handle is enqueued in each trap instance at IOU closure.
If the IOU has already been closed and is redeemable when the IOU is added to a trap, a redeemable IOU is enqueued immediately during the setTrap() call.
A thread waits for trapped IOUs to become redeemable by calling the trap’s getNext() function. This function returns the next IOU in its internal queue and waits for an IOU to arrive if the queue is empty.
See
Section 5.5.7, “Trapping IOUs with RWTIOUTrap,” for additional information regarding this class.
5.5.7.4 Another Asynchronous Example
Example 48 provides another alternative for processing the results of asynchronous operations. In this example, the
RWRunnableServer is replaced by an
RWServerPool, an identification value is added for each operation invocation, and the
RWTIOUTrap processes the results of the operations.
Example 48 – Using an IOU trap to handle results from asynchronous operations
#include <rw/thread/RWRunnableSelf.h>
#include <rw/thread/RWServerPool.h>
#include <rw/thread/RWTRunnableIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/itc/RWTIOUTrap.h>
#include <iostream>
using namespace std;
class ServiceProvider; // Forward reference for inlines!
class ServiceProvider {
private:
RWServerPool server_;
protected:
int syncService(int opId, int delay) // 1
{
// Do something that takes varying amounts of time
rwSleep(delay);
return opId;
}
public:
ServiceProvider(size_t concurrencyLevel) { // 2
server_ = RWServerPool::make(concurrencyLevel);
server_.start();
}
~ServiceProvider() { // 3
server_.stop();
server_.join();
}
RWTIOUResult<int> asyncService(int opId, int delay) { // 4
RWTRunnableIOUFunction<int> runnable;
runnable = RWTRunnableIOUFunction<int>::make(
rwBind(&ServiceProvider::syncService, this, opId, delay));
server_.enqueue(runnable);
return runnable.result();
}
};
void
main()
{
const size_t concurrency = 5;
const size_t operations = 10;
size_t i;
ServiceProvider provider(concurrency); // 5
RWTIOUTrap<int> trap; // 6
for (i=0; i<operations; i++)
trap.setTrap(provider.asyncService(i,rand()%1000)); // 7
for (i=0; i<operations; i++) {
cout << "Operation ";
cout << trap.getNext(); // 8
cout << " complete!" << endl;
}
}