Trapping IOUs with RWTIOUTrap
The RWTIOUTrap template class can retrieve the next redeemable IOU in a group of IOUs. This class uses an internal producer-consumer queue to capture RWTIOUResult instances as they become redeemable.
Waiting for Trapped IOUs
A thread waits for trapped IOUs to become redeemable by calling the trap’s getNext() function. This function returns the next IOU in the trap’s internal queue; if the queue is empty, it blocks until an IOU arrives.
Using Timed Waits
An RWTIOUTrap can also be used to implement a timed wait on an IOU result. This capability is significant because the RWTIOUResult class does not allow a time limit to be placed on a redemption operation. The following code demonstrates this capability:
Example 47 – Implementing a timed wait on an IOU result
#include <rw/thread/RWTThreadIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/itc/RWTIOUTrap.h>
#include <iostream>

using namespace std;

int func()
{
    int result = 0;
    // Wait for a while...
    rwSleep(3000);
    return result;
}

int main()
{
    RWTIOUTrap<int> trap;
    RWTThreadIOUFunction<int> thread;
    thread = RWTThreadIOUFunction<int>::make(func);
    thread.start();
    // Register the thread's IOU result with the trap
    trap.setTrap(thread.result());
    RWTIOUResult<int> iou;
    // Wait 1 second, complain and repeat...
    while (RW_THR_COMPLETED != trap.getNext(iou, 1000)) {
        cout << "Timed out! Retrying..." << endl;
    }
    cout << "IOU closed - Operation Complete!" << endl;
    return 0;
}
To see another example of RWTIOUTrap usage, refer to “Waiting for IOUs.”
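For readers who want to see how a timed getNext() of this kind can be built, the following is a minimal sketch in standard C++ using a condition variable with a timeout. It is not the RWTIOUTrap implementation: the names TimedTrap, enqueue(), and WaitStatus are hypothetical, with WaitStatus::Completed playing the role that RW_THR_COMPLETED plays in Example 47.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical status type for this sketch only.
enum class WaitStatus { Completed, TimedOut };

// Hypothetical trap-like queue with a timed wait (not Rogue Wave code).
template <typename T>
class TimedTrap {
public:
    // Producer side: called when a result becomes available.
    void enqueue(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ready_.push_back(std::move(value));
        }
        available_.notify_one();
    }

    // Waits up to `timeout` for a result. On success, moves the oldest
    // result into `out`; on timeout, leaves `out` untouched.
    WaitStatus getNext(T& out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        bool ready = available_.wait_for(
            lock, timeout, [this] { return !ready_.empty(); });
        if (!ready) {
            return WaitStatus::TimedOut;
        }
        out = std::move(ready_.front());
        ready_.pop_front();
        return WaitStatus::Completed;
    }

private:
    std::mutex mutex_;
    std::condition_variable available_;
    std::deque<T> ready_;
};
```

A caller can loop on getNext() until it returns WaitStatus::Completed, reporting progress or giving up after each timeout, which is the same pattern as the while loop in Example 47.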
Improving the Asynchronous Example
To wait for the next redeemable IOU, use the RWTIOUTrap class. It uses an internal producer-consumer queue to capture RWTIOUResult instances as they become redeemable. To trap an IOU result, the code must pass the IOU handle to the RWTIOUTrap instance in a call to the trap’s setTrap() function. This approach is an improvement on the way asynchronous operations were handled in Example 46.
Once an IOU has been registered with a trap, a handle to the IOU is automatically enqueued inside the trap when the IOU is closed. An IOU can be registered with any number of traps—a separate handle is enqueued in each trap instance at IOU closure.
If the IOU has already been closed and is redeemable when the IOU is added to a trap, a redeemable IOU is enqueued immediately during the setTrap() call.
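The registration rules above (one entry per registered trap at closure, and immediate enqueueing when the IOU is already closed) can be sketched in standard C++ as follows. All names here (SketchTrap, SketchIou, addTrap(), close(), tryGetNext()) are hypothetical and do not reflect the Rogue Wave implementation; for brevity the sketch enqueues a copy of the result value instead of a handle, and the trap's read is non-blocking.

```cpp
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical trap: a thread-safe queue of results (not Rogue Wave code).
template <typename T>
class SketchTrap {
public:
    void enqueue(const T& value) {
        std::lock_guard<std::mutex> lock(mutex_);
        ready_.push_back(value);
    }

    // Non-blocking for brevity: returns false if nothing is queued.
    bool tryGetNext(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (ready_.empty()) {
            return false;
        }
        out = std::move(ready_.front());
        ready_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<T> ready_;
};

// Hypothetical IOU: holds a result once closed and notifies its traps.
// Assumes T is default-constructible.
template <typename T>
class SketchIou {
public:
    // Register a trap. If the IOU is already closed, the result is
    // enqueued immediately; otherwise the trap is remembered.
    void addTrap(SketchTrap<T>& trap) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (closed_) {
            trap.enqueue(value_);
        } else {
            traps_.push_back(&trap);
        }
    }

    // Close the IOU: store the result and enqueue it in every
    // registered trap, one entry per trap.
    void close(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        value_ = std::move(value);
        closed_ = true;
        for (SketchTrap<T>* trap : traps_) {
            trap->enqueue(value_);
        }
        traps_.clear();
    }

private:
    std::mutex mutex_;
    bool closed_ = false;
    T value_{};
    std::vector<SketchTrap<T>*> traps_;
};
```

Because the closed check and the registration happen under the same lock as close(), a trap added concurrently with closure receives the result exactly once in this sketch, either from addTrap() or from close().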
A thread waits for trapped IOUs to become redeemable by calling the trap’s getNext() function. This function returns the next IOU in its internal queue and waits for an IOU to arrive if the queue is empty.
See “Trapping IOUs with RWTIOUTrap” for additional information regarding this class.
Another Asynchronous Example
Example 48 provides another alternative for processing the results of asynchronous operations. In this example, the RWRunnableServer is replaced by an RWServerPool, an identification value is added for each operation invocation, and the RWTIOUTrap processes the results of the operations.
Example 48 – Using an IOU trap to handle results from asynchronous operations
#include <rw/thread/RWRunnableSelf.h>
#include <rw/thread/RWServerPool.h>
#include <rw/thread/RWTRunnableIOUFunction.h>
#include <rw/itc/RWTIOUResult.h>
#include <rw/itc/RWTIOUTrap.h>
#include <cstdlib>   // rand()
#include <iostream>

using namespace std;

class ServiceProvider; // Forward reference for inlines!

class ServiceProvider {
private:
    RWServerPool server_;
protected:
    int syncService(int opId, int delay)                    // 1
    {
        // Do something that takes varying amounts of time
        rwSleep(delay);
        return opId;
    }
public:
    ServiceProvider(size_t concurrencyLevel) {              // 2
        server_ = RWServerPool::make(concurrencyLevel);
        server_.start();
    }
    ~ServiceProvider() {                                    // 3
        server_.stop();
        server_.join();
    }
    RWTIOUResult<int> asyncService(int opId, int delay) {   // 4
        RWTRunnableIOUFunction<int> runnable;
        runnable = RWTRunnableIOUFunction<int>::make(
            rwBind(&ServiceProvider::syncService, this, opId, delay));
        server_.enqueue(runnable);
        return runnable.result();
    }
};

int
main()
{
    const size_t concurrency = 5;
    const size_t operations = 10;
    size_t i;

    ServiceProvider provider(concurrency);                  // 5
    RWTIOUTrap<int> trap;                                   // 6
    for (i = 0; i < operations; i++)
        trap.setTrap(provider.asyncService(i, rand() % 1000)); // 7

    for (i = 0; i < operations; i++) {
        cout << "Operation ";
        cout << trap.getNext();                             // 8
        cout << " complete!" << endl;
    }
    return 0;
}
 
//1 Defines the actual service to be performed.
//2 Defines a constructor that creates and starts the internal server pool. The constructor argument specifies the number of runnable server threads to create within the pool.
//3 Defines a destructor that stops the internal server and waits for it to exit.
//4 Defines a member function to initiate an asynchronous operation by constructing a synchronous runnable and passing it to the internal server. The IOU representing the result of the operation is returned to the caller.
//5 Constructs an instance of the service provider.
//6 Creates an IOU trap instance.
//7 Starts one of several asynchronous service requests using the loop index to identify each request. Adds the IOU result of each request to the trap.
//8 Redeems the next available IOU result provided by the trap, blocking (if necessary) until the trap captures another result.