Creating and Using Thread-safe Chains of Streaming Elements
A thread-safe chain of streaming elements is a composition of streaming elements in which one of the streaming elements is a synchronized filtered stream. Synchronized filtered streams ensure that each operation carried out on a stream is executed atomically in a multithreaded environment. Synchronized filtered streams can also be used in conjunction with guarded filtered streams to ensure that a group of operations are executed atomically in a multithreaded environment. For more information on the synchronized and guarded filtered streams, see
“Synchronized Streams” and
“Guarded Streams.” Creating a Synchronized Data Output Stream
The first example demonstrates how to create and use a thread-safe chain of streaming elements. It creates a synchronized data output stream (
RWSynchronizedDataOutputStreamImp) that connects to a
RWNativeDataToByteOutputStreamImp stream, which encodes C++ base types as a sequence of bytes using their native binary representation.
The
RWNativeDataToByteOutputStreamImp stream is in turn connected to a buffered binary output stream that sends the transformed bytes to a file. The class
RWBufferedByteOutputStreamImp is connected to class
RWByteToStreambufOutputStreamImp to provide the buffered byte output stream. This example requires the Streams package, as well as the Execution Tracing, Thread-compatible Exception, Synchronization, Smart Pointer, Threading, Functor, and Interthread Communication packages from the Threads Module.
Figure 15 is a representation of the chain of streaming elements used in this example.
The complete source for the examples in
“Creating and Managing Threads Using Active Objects” and
“Creating and Sharing a Synchronized Data Output Stream Among Several Active Objects” is located in
examples\stream\
dataFilteredWrite.cpp. Only part of the code is presented here.
Creating and Managing Threads Using Active Objects
The example uses active objects to carry out asynchronous operations on a unique thread-safe data stream. Active objects execute in their own thread. They start their execution upon construction and wait for their processing to terminate upon destruction, if necessary. This example uses two different kinds of active objects. The first one writes ten elements of one of the C++ base types to a data output stream, ensuring that the ten elements are written in one synchronized operation. The second type of active object writes twenty elements to a data output stream, ensuring that each individual operation on the data output stream is synchronized. The code for both active object classes is presented below.
template <class T>
class synchronizeABunch {
public:
synchronizeABunch(RWDataOutputStream& stream, T data) // 1
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeABunch<T>,
*this,void,&synchronizeABunch<T>::func); // 2
thread_.start();
}
~synchronizeABunch() {
thread_.join(); // 3
thread_.raise();
}
void func() {
RWDataOutputStream tmpStream =
RWGuardedDataOutputStreamImp::make(dataOutputStream_); // 4
for(int i=0; i<10; i++) {
tmpStream << data_; // 5
}
}
private:
synchronizeABunch(const synchronizeABunch<T>&);
synchronizeABunch<T>& operator=(const synchronizeABunch<T>&);
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};
The code for the second active object is similar to the code presented above with the exception of the func() function, which enforces synchronization at the operation level.
template <class T>
class synchronizeSingle {
public:
synchronizeSingle(RWDataOutputStream& stream, T data)
:data_(data)
,dataOutputStream_(stream)
{
thread_= rwMakeThreadFunctionM(synchronizeSingle<T>,
*this,void,&synchronizeSingle<T>::func);
thread_.start();
}
~synchronizeSingle() {
thread_.join();
thread_.raise();
}
void func() {
for(int i=0; i<20; i++) {
dataOutputStream_ << data_; // 1
rwYield();
}
}
private:
synchronizeSingle(const synchronizeSingle<T>&);
synchronizeSingle<T>& operator=(const synchronizeSingle<T>&);
RWDataOutputStream dataOutputStream_;
T data_;
RWThreadFunction thread_;
};
Creating and Sharing a Synchronized Data Output Stream Among Several Active Objects
The following code constructs the thread-safe chain of streaming elements, and then constructs several active objects that use the same chain of streaming elements as the sink of data.
filebuf fbuf; // 1
fbuf.open("dataFilteredWrite.dat", ios::out | ios::binary);
RWByteOutputStream binOutputStream =
RWByteToStreambufOutputStreamImp::make(fbuf); // 2
RWByteOutputStream bufferedBinOutputStream =
RWBufferedByteOutputStreamImp::make(binOutputStream,1024); // 3
RWDataOutputStream dataOutputStream =
RWNativeDataToByteOutputStreamImp::make(bufferedBinOutputStream);// 4
RWDataOutputStream syncDataOutputStream =
RWSynchronizedDataOutputStreamImp::make(dataOutputStream); // 5
try {
// first group of active objects is created
{ synchronizeSingle<double> s1(syncDataOutputStream,64.0); // 6
synchronizeABunch<double> sb1(syncDataOutputStream,3.14159),
sb2(syncDataOutputStream,1.05);
} // 7
// second group of active objects is created
{ synchronizeSingle<int> s1(syncDataOutputStream,67); // 6
synchronizeABunch<int> sb1(syncDataOutputStream,78),
sb2(syncDataOutputStream,99);
} // 7
}
catch(const RWIncompleteStreamOperation& e) { // 8
cout << e.why() << endl;
cout << e.elementsProcessed() << endl;
}
catch(const RWExternalStreamException& e) {
cout << e.why() << endl;
}
Active Objects and Data Input Streams
The example in this section implements the previous example’s corresponding input operation. It creates a synchronized data input stream
RWSynchronizedDataInputStreamImp that connects to an
RWNativeDataFromByteInputStreamImp stream, which restores C++ base types from a sequences of bytes. The
RWNativeDataFromByteInputStreamImp stream is then connected to a buffered binary input stream that reads bytes from the file created in the previous example. The class
RWBufferedByteInputStreamImp is connected to class
RWByteFromStreambufInputStreamImp to provide the buffered byte input stream.
This example requires the Streams package, as well as the Execution Tracing, Thread-compatible Exception, Synchronization, Smart Pointer, Threading, Functor, and Interthread Communication packages from the Threads Module.
Figure 16 is a representation of the chain of streaming elements used in this example.
As in the previous example, this example uses active objects to carry out asynchronous operations on a unique thread-safe data stream. Only one type of active object is used. Its purpose is to read ten elements of one of the C++ base types from a data input stream, ensuring that the ten elements are read in one synchronized operation.
The complete example is located in directory ...\examples\stream in the file dataFilteredRead.cpp. The code for the active object class is presented below:
template <class T>
class readABunch {
public:
readABunch(RWDataInputStream& stream) // 1
:dataInputStream_(stream)
{
thread_= rwMakeThreadFunctionM(readABunch<T>,*this,
void,&readABunch<T>::func); // 2
thread_.start();
}
~readABunch() {
thread_.join(); // 3
thread_.raise();
}
void func() {
RWDataInputStream tmpStream =
RWGuardedDataInputStreamImp::make(dataInputStream_); // 4
for(int i=0; i<10; i++) {
tmpStream >> data_; // 5
cout << data_ << ' ';
rwYield();
}
}
private:
readABunch(const readABunch<T>&);
readABunch<T>& operator=(const readABunch<T>&);
RWDataInputStream dataInputStream_;
T data_;
RWThreadFunction thread_;
};
Creating and Sharing a Synchronized Data Input Stream Among Several Active Objects
The following code constructs the thread-safe chain of streaming elements, and then constructs several active objects that use the same chain of streaming elements as the source of data.
filebuf fbuf; // 1
fbuf.open("dataFilteredWrite.dat", ios::in | ios::binary);
RWByteInputStream binInputStream =
RWByteFromStreambufInputStreamImp::make(fbuf); // 2
RWByteInputStream bufferedBinInputStream =
RWBufferedByteInputStreamImp::make(binInputStream,1024); // 3
RWDataInputStream dataInputStream =
RWNativeDataFromByteInputStreamImp::make(bufferedBinInputStream);// 4
RWDataInputStream syncDataInputStream =
RWSynchronizedDataInputStreamImp::make(dataInputStream); // 5
try {
{
readABunch<double> sb1(syncDataInputStream),
sb2(syncDataInputStream), // 6
sb3(syncDataInputStream),
sb4(syncDataInputStream);
} // 7
{
readABunch<int> sb5(syncDataInputStream),
sb6(syncDataInputStream), // 6
sb7(syncDataInputStream),
sb8(syncDataInputStream);
} // 7
}
catch(const RWIncompleteStreamOperation& e) { // 8
cout << e.why() << endl;
cout << e.elementsProcessed() << endl;
}
catch(const RWExternalStreamException& e) {
cout << e.why() << endl;
}