Code owners
Assign users and groups as approvers for specific file changes. Learn more.
llstreamqueue.h 8.11 KiB
/**
* @file llstreamqueue.h
* @author Nat Goodspeed
* @date 2012-01-04
* @brief Definition of LLStreamQueue
*
* $LicenseInfo:firstyear=2012&license=viewerlgpl$
* Copyright (c) 2012, Linden Research, Inc.
* $/LicenseInfo$
*/
#if ! defined(LL_LLSTREAMQUEUE_H)
#define LL_LLSTREAMQUEUE_H
#include <algorithm> // std::min, std::copy
#include <iosfwd> // std::streamsize
#include <list>
#include <string>
#include <boost/iostreams/categories.hpp>
/**
* This class is a growable buffer between a producer and consumer. It serves
* as a queue usable with Boost.Iostreams -- hence, a "stream queue."
*
* This is especially useful for buffering nonblocking I/O. For instance, we
* want application logic to be able to serialize LLSD to a std::ostream. We
* may write more data than the destination pipe can handle all at once, but
* it's imperative NOT to block the application-level serialization call. So
* we buffer it instead. Successive frames can try nonblocking writes to the
* destination pipe until all buffered data has been sent.
*
* Similarly, we want application logic to be able to deserialize LLSD from a
* std::istream. Again, we must not block that deserialize call waiting for
* more data to arrive from the input pipe! Instead we build up a buffer over
* a number of frames, using successive nonblocking reads, until we have
* "enough" data to be able to present it through a std::istream.
*
* @note The use cases for this class overlap somewhat with those for the
* LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This
* class has two virtues over the older machinery:
*
* # It's vastly simpler -- way fewer concepts. It's not clear to me whether
* there were ever LLIOPipe/etc. use cases that demanded all the fanciness
* rolled in, or whether they were simply overdesigned. In any case, no
* remaining Lindens will admit to familiarity with those classes -- and
* they're sufficiently obtuse that it would take considerable learning
* curve to figure out how to use them properly. The bottom line is that
* current management is not keen on any more engineers climbing that curve.
* # This class is designed around available components such as std::string,
* std::list, Boost.Iostreams. There's less proprietary code.
*/
template <typename Ch>
class LLGenericStreamQueue
{
public:
    /// Construct an empty queue on which EOF has not yet been signalled.
    LLGenericStreamQueue():
        mSize(0),
        mClosed(false)
    {}

    /**
     * Read-side adapter exposing this queue as a Boost.Iostreams Source
     * Device. The queue matches none of the Boost 1.48 Iostreams concepts
     * exactly, since it acts as a Sink and as a Source at once; this struct
     * supplies the Source half.
     */
    struct Source
    {
        typedef Ch char_type;
        typedef boost::iostreams::source_tag category;

        /// Attach to the queue that every read() will be forwarded to.
        Source(LLGenericStreamQueue& sq):
            mStreamQueue(sq)
        {}

        /// Forward to LLGenericStreamQueue::read(): copy at most n
        /// characters into s and report how many were delivered, or -1 once
        /// EOF has been reached.
        std::streamsize read(Ch* s, std::streamsize n)
        {
            return mStreamQueue.read(s, n);
        }

        LLGenericStreamQueue& mStreamQueue;
    };

    /**
     * Write-side adapter exposing this queue as a Boost.Iostreams Sink
     * Device. The queue matches none of the Boost 1.48 Iostreams concepts
     * exactly, since it acts as a Sink and as a Source at once; this struct
     * supplies the Sink half.
     */
    struct Sink
    {
        typedef Ch char_type;
        typedef boost::iostreams::sink_tag category;

        /// Attach to the queue that every write() will be forwarded to.
        Sink(LLGenericStreamQueue& sq):
            mStreamQueue(sq)
        {}

        /// Forward to LLGenericStreamQueue::write(): hand over n characters
        /// starting at s and report how many were accepted.
        std::streamsize write(const Ch* s, std::streamsize n)
        {
            return mStreamQueue.write(s, n);
        }

        /// Forward to LLGenericStreamQueue::close(), signalling EOF to the
        /// reading side.
        void close()
        {
            mStreamQueue.close();
        }

        LLGenericStreamQueue& mStreamQueue;
    };

    /// Obtain a Source facade bound to this queue.
    Source asSource() { return Source(*this); }

    /// Obtain a Sink facade bound to this queue.
    Sink asSink() { return Sink(*this); }

    /**
     * Append n characters starting at s to the end of the queue.
     *
     * @return n -- the request is always accepted in full.
     */
    std::streamsize write(const Ch* s, std::streamsize n)
    {
        // A zero-length write may well happen (say, a caller relaying the
        // result of a nonblocking read that produced nothing). Such a call
        // must not leave an empty entry behind in the BufferList.
        if (n != 0)
        {
            // Each write gets a list entry of its own. Growing one big
            // string instead, ostringstream-style, could mean recopying
            // everything already buffered whenever it has to reallocate,
            // which scales poorly for large volumes of data.
            mBuffer.push_back(string(s, n));
            mSize += n;
        }
        return n;
    }

    /**
     * Producer-side notification that no more data will be written. Only
     * the producer is expected to call this; closing from the consumer side
     * serves little purpose.
     */
    void close()
    {
        mClosed = true;
    }

    /**
     * Copy up to n characters into s and remove them from the queue.
     *
     * @return whatever peek() returned for the same arguments.
     */
    std::streamsize read(Ch* s, std::streamsize n)
    {
        // Implemented as peek() followed by skip().
        const std::streamsize got = peek(s, n);
        // skip() cannot discard more than peek() was able to deliver, so
        // its return value is deliberately not examined.
        skip(n);
        return got;
    }

    /// Copy up to n characters into s while leaving them in the queue.
    /// Reports EOF with -1, as read() does.
    std::streamsize peek(Ch* s, std::streamsize n) const;

    /// Discard up to n characters without copying them anywhere. In
    /// contrast to read() and peek(), reaching EOF just means 0 characters
    /// are skipped.
    std::streamsize skip(std::streamsize n);

    /// Number of characters currently held in the queue.
    std::streamsize size() const
    {
        return mSize;
    }

private:
    typedef std::basic_string<Ch> string;
    typedef std::list<string> BufferList;

    BufferList mBuffer;     ///< one entry per non-empty write(), oldest first
    std::streamsize mSize;  ///< running count of characters held in mBuffer
    bool mClosed;           ///< set by close()
};
/**
 * Copy up to @a n characters from the front of the queue into @a s without
 * removing them from the queue.
 *
 * @param s destination buffer; must have room for @a n characters
 * @param n maximum number of characters to copy; a value <= 0 requests
 *          nothing
 * @return -1 if close() has been called and the queue holds no data (EOF);
 *         otherwise the number of characters copied, which is less than
 *         @a n (possibly 0) when fewer characters are buffered.
 */
template <typename Ch>
std::streamsize LLGenericStreamQueue<Ch>::peek(Ch* s, std::streamsize n) const
{
    // Indicate EOF only if the producer has closed the pipe AND all
    // previously-buffered data has been consumed.
    if (mClosed && mBuffer.empty())
    {
        return -1;
    }

    // A non-positive request asks for no data. Without this guard a negative
    // n would produce a negative chunk length below, making std::copy() run
    // on a reversed range and moving s backwards -- undefined behavior.
    if (n <= 0)
    {
        return 0;
    }

    // The request may have to be assembled from any number of individual
    // BufferList entries. Walk them until either the list runs out or the
    // request is completely satisfied.
    std::streamsize needed(n), got(0);
    for (typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end());
         bli != blend && needed > 0;
         ++bli)
    {
        // Take the whole entry, or only as much of it as is still needed.
        const std::streamsize chunk(std::min(needed, std::streamsize(bli->length())));
        std::copy(bli->begin(), bli->begin() + chunk, s);
        needed -= chunk;
        s += chunk;
        got += chunk;
    }
    return got;
}
/**
 * Discard up to @a n characters from the front of the queue without copying
 * them anywhere.
 *
 * @param n maximum number of characters to discard; a value <= 0 discards
 *          nothing
 * @return the number of characters actually discarded, which is less than
 *         @a n when fewer characters are buffered (0 for an empty queue).
 */
template <typename Ch>
std::streamsize LLGenericStreamQueue<Ch>::skip(std::streamsize n)
{
    // Reject non-positive requests up front. Previously the signed count was
    // compared directly against the unsigned string length, so a negative n
    // was converted to a huge unsigned value and every buffered entry was
    // erased.
    if (n <= 0)
    {
        return 0;
    }

    std::streamsize toskip(n), skipped(0);

    // First drop every leading entry that the request covers completely.
    while (! mBuffer.empty())
    {
        // Compare in the signed domain to avoid signed/unsigned conversion.
        const std::streamsize chunk(static_cast<std::streamsize>(mBuffer.front().length()));
        if (toskip < chunk)
        {
            break;
        }
        mBuffer.pop_front();
        mSize -= chunk;
        toskip -= chunk;
        skipped += chunk;
    }

    // Whatever is still to be skipped is strictly shorter than the entry now
    // at the head of the list (if any): trim just that many characters from
    // its front.
    if (! mBuffer.empty() && toskip > 0)
    {
        typename BufferList::reference head(mBuffer.front());
        head.erase(head.begin(), head.begin() + toskip);
        mSize -= toskip;
        skipped += toskip;
    }
    return skipped;
}
/// Stream queue instantiated for narrow characters (char).
typedef LLGenericStreamQueue<char> LLStreamQueue;
/// Stream queue instantiated for wide characters (wchar_t).
typedef LLGenericStreamQueue<wchar_t> LLWStreamQueue;
#endif /* ! defined(LL_LLSTREAMQUEUE_H) */