Using Interthread Communication
This section uses a series of simple examples to introduce and explain the concepts and classes involved in interthread communication.
You can find more examples in buildspace\examples\itc. The following list gives the names of those examples and the classes that they exercise:
activobj: RWTIOUResult, RWTIOUTrap, RWTPCValQueue
balance: RWTPCValQueue
iouescro: RWTIOUResult, RWTIOUEscrow
ioureslt: RWTIOUResult
ioutrap: RWTIOUTrap, RWTIOUResult
prodcons: RWTPCValQueue
servpool: RWTIOUTrap, RWTIOUResult
Using Producer-Consumer Queues
The concept of producer-consumer synchronization was introduced in “Using Condition Variables.” This form of synchronization can be used to coordinate communication between two or more threads. By combining producer-consumer synchronization semantics with a simple templatized queue, you can link threads together into a network of cooperating processing elements.
For more information on producer-consumer synchronization, see “The Producer‑Consumer Classes.”
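To make the idea of a network of cooperating processing elements concrete, here is a minimal sketch in standard C++ rather than the Threads Module classes. The names Channel, kDone, and runPipeline are hypothetical and exist only for this illustration. The sketch assumes an unbounded blocking channel and a sentinel value that marks the end of the stream. Three stages (a source, a doubling stage, and a summing sink) are linked by two channels, with the last two stages running on their own threads.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical unbounded blocking channel; an illustration only,
// not a Threads Module class.
template <typename T>
class Channel {
public:
    void put(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(value);
        }
        ready_.notify_one();  // wake one waiting consumer
    }

    T take() {  // blocks while the channel is empty
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        T value = items_.front();
        items_.pop();
        return value;
    }

private:
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable ready_;
};

const int kDone = -1;  // assumed end-of-stream sentinel

// Source -> doubler -> summing sink. Returns the sum seen by the sink.
int runPipeline(int count) {
    assert(count >= 0);
    Channel<int> raw, doubled;
    int sum = 0;

    std::thread doubler([&] {
        for (int v = raw.take(); v != kDone; v = raw.take())
            doubled.put(2 * v);
        doubled.put(kDone);  // pass the sentinel downstream
    });
    std::thread sink([&] {
        for (int v = doubled.take(); v != kDone; v = doubled.take())
            sum += v;
    });

    for (int i = 1; i <= count; ++i) raw.put(i);  // the source stage
    raw.put(kDone);

    doubler.join();
    sink.join();  // sum is safe to read after the join
    return sum;
}
```

Calling runPipeline(10) from a main function feeds the values 1 through 10 into the first channel. Each stage only reads from its input channel and writes to its output channel, so stages can be added or rearranged without changing the others.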
The RWTPCValQueue Family of Classes
The RWTPCValQueue class belongs to a family of producer-consumer classes that provide thread-safe, synchronized, buffered exchange of data between threads. Each class in the family provides:
Read operations that block the calling thread if the underlying buffer is empty.
Write operations that block if the underlying buffer is full.
The maximum capacity of the buffer can be specified at the time of construction. The default is a buffer whose capacity is limited only by memory.
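The blocking behavior described above can be sketched with standard C++ primitives. This is an illustration under stated assumptions, not the RWTPCValQueue implementation: BoundedQueue and transferThroughQueue are hypothetical names, and treating a capacity of zero as "unbounded" is a convention chosen for this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical bounded producer-consumer queue; an illustration only.
template <typename T>
class BoundedQueue {
public:
    // In this sketch, capacity == 0 means "limited only by memory".
    explicit BoundedQueue(std::size_t capacity = 0) : capacity_(capacity) {}

    // Blocks the calling thread while the buffer is full.
    void write(T value) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] {
            return capacity_ == 0 || items_.size() < capacity_;
        });
        items_.push_back(std::move(value));
        notEmpty_.notify_one();
    }

    // Blocks the calling thread while the buffer is empty.
    T read() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop_front();
        notFull_.notify_one();
        return value;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
};

// Sends 0..count-1 through a queue of the given capacity and returns
// the values in the order the consumer thread received them.
std::vector<int> transferThroughQueue(std::size_t capacity, int count) {
    assert(count >= 0);
    BoundedQueue<int> queue(capacity);
    std::vector<int> received;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) received.push_back(queue.read());
    });
    for (int i = 0; i < count; ++i) queue.write(i);  // may block when full
    consumer.join();
    return received;
}
```

With a small capacity, for example transferThroughQueue(2, 50), the writer is repeatedly forced to wait for the reader, yet the values still arrive complete and in order. Two condition variables are used so that writers and readers wait on separate conditions and a notification never wakes a thread of the wrong kind.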
Example
Example 43 simulates a manufacturing operation where an RWTPCValQueue is used to represent a warehouse of limited capacity that accepts widgets from a production function and then sends these widgets to a shipping function. The production of widgets occurs at a fixed rate, while the shipping rate of widgets varies from widget to widget. This difference in read and write rates eventually results in one function being forced to wait on the other.
Example 43 – Using a producer-consumer queue in a manufacturing operation
#include <stdlib.h> // For rand() and srand()
#include <rw/thread/RWThreadFunction.h>
#include <rw/itc/RWTPCValQueue.h>
#include <rw/functor/rwBind.h>
#include <rw/sync/RWMutexLock.h>
#include <rw/cstring.h>
#include <iostream>
 
using namespace std;
 
typedef int Widget;
 
void message(const RWCString& message) { // 1
    static RWMutexLock mutex; // Make output operation atomic!
    RWMutexLock::LockGuard lock(mutex);
    cout << message << endl;
}
 
void produce(RWTPCValQueue<Widget>& warehouse, size_t runSize) {
    for (size_t i=0; i<runSize; i++) {
        if (!warehouse.canWrite()) // 2
            message("Warehouse full!");
        // Print bar graph of inventory size
        message(RWCString('*',warehouse.entries()));
        warehouse.write(Widget(i)); // 3
        rwSleep(100); // Fixed time to produce widgets
    }
    warehouse.write(Widget(-1)); // Tell shipping the run is done!
    message("Production done!");
}
 
void ship(RWTPCValQueue<Widget>& warehouse) {
    // Seed random number sequence using thread id
    ::srand(rwThreadHash(rwThreadId()));
    Widget widget;
    do {
        rwSleep(::rand()%200); // Random time for each shipment
        if (!warehouse.canRead()) // 4
            message("Warehouse empty!");
        widget = warehouse.read(); // 5
    } while (widget != -1);
    message("Shipping done!");
}
 
int main() {
    // The warehouse can hold only five widgets!
    RWTPCValQueue<Widget> warehouse(5); // 6
    RWThreadFunction production;
    production = RWThreadFunction::make(rwBind(produce, rwRef(warehouse), 100));
    RWThreadFunction shipping;
    shipping = RWThreadFunction::make(rwBind(ship, rwRef(warehouse)));
    production.start();
    rwSleep(5*100); // Let production fill the warehouse!
    shipping.start();
    production.join();
    shipping.join();
}
//1 A simple routine for synchronizing message output.
//2 Tests to see if the warehouse queue is currently full, so that an alert can be sent before being blocked inside the write function.
//3 Puts a widget into the warehouse, and waits if it is full.
//4 Tests to see if the warehouse queue is currently empty, so that an alert can be sent before being blocked inside the read function.
//5 Gets a widget from the warehouse, and waits if it is empty.
//6 Constructs a queue to represent the warehouse and limits the queue length to five entries.