Creating and Using Thread-safe Chains of Streaming Elements
A thread-safe chain of streaming elements is a composition of streaming elements in which one of the streaming elements is a synchronized filtered stream. Synchronized filtered streams ensure that each operation carried out on a stream is executed atomically in a multithreaded environment. Synchronized filtered streams can also be used in conjunction with guarded filtered streams to ensure that a group of operations is executed atomically in a multithreaded environment. For more information on the synchronized and guarded filtered streams, see “Synchronized Streams” and “Guarded Streams.”
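The difference between per-operation and per-group atomicity can be sketched with the C++ standard library alone. The following is only an analogy, not the Streams package API: SyncSink, Guard, and groups_stay_contiguous are hypothetical names, and the use of a recursive mutex is an assumption made so that a guard can hold the lock while each write locks it again.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a synchronized stream: every write() takes the
// lock, so each single operation is atomic.
class SyncSink {
public:
    void write(int value) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        data_.push_back(value);
    }
    std::recursive_mutex& mutex() { return mutex_; }
    std::vector<int> snapshot() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return data_;
    }
private:
    std::recursive_mutex mutex_;
    std::vector<int> data_;
};

// Hypothetical stand-in for a guarded stream: holds the sink's lock from
// construction to destruction, so a whole group of writes is atomic.
class Guard {
public:
    explicit Guard(SyncSink& sink) : sink_(sink), lock_(sink.mutex()) {}
    void write(int value) { sink_.write(value); }  // locks again, recursively
private:
    SyncSink& sink_;
    std::unique_lock<std::recursive_mutex> lock_;
};

// Two threads write groups of five values through a Guard while a third
// writes single values. Returns true if each group landed as an unbroken run.
bool groups_stay_contiguous() {
    SyncSink sink;
    auto grouped = [&sink](int tag) {
        Guard guard(sink);
        for (int i = 0; i < 5; ++i) guard.write(tag);
    };
    auto single = [&sink] {
        for (int i = 0; i < 5; ++i) {
            sink.write(0);
            std::this_thread::yield();
        }
    };
    std::thread t1(grouped, 1), t2(grouped, 2), t3(single);
    t1.join();
    t2.join();
    t3.join();

    std::vector<int> out = sink.snapshot();
    if (out.size() != 15) return false;
    for (int tag = 1; tag <= 2; ++tag) {
        std::size_t first = out.size(), count = 0;
        for (std::size_t i = 0; i < out.size(); ++i) {
            if (out[i] == tag) {
                if (count == 0) first = i;
                ++count;
            }
        }
        if (count != 5) return false;
        for (std::size_t i = first; i < first + 5; ++i)
            if (out[i] != tag) return false;
    }
    return true;
}
```

Calling groups_stay_contiguous() from main() should return true on every run: the single writes may land anywhere between the groups, but never inside one.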
Creating a Synchronized Data Output Stream
The first example demonstrates how to create and use a thread-safe chain of streaming elements. It creates a synchronized data output stream (RWSynchronizedDataOutputStreamImp) that connects to an RWNativeDataToByteOutputStreamImp stream, which encodes C++ base types as a sequence of bytes using their native binary representation.
The RWNativeDataToByteOutputStreamImp stream is in turn connected to a buffered binary output stream that sends the transformed bytes to a file. The class RWBufferedByteOutputStreamImp is connected to class RWByteToStreambufOutputStreamImp to provide the buffered byte output stream. This example requires the Streams package, as well as the Execution Tracing, Thread-compatible Exception, Synchronization, Smart Pointer, Threading, Functor, and Interthread Communication packages from the Threads Module.
Figure 15 is a representation of the chain of streaming elements used in this example.
Figure 15 – Thread-safe chain of streaming elements
The complete source for the examples in “Creating and Managing Threads Using Active Objects” and “Creating and Sharing a Synchronized Data Output Stream Among Several Active Objects” is located in examples\stream\dataFilteredWrite.cpp. Only part of the code is presented here.
Creating and Managing Threads Using Active Objects
The example uses active objects to carry out asynchronous operations on a single, shared thread-safe data stream. Each active object executes in its own thread. It starts executing upon construction and, if necessary, waits upon destruction for its processing to terminate. This example uses two kinds of active objects. The first kind writes ten elements of one of the C++ base types to a data output stream, ensuring that the ten elements are written in one synchronized operation. The second kind writes twenty elements to a data output stream, ensuring that each individual operation on the data output stream is synchronized. The code for both active object classes is presented below.
 
template <class T>
class synchronizeABunch {

public:

    synchronizeABunch(RWDataOutputStream& stream, T data)        // 1
        : dataOutputStream_(stream)
        , data_(data)
    {
        thread_ = rwMakeThreadFunctionM(synchronizeABunch<T>,
            *this, void, &synchronizeABunch<T>::func);           // 2
        thread_.start();
    }

    ~synchronizeABunch() {
        thread_.join();                                          // 3
        thread_.raise();
    }

    void func() {
        RWDataOutputStream tmpStream =
            RWGuardedDataOutputStreamImp::make(dataOutputStream_);  // 4
        for (int i = 0; i < 10; i++) {
            tmpStream << data_;                                  // 5
        }
    }

private:

    synchronizeABunch(const synchronizeABunch<T>&);
    synchronizeABunch<T>& operator=(const synchronizeABunch<T>&);

    RWDataOutputStream dataOutputStream_;
    T data_;
    RWThreadFunction thread_;
};
//1 The active object’s constructor takes a handle to the data output stream that is used internally as the sink of data. The handle points to the thread-safe chain of streaming elements. If the first element of the chain of streaming elements pointed to by the data output stream handle is not of type RWSynchronizedDataOutputStreamImp, then the active object does not enforce proper thread synchronization. The second parameter is the C++ base type’s value that is inserted ten times into the data output stream.
//2 Creates and starts a thread that executes the member function func().
//3 Upon destruction the active object waits for its thread to terminate execution and re-throws any exception raised while executing.
//4 Creates a temporary guarded output stream. All the operations carried out on the guarded output stream until its destruction are synchronized. The guarded output stream uses the internal lock provided by the synchronized data output stream class to which it is attached to enforce synchronization. The guarded output stream acquires the synchronized data output stream lock in its constructor and releases it in its destructor. Once the guarded output stream is constructed, any other threads accessing the guarded output stream’s synchronized data output stream are blocked.
//5 Inserts the C++ base type’s value provided at construction time into the temporary guarded data output stream.
The code for the second active object is similar to the code presented above with the exception of the func() function, which enforces synchronization at the operation level.
 
template <class T>
class synchronizeSingle {

public:

    synchronizeSingle(RWDataOutputStream& stream, T data)
        : dataOutputStream_(stream)
        , data_(data)
    {
        thread_ = rwMakeThreadFunctionM(synchronizeSingle<T>,
            *this, void, &synchronizeSingle<T>::func);
        thread_.start();
    }

    ~synchronizeSingle() {
        thread_.join();
        thread_.raise();
    }

    void func() {
        for (int i = 0; i < 20; i++) {
            dataOutputStream_ << data_;                          // 1
            rwYield();
        }
    }

private:

    synchronizeSingle(const synchronizeSingle<T>&);
    synchronizeSingle<T>& operator=(const synchronizeSingle<T>&);

    RWDataOutputStream dataOutputStream_;
    T data_;
    RWThreadFunction thread_;
};
//1 Inserts the C++ base type’s value provided at construction time into the data output stream. After each insertion, the active object’s thread yields so that any other runnable thread can execute. Because the sink is a synchronized data output stream, each individual insertion is atomic, but insertions from other threads can be interleaved between them.
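The active-object pattern shared by both classes (start a thread on construction, join it on destruction, surface any exception raised by the work) can be sketched with std::thread. This is a minimal analogy rather than the Threads Module API; ActiveTask, wait(), run_to_completion, and failure_is_reported are hypothetical names. Because destructors are implicitly noexcept in C++11 and later, the sketch rethrows from an explicit wait() call instead of from the destructor.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <thread>

// Hypothetical active object: starts its work in the constructor and joins
// the worker thread in the destructor.
class ActiveTask {
public:
    explicit ActiveTask(std::function<void()> work)
        : thread_([this, work] {
              try { work(); }
              catch (...) { error_ = std::current_exception(); }
          }) {}

    ~ActiveTask() {
        if (thread_.joinable()) thread_.join();
    }

    // Joins the worker and rethrows any exception captured from the work.
    void wait() {
        if (thread_.joinable()) thread_.join();
        if (error_) std::rethrow_exception(error_);
    }

    ActiveTask(const ActiveTask&) = delete;
    ActiveTask& operator=(const ActiveTask&) = delete;

private:
    std::exception_ptr error_;  // declared first so it exists before the thread starts
    std::thread thread_;
};

// The destructor blocks until the work is done, so the count is complete.
int run_to_completion() {
    int counter = 0;
    {
        ActiveTask task([&counter] {
            for (int i = 0; i < 10; ++i) ++counter;
        });
    }  // joins here
    return counter;
}

// An exception thrown on the worker thread reaches the caller of wait().
bool failure_is_reported() {
    ActiveTask task([] { throw std::runtime_error("write failed"); });
    try {
        task.wait();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

Scoping the object in a block, as run_to_completion() does, mirrors the way the guide's examples use braces to decide when the active objects are joined.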
Creating and Sharing a Synchronized Data Output Stream Among Several Active Objects
The following code constructs the thread-safe chain of streaming elements, and then constructs several active objects that use the same chain of streaming elements as the sink of data.
 
filebuf fbuf;                                                       // 1
fbuf.open("dataFilteredWrite.dat", ios::out | ios::binary);

RWByteOutputStream binOutputStream =
    RWByteToStreambufOutputStreamImp::make(fbuf);                   // 2

RWByteOutputStream bufferedBinOutputStream =
    RWBufferedByteOutputStreamImp::make(binOutputStream, 1024);     // 3

RWDataOutputStream dataOutputStream =
    RWNativeDataToByteOutputStreamImp::make(bufferedBinOutputStream);  // 4

RWDataOutputStream syncDataOutputStream =
    RWSynchronizedDataOutputStreamImp::make(dataOutputStream);      // 5

try {
    // first group of active objects is created
    {
        synchronizeSingle<double> s1(syncDataOutputStream, 64.0);   // 6
        synchronizeABunch<double> sb1(syncDataOutputStream, 3.14159),
                                  sb2(syncDataOutputStream, 1.05);
    }                                                               // 7
    // second group of active objects is created
    {
        synchronizeSingle<int> s1(syncDataOutputStream, 67);        // 6
        synchronizeABunch<int> sb1(syncDataOutputStream, 78),
                               sb2(syncDataOutputStream, 99);
    }                                                               // 7
}
catch (const RWIncompleteStreamOperation& e) {                      // 8
    cout << e.why() << endl;
    cout << e.elementsProcessed() << endl;
}
catch (const RWExternalStreamException& e) {
    cout << e.why() << endl;
}
//1 Creates an iostreams file buffer, and opens it in output mode. If the code is compiled on a PC platform, the file buffer needs to be opened in binary mode by specifying the flag ios::binary. If you built the Advanced Tools Module with the Standard iostreams library, you need to qualify filebuf and other iostreams elements with std::, or you need to include using declarations. The complete example makes use of macros defined by the Essential Tools Module in order to support both the classic and Standard iostreams.
//2 Creates an instance of class RWByteToStreambufOutputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a binary output stream handle. In this example, the make() function takes a reference to an iostreams streambuf object that is used as the sink of bytes.
//3 Creates an instance of class RWBufferedByteOutputStreamImp. The class is created by calling one of its static member functions make(), which creates an instance of self and returns it as a binary output stream handle. Several static make() functions are available to construct an instance of class RWBufferedByteOutputStreamImp. All take a handle to the next streaming element, which must be of type RWByteOutputStream. The static make() function used in this example takes a second parameter that specifies the size of the buffer to be allocated.
//4 Creates an instance of class RWNativeDataToByteOutputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a data output stream handle. The make() function takes a handle to the next streaming element, which must be of type RWByteOutputStream. It is used as the sink for the bytes generated when converting C++ base types to their binary representation.
//5 Creates an instance of class RWSynchronizedDataOutputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a data output stream handle. The make() function takes a handle to the next streaming element, which must be of type RWDataOutputStream. Each operation executed on this stream is guaranteed to be synchronized in a multithreaded environment.
//6 Creates the active objects. They are passed the same synchronized data output stream at construction time. First, an active object writes the same element twenty times. Then, two active objects write the same element ten times in a single synchronized operation. The twenty elements written by the first active object are not necessarily written consecutively into the stream, but each element is written atomically. The elements of the other two active objects are written consecutively into the stream. For example, one element of the first active object might be written into the stream, then the ten elements of the second active object, then the second element of the first active object, then the ten elements of the third active object, and finally the eighteen remaining elements of the first active object.
//7 Each active object waits for its thread to finish execution before being destroyed.
//8 Catches exceptions potentially thrown by the stream. The Streams package defines two exception classes: RWExternalStreamException and RWIncompleteStreamOperation. Class RWExternalStreamException returns an error message and an error code. Class RWIncompleteStreamOperation inherits from class RWExternalStreamException and is thrown when an operation partially succeeds. For more information on exceptions, see “Error Handling.”
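The layering described in notes //2 through //4 (native binary encoding feeding a buffer, which feeds a streambuf) can be pictured with a small standard-library sketch. It is an illustration under stated assumptions, not the Streams package implementation: BufferedByteSink, write_native, read_native, and round_trip are hypothetical names, and a std::stringbuf stands in for the file buffer. The read_native function exists only to check the bytes that were written.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <sstream>
#include <streambuf>
#include <vector>

// Hypothetical buffered byte element: collects bytes and forwards them to
// the underlying streambuf in blocks of at most `capacity` bytes.
class BufferedByteSink {
public:
    BufferedByteSink(std::streambuf& target, std::size_t capacity)
        : target_(target), capacity_(capacity) {
        buffer_.reserve(capacity);
    }
    ~BufferedByteSink() { flush(); }

    void write(const char* bytes, std::size_t count) {
        for (std::size_t i = 0; i < count; ++i) {
            if (buffer_.size() == capacity_) flush();
            buffer_.push_back(bytes[i]);
        }
    }

    void flush() {
        if (!buffer_.empty()) {
            target_.sputn(buffer_.data(),
                          static_cast<std::streamsize>(buffer_.size()));
            buffer_.clear();
        }
    }

private:
    std::streambuf& target_;
    std::size_t capacity_;
    std::vector<char> buffer_;
};

// Hypothetical data-to-byte element: copies the value's native binary
// representation into the byte sink. Intended for C++ base types only.
template <class T>
void write_native(BufferedByteSink& sink, const T& value) {
    char bytes[sizeof(T)];
    std::memcpy(bytes, &value, sizeof(T));
    sink.write(bytes, sizeof(T));
}

// Reads the same representation back; returns false on a short read.
template <class T>
bool read_native(std::streambuf& source, T& value) {
    char bytes[sizeof(T)];
    const std::streamsize wanted = static_cast<std::streamsize>(sizeof(T));
    if (source.sgetn(bytes, wanted) != wanted) return false;
    std::memcpy(&value, bytes, sizeof(T));
    return true;
}

// Writes a double and an int through the chain, then reads them back.
bool round_trip() {
    std::stringbuf storage;  // stands in for the file buffer
    {
        BufferedByteSink sink(storage, 8);
        write_native(sink, 3.14159);
        write_native(sink, 78);
    }  // the sink's destructor flushes the remaining bytes
    double d = 0.0;
    int n = 0;
    return read_native(storage, d) && read_native(storage, n) &&
           d == 3.14159 && n == 78;
}
```

Because the bytes are the platform's native representation, a file written this way is only meaningful to a reader with the same type sizes and byte order.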
Active Objects and Data Input Streams
The example in this section implements the input operation that corresponds to the previous example. It creates a synchronized data input stream (RWSynchronizedDataInputStreamImp) that connects to an RWNativeDataFromByteInputStreamImp stream, which restores C++ base types from a sequence of bytes. The RWNativeDataFromByteInputStreamImp stream is in turn connected to a buffered binary input stream that reads bytes from the file created in the previous example. The class RWBufferedByteInputStreamImp is connected to class RWByteFromStreambufInputStreamImp to provide the buffered byte input stream.
This example requires the Streams package, as well as the Execution Tracing, Thread-compatible Exception, Synchronization, Smart Pointer, Threading, Functor, and Interthread Communication packages from the Threads Module.
Figure 16 is a representation of the chain of streaming elements used in this example.
Figure 16 – Streaming with active objects
As in the previous example, this example uses active objects to carry out asynchronous operations on a single, shared thread-safe data stream. Only one type of active object is used. Its purpose is to read ten elements of one of the C++ base types from a data input stream, ensuring that the ten elements are read in one synchronized operation.
The complete example is located in directory ...\examples\stream in the file dataFilteredRead.cpp. The code for the active object class is presented below:
 
template <class T>
class readABunch {

public:

    readABunch(RWDataInputStream& stream)                        // 1
        : dataInputStream_(stream)
    {
        thread_ = rwMakeThreadFunctionM(readABunch<T>, *this,
            void, &readABunch<T>::func);                         // 2
        thread_.start();
    }

    ~readABunch() {
        thread_.join();                                          // 3
        thread_.raise();
    }

    void func() {
        RWDataInputStream tmpStream =
            RWGuardedDataInputStreamImp::make(dataInputStream_);  // 4
        for (int i = 0; i < 10; i++) {
            tmpStream >> data_;                                  // 5
            cout << data_ << ' ';
            rwYield();
        }
    }

private:

    readABunch(const readABunch<T>&);
    readABunch<T>& operator=(const readABunch<T>&);

    RWDataInputStream dataInputStream_;
    T data_;
    RWThreadFunction thread_;
};
//1 The active object’s constructor takes a handle to the data input stream that is used internally as the source of data. The handle points to the thread-safe chain of streaming elements. If the first element of the chain of streaming elements pointed to by the data input stream handle is not of type RWSynchronizedDataInputStreamImp, then the active object does not enforce proper thread synchronization.
//2 Creates and starts a thread that executes the member function func().
//3 Upon destruction the active object waits for its thread to terminate execution and re-throws any exception raised while executing.
//4 Creates a temporary guarded input stream. All operations carried out on the guarded input stream are synchronized until its destruction. The guarded input stream uses the internal lock provided by the synchronized data input stream class to which it is attached to enforce synchronization. The guarded input stream acquires the synchronized data input stream lock in its constructor and releases it in its destructor. Once the guarded input stream is constructed, any other threads accessing the guarded input stream’s synchronized data input stream are blocked.
//5 Extracts a value of the C++ base type from the temporary guarded data input stream and sends it to the standard output.
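The effect of reading through a guard can be sketched with the standard library. In this analogy (not the Streams package API), SyncSource, read_group, and each_reader_gets_a_consecutive_run are hypothetical names, and a recursive mutex is assumed so that the group lock and the per-read lock can nest. Yielding inside the loop does not release the lock, so other readers still cannot interleave.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical synchronized source that hands out 0, 1, 2, ... in order;
// each read() is atomic on its own.
class SyncSource {
public:
    explicit SyncSource(int count) {
        for (int i = 0; i < count; ++i) values_.push_back(i);
    }
    int read() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return values_.at(next_++);
    }
    std::recursive_mutex& mutex() { return mutex_; }
private:
    std::recursive_mutex mutex_;
    std::vector<int> values_;
    std::size_t next_ = 0;
};

// Reads `count` values as one group. The guard holds the source's lock for
// the whole loop, playing the role of the temporary guarded stream.
std::vector<int> read_group(SyncSource& source, int count) {
    std::lock_guard<std::recursive_mutex> guard(source.mutex());
    std::vector<int> result;
    for (int i = 0; i < count; ++i) {
        result.push_back(source.read());
        std::this_thread::yield();  // the lock is still held while yielding
    }
    return result;
}

// Four readers share one source; each must see ten consecutive values.
bool each_reader_gets_a_consecutive_run() {
    SyncSource source(40);
    std::vector<std::vector<int>> results(4);
    std::vector<std::thread> readers;
    for (int r = 0; r < 4; ++r) {
        readers.emplace_back([&source, &results, r] {
            results[r] = read_group(source, 10);
        });
    }
    for (std::thread& t : readers) t.join();

    for (const std::vector<int>& run : results) {
        if (run.size() != 10 || run.front() % 10 != 0) return false;
        for (std::size_t i = 1; i < run.size(); ++i)
            if (run[i] != run[i - 1] + 1) return false;
    }
    return true;
}
```

Which reader receives which run of ten is not determined; only the contiguity of each run is guaranteed.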
Creating and Sharing a Synchronized Data Input Stream Among Several Active Objects
The following code constructs the thread-safe chain of streaming elements, and then constructs several active objects that use the same chain of streaming elements as the source of data.
 
filebuf fbuf;                                                       // 1
fbuf.open("dataFilteredWrite.dat", ios::in | ios::binary);

RWByteInputStream binInputStream =
    RWByteFromStreambufInputStreamImp::make(fbuf);                  // 2

RWByteInputStream bufferedBinInputStream =
    RWBufferedByteInputStreamImp::make(binInputStream, 1024);       // 3

RWDataInputStream dataInputStream =
    RWNativeDataFromByteInputStreamImp::make(bufferedBinInputStream);  // 4

RWDataInputStream syncDataInputStream =
    RWSynchronizedDataInputStreamImp::make(dataInputStream);        // 5

try {
    {
        readABunch<double> sb1(syncDataInputStream),
                           sb2(syncDataInputStream),                // 6
                           sb3(syncDataInputStream),
                           sb4(syncDataInputStream);
    }                                                               // 7

    {
        readABunch<int> sb5(syncDataInputStream),
                        sb6(syncDataInputStream),                   // 6
                        sb7(syncDataInputStream),
                        sb8(syncDataInputStream);
    }                                                               // 7
}
catch (const RWIncompleteStreamOperation& e) {                      // 8
    cout << e.why() << endl;
    cout << e.elementsProcessed() << endl;
}
catch (const RWExternalStreamException& e) {
    cout << e.why() << endl;
}
//1 Creates an iostreams file buffer, and opens it in input mode. If the code is compiled on a PC platform, the file buffer needs to be opened in binary mode by specifying the flag ios::binary. If you built the Advanced Tools Module with the Standard iostreams library, you need to qualify filebuf and other iostreams elements with std::, or you need to include using declarations. The complete example code makes use of macros defined by the Essential Tools Module in order to support both the classic and Standard iostreams.
//2 Creates an instance of class RWByteFromStreambufInputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a binary input stream handle. In this example, the make() function takes a reference to an iostreams streambuf object that is used as the source of bytes.
//3 Creates an instance of class RWBufferedByteInputStreamImp. The class is created by calling one of its static member functions make(), which creates an instance of self and returns it as a binary input stream handle. Several static make() functions are available to construct an instance of class RWBufferedByteInputStreamImp. All of them take a handle to the next streaming element, which must be of type RWByteInputStream. The static make() function used in this example takes a second parameter that specifies the size of the buffer to be allocated.
//4 Creates an instance of class RWNativeDataFromByteInputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a data input stream handle. The make() function takes a handle to the next streaming element, which must be of type RWByteInputStream. It is used as the source of bytes.
//5 Creates an instance of class RWSynchronizedDataInputStreamImp. The class is created by calling its static member function make(), which creates an instance of self and returns it as a data input stream handle. The make() function takes a handle to the next streaming element, which must be of type RWDataInputStream. Each operation executed on this stream is guaranteed to be synchronized in a multithreaded environment.
//6 Creates four active objects. They are passed the same synchronized data input stream at construction time. Each active object reads ten consecutive elements from the stream, and sends them to the standard output.
//7 Each active object waits for its thread to finish execution before being destroyed.
//8 Catches exceptions potentially thrown by the stream. Class RWExternalStreamException is the base class for all the exceptions thrown by the Streams package. It returns an error message and an error code. For more information on exceptions, see “Error Handling.”
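Both listings catch RWIncompleteStreamOperation before RWExternalStreamException. Because the first class derives from the second, the order matters: a handler for the base class placed first would also catch the derived exception. The sketch below illustrates this with a hypothetical two-class hierarchy (StreamError, IncompleteOperation, and describe are invented names, not the Streams package classes).

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>

// Hypothetical base class for all stream errors.
class StreamError : public std::runtime_error {
public:
    explicit StreamError(const std::string& why) : std::runtime_error(why) {}
};

// Hypothetical derived class for an operation that partially succeeded.
class IncompleteOperation : public StreamError {
public:
    IncompleteOperation(const std::string& why, std::size_t processed)
        : StreamError(why), processed_(processed) {}
    std::size_t elementsProcessed() const { return processed_; }
private:
    std::size_t processed_;
};

// Throws one of the two exceptions and reports which handler ran.
// The derived-class handler comes first so that it is reachable.
std::string describe(bool partial) {
    try {
        if (partial) throw IncompleteOperation("short read", 7);
        throw StreamError("stream closed");
    }
    catch (const IncompleteOperation& e) {
        return std::string(e.what()) + " after " +
               std::to_string(e.elementsProcessed()) + " elements";
    }
    catch (const StreamError& e) {
        return e.what();
    }
}
```

With the handlers in this order, describe(true) yields "short read after 7 elements" and describe(false) yields "stream closed".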