From 61b5f3143e4ea53c9f64e5a1a5ad19f2edf3e776 Mon Sep 17 00:00:00 2001
From: Nat Goodspeed <nat@lindenlab.com>
Date: Thu, 5 Jan 2012 15:43:23 -0500
Subject: [PATCH] Introduce LLStreamQueue to buffer nonblocking I/O. Add unit
 tests to verify basic functionality.

---
 indra/llcommon/CMakeLists.txt               |   3 +
 indra/llcommon/llstreamqueue.cpp            |  24 ++
 indra/llcommon/llstreamqueue.h              | 229 ++++++++++++++++++++
 indra/llcommon/tests/llstreamqueue_test.cpp | 177 +++++++++++++++
 4 files changed, 433 insertions(+)
 create mode 100644 indra/llcommon/llstreamqueue.cpp
 create mode 100644 indra/llcommon/llstreamqueue.h
 create mode 100644 indra/llcommon/tests/llstreamqueue_test.cpp

diff --git a/indra/llcommon/CMakeLists.txt b/indra/llcommon/CMakeLists.txt
index c8e18275846..334f78cbffa 100644
--- a/indra/llcommon/CMakeLists.txt
+++ b/indra/llcommon/CMakeLists.txt
@@ -88,6 +88,7 @@ set(llcommon_SOURCE_FILES
     llsingleton.cpp
     llstat.cpp
     llstacktrace.cpp
+    llstreamqueue.cpp
     llstreamtools.cpp
     llstring.cpp
     llstringtable.cpp
@@ -221,6 +222,7 @@ set(llcommon_HEADER_FILES
     llstat.h
     llstatenums.h
     llstl.h
+    llstreamqueue.h
     llstreamtools.h
     llstrider.h
     llstring.h
@@ -327,6 +329,7 @@ if (LL_TESTS)
   LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
   LL_ADD_INTEGRATION_TEST(lleventdispatcher "" "${test_libs}")
   LL_ADD_INTEGRATION_TEST(llprocesslauncher "" "${test_libs}")
+  LL_ADD_INTEGRATION_TEST(llstreamqueue "" "${test_libs}")
 
   # *TODO - reenable these once tcmalloc libs no longer break the build.
   #ADD_BUILD_TEST(llallocator llcommon)
diff --git a/indra/llcommon/llstreamqueue.cpp b/indra/llcommon/llstreamqueue.cpp
new file mode 100644
index 00000000000..1116a2b6a2c
--- /dev/null
+++ b/indra/llcommon/llstreamqueue.cpp
@@ -0,0 +1,24 @@
+/**
+ * @file   llstreamqueue.cpp
+ * @author Nat Goodspeed
+ * @date   2012-01-05
+ * @brief  Implementation for llstreamqueue.
+ * 
+ * $LicenseInfo:firstyear=2012&license=viewerlgpl$
+ * Copyright (c) 2012, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "llstreamqueue.h"
+// STL headers
+// std headers
+// external library headers
+// other Linden headers
+
+// As of this writing, llstreamqueue.h is entirely template-based, therefore
+// we don't strictly need a corresponding .cpp file. However, our CMake test
+// macro assumes one. Here it is.
+bool llstreamqueue_cpp_ignored = true;
diff --git a/indra/llcommon/llstreamqueue.h b/indra/llcommon/llstreamqueue.h
new file mode 100644
index 00000000000..2fbc2067d2c
--- /dev/null
+++ b/indra/llcommon/llstreamqueue.h
@@ -0,0 +1,229 @@
+/**
+ * @file   llstreamqueue.h
+ * @author Nat Goodspeed
+ * @date   2012-01-04
+ * @brief  Definition of LLStreamQueue
+ * 
+ * $LicenseInfo:firstyear=2012&license=viewerlgpl$
+ * Copyright (c) 2012, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+#if ! defined(LL_LLSTREAMQUEUE_H)
+#define LL_LLSTREAMQUEUE_H
+
+#include <string>
+#include <list>
+#include <iosfwd>                   // std::streamsize
+#include <boost/iostreams/categories.hpp>
+
+/**
+ * This class is a growable buffer between a producer and consumer. It serves
+ * as a queue usable with Boost.Iostreams -- hence, a "stream queue."
+ *
+ * This is especially useful for buffering nonblocking I/O. For instance, we
+ * want application logic to be able to serialize LLSD to a std::ostream. We
+ * may write more data than the destination pipe can handle all at once, but
+ * it's imperative NOT to block the application-level serialization call. So
+ * we buffer it instead. Successive frames can try nonblocking writes to the
+ * destination pipe until all buffered data has been sent.
+ *
+ * Similarly, we want application logic be able to deserialize LLSD from a
+ * std::istream. Again, we must not block that deserialize call waiting for
+ * more data to arrive from the input pipe! Instead we build up a buffer over
+ * a number of frames, using successive nonblocking reads, until we have
+ * "enough" data to be able to present it through a std::istream.
+ *
+ * @note The use cases for this class overlap somewhat with those for the
+ * LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This
+ * class has two virtues over the older machinery:
+ *
+ * # It's vastly simpler -- way fewer concepts. It's not clear to me whether
+ *   there were ever LLIOPipe/etc. use cases that demanded all the fanciness
+ *   rolled in, or whether they were simply overdesigned. In any case, no
+ *   remaining Lindens will admit to familiarity with those classes -- and
+ *   they're sufficiently obtuse that it would take considerable learning
+ *   curve to figure out how to use them properly. The bottom line is that
+ *   current management is not keen on any more engineers climbing that curve.
+ * # This class is designed around available components such as std::string,
+ *   std::list, Boost.Iostreams. There's less proprietary code.
+ */
+template <typename Ch>
+class LLGenericStreamQueue
+{
+public:
+    LLGenericStreamQueue():
+        mClosed(false)
+    {}
+
+    /**
+     * Boost.Iostreams Source Device facade for use with other Boost.Iostreams
+     * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
+     * 1.48 Iostreams concepts; instead it behaves as both a Sink and a
+     * Source. This is its Source facade.
+     */
+    struct Source
+    {
+        typedef Ch char_type;
+        typedef boost::iostreams::source_tag category;
+
+        /// Bind the underlying LLGenericStreamQueue
+        Source(LLGenericStreamQueue& sq):
+            mStreamQueue(sq)
+        {}
+
+        // Read up to n characters from the underlying data source into the
+        // buffer s, returning the number of characters read; return -1 to
+        // indicate EOF
+        std::streamsize read(Ch* s, std::streamsize n)
+        {
+            return mStreamQueue.read(s, n);
+        }
+
+        LLGenericStreamQueue& mStreamQueue;
+    };
+
+    /**
+     * Boost.Iostreams Sink Device facade for use with other Boost.Iostreams
+     * functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
+     * 1.48 Iostreams concepts; instead it behaves as both a Sink and a
+     * Source. This is its Sink facade.
+     */
+    struct Sink
+    {
+        typedef Ch char_type;
+        typedef boost::iostreams::sink_tag category;
+
+        /// Bind the underlying LLGenericStreamQueue
+        Sink(LLGenericStreamQueue& sq):
+            mStreamQueue(sq)
+        {}
+
+        /// Write up to n characters from the buffer s to the output sequence,
+        /// returning the number of characters written
+        std::streamsize write(const Ch* s, std::streamsize n)
+        {
+            return mStreamQueue.write(s, n);
+        }
+
+        /// Send EOF to consumer
+        void close()
+        {
+            mStreamQueue.close();
+        }
+
+        LLGenericStreamQueue& mStreamQueue;
+    };
+
+    /// Present Boost.Iostreams Source facade
+    Source asSource() { return Source(*this); }
+    /// Present Boost.Iostreams Sink facade
+    Sink   asSink()   { return Sink(*this); }
+
+    /// append data to buffer
+    std::streamsize write(const Ch* s, std::streamsize n)
+    {
+        // Unclear how often we might be asked to write 0 bytes -- perhaps a
+        // naive caller responding to an unready nonblocking read. But if we
+        // do get such a call, don't add a completely empty BufferList entry.
+        if (n == 0)
+            return n;
+        // We could implement this using a single std::string object, a la
+        // ostringstream. But the trouble with appending to a string is that
+        // you might have to recopy all previous contents to grow its size. If
+        // we want this to scale to large data volumes, better to allocate
+        // individual pieces.
+        mBuffer.push_back(string(s, n));
+        return n;
+    }
+
+    /**
+     * Inform this LLGenericStreamQueue that no further data are forthcoming.
+     * For our purposes, close() is strictly a producer-side operation;
+     * there's little point in closing the consumer side.
+     */
+    void close()
+    {
+        mClosed = true;
+    }
+
+    /// consume data from buffer
+    std::streamsize read(Ch* s, std::streamsize n)
+    {
+        // read() is actually a convenience method for peek() followed by
+        // skip().
+        std::streamsize got(peek(s, n));
+        // We can only skip() as many characters as we can peek(); ignore
+        // skip() return here.
+        skip(n);
+        return got;
+    }
+
+    /// Retrieve data from buffer without consuming. Like read(), return -1 on
+    /// EOF.
+    std::streamsize peek(Ch* s, std::streamsize n) const;
+
+    /// Consume data from buffer without retrieving. Unlike read() and peek(),
+    /// at EOF we simply skip 0 characters.
+    std::streamsize skip(std::streamsize n);
+
+private:
+    typedef std::basic_string<Ch> string;
+    typedef std::list<string> BufferList;
+    BufferList mBuffer;
+    bool mClosed;
+};
+
+template <typename Ch>
+std::streamsize LLGenericStreamQueue<Ch>::peek(Ch* s, std::streamsize n) const
+{
+    // Here we may have to build up 'n' characters from an arbitrary
+    // number of individual BufferList entries.
+    typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end());
+    // Indicate EOF if producer has closed the pipe AND we've exhausted
+    // all previously-buffered data.
+    if (mClosed && bli == blend)
+    {
+        return -1;
+    }
+    // Here either producer hasn't yet closed, or we haven't yet exhausted
+    // remaining data.
+    std::streamsize needed(n), got(0);
+    // Loop until either we run out of BufferList entries or we've
+    // completely satisfied the request.
+    for ( ; bli != blend && needed; ++bli)
+    {
+        std::streamsize chunk(std::min(needed, std::streamsize(bli->length())));
+        std::copy(bli->begin(), bli->begin() + chunk, s);
+        needed -= chunk;
+        s      += chunk;
+        got    += chunk;
+    }
+    return got;
+}
+
+template <typename Ch>
+std::streamsize LLGenericStreamQueue<Ch>::skip(std::streamsize n)
+{
+    typename BufferList::iterator bli(mBuffer.begin()), blend(mBuffer.end());
+    std::streamsize toskip(n), skipped(0);
+    while (bli != blend && toskip >= bli->length())
+    {
+        std::streamsize chunk(bli->length());
+        typename BufferList::iterator zap(bli++);
+        mBuffer.erase(zap);
+        toskip  -= chunk;
+        skipped += chunk;
+    }
+    if (bli != blend && toskip)
+    {
+        bli->erase(bli->begin(), bli->begin() + toskip);
+        skipped += toskip;
+    }
+    return skipped;
+}
+
+typedef LLGenericStreamQueue<char>    LLStreamQueue;
+typedef LLGenericStreamQueue<wchar_t> LLWStreamQueue;
+
+#endif /* ! defined(LL_LLSTREAMQUEUE_H) */
diff --git a/indra/llcommon/tests/llstreamqueue_test.cpp b/indra/llcommon/tests/llstreamqueue_test.cpp
new file mode 100644
index 00000000000..e88c37d5bec
--- /dev/null
+++ b/indra/llcommon/tests/llstreamqueue_test.cpp
@@ -0,0 +1,177 @@
+/**
+ * @file   llstreamqueue_test.cpp
+ * @author Nat Goodspeed
+ * @date   2012-01-05
+ * @brief  Test for llstreamqueue.
+ * 
+ * $LicenseInfo:firstyear=2012&license=viewerlgpl$
+ * Copyright (c) 2012, Linden Research, Inc.
+ * $/LicenseInfo$
+ */
+
+// Precompiled header
+#include "linden_common.h"
+// associated header
+#include "llstreamqueue.h"
+// STL headers
+#include <vector>
+// std headers
+// external library headers
+#include <boost/foreach.hpp>
+// other Linden headers
+#include "../test/lltut.h"
+#include "stringize.h"
+
+/*****************************************************************************
+*   TUT
+*****************************************************************************/
+namespace tut
+{
+    struct llstreamqueue_data
+    {
+        llstreamqueue_data():
+            // we want a buffer with actual bytes in it, not an empty vector
+            buffer(10)
+        {}
+        // As LLStreamQueue is merely a typedef for
+        // LLGenericStreamQueue<char>, and no logic in LLGenericStreamQueue is
+        // specific to the <char> instantiation, we're comfortable for now
+        // testing only the narrow-char version.
+        LLStreamQueue strq;
+        // buffer for use in multiple tests
+        std::vector<char> buffer;
+    };
+    typedef test_group<llstreamqueue_data> llstreamqueue_group;
+    typedef llstreamqueue_group::object object;
+    llstreamqueue_group llstreamqueuegrp("llstreamqueue");
+
+    template<> template<>
+    void object::test<1>()
+    {
+        set_test_name("empty LLStreamQueue");
+        ensure_equals("brand-new LLStreamQueue isn't empty",
+                      strq.asSource().read(&buffer[0], buffer.size()), 0);
+        strq.asSink().close();
+        ensure_equals("closed empty LLStreamQueue not at EOF",
+                      strq.asSource().read(&buffer[0], buffer.size()), -1);
+    }
+
+    template<> template<>
+    void object::test<2>()
+    {
+        set_test_name("one internal block, one buffer");
+        LLStreamQueue::Sink sink(strq.asSink());
+        ensure_equals("write(\"\")", sink.write("", 0), 0);
+        ensure_equals("0 write should leave LLStreamQueue empty",
+                      strq.peek(&buffer[0], buffer.size()), 0);
+        // The meaning of "atomic" is that it must be smaller than our buffer.
+        std::string atomic("atomic");
+        ensure("test data exceeds buffer", atomic.length() < buffer.size());
+        ensure_equals(STRINGIZE("write(\"" << atomic << "\")"),
+                      sink.write(&atomic[0], atomic.length()), atomic.length());
+        size_t peeklen(strq.peek(&buffer[0], buffer.size()));
+        ensure_equals(STRINGIZE("peek(\"" << atomic << "\")"),
+                      peeklen, atomic.length());
+        ensure_equals(STRINGIZE("peek(\"" << atomic << "\") result"),
+                      std::string(buffer.begin(), buffer.begin() + peeklen), atomic);
+        // peek() should not consume. Use a different buffer to prove it isn't
+        // just leftover data from the first peek().
+        std::vector<char> again(buffer.size());
+        peeklen = size_t(strq.peek(&again[0], again.size()));
+        ensure_equals(STRINGIZE("peek(\"" << atomic << "\") again"),
+                      peeklen, atomic.length());
+        ensure_equals(STRINGIZE("peek(\"" << atomic << "\") again result"),
+                      std::string(again.begin(), again.begin() + peeklen), atomic);
+        // now consume.
+        std::vector<char> third(buffer.size());
+        size_t readlen(strq.read(&third[0], third.size()));
+        ensure_equals(STRINGIZE("read(\"" << atomic << "\")"),
+                      readlen, atomic.length());
+        ensure_equals(STRINGIZE("read(\"" << atomic << "\") result"),
+                      std::string(third.begin(), third.begin() + readlen), atomic);
+        ensure_equals("peek() after read()", strq.peek(&buffer[0], buffer.size()), 0);
+    }
+
+    template<> template<>
+    void object::test<3>()
+    {
+        set_test_name("basic skip()");
+        std::string lovecraft("lovecraft");
+        ensure("test data exceeds buffer", lovecraft.length() < buffer.size());
+        ensure_equals(STRINGIZE("write(\"" << lovecraft << "\")"),
+                      strq.write(&lovecraft[0], lovecraft.length()), lovecraft.length());
+        size_t peeklen(strq.peek(&buffer[0], buffer.size()));
+        ensure_equals(STRINGIZE("peek(\"" << lovecraft << "\")"),
+                      peeklen, lovecraft.length());
+        ensure_equals(STRINGIZE("peek(\"" << lovecraft << "\") result"),
+                      std::string(buffer.begin(), buffer.begin() + peeklen), lovecraft);
+        std::streamsize skip1(4);
+        ensure_equals(STRINGIZE("skip(" << skip1 << ")"), strq.skip(skip1), skip1);
+        size_t readlen(strq.read(&buffer[0], buffer.size()));
+        ensure_equals(STRINGIZE("read(\"" << lovecraft.substr(skip1) << "\")"),
+                      readlen, lovecraft.length() - skip1);
+        ensure_equals(STRINGIZE("read(\"" << lovecraft.substr(skip1) << "\") result"),
+                      std::string(buffer.begin(), buffer.begin() + readlen),
+                      lovecraft.substr(skip1));
+        ensure_equals("unconsumed", strq.read(&buffer[0], buffer.size()), 0);
+    }
+
+    template<> template<>
+    void object::test<4>()
+    {
+        set_test_name("skip() multiple blocks");
+        std::string blocks[] = { "books of ", "H.P. ", "Lovecraft" };
+        std::streamsize skip(blocks[0].length() + blocks[1].length() + 4);
+        BOOST_FOREACH(const std::string& block, blocks)
+        {
+            strq.write(&block[0], block.length());
+        }
+        std::streamsize skiplen(strq.skip(skip));
+        ensure_equals(STRINGIZE("skip(" << skip << ")"), skiplen, skip);
+        size_t readlen(strq.read(&buffer[0], buffer.size()));
+        ensure_equals("read(\"craft\")", readlen, 5);
+        ensure_equals("read(\"craft\") result",
+                      std::string(buffer.begin(), buffer.begin() + readlen), "craft");
+    }
+
+    template<> template<>
+    void object::test<5>()
+    {
+        set_test_name("concatenate blocks");
+        std::string blocks[] = { "abcd", "efghij", "klmnopqrs" };
+        BOOST_FOREACH(const std::string& block, blocks)
+        {
+            strq.write(&block[0], block.length());
+        }
+        std::vector<char> longbuffer(30);
+        std::streamsize readlen(strq.read(&longbuffer[0], longbuffer.size()));
+        ensure_equals("read() multiple blocks",
+                      readlen, blocks[0].length() + blocks[1].length() + blocks[2].length());
+        ensure_equals("read() multiple blocks result",
+                      std::string(longbuffer.begin(), longbuffer.begin() + readlen),
+                      blocks[0] + blocks[1] + blocks[2]);
+    }
+
+    template<> template<>
+    void object::test<6>()
+    {
+        set_test_name("split blocks");
+        std::string blocks[] = { "abcdefghijklm", "nopqrstuvwxyz" };
+        BOOST_FOREACH(const std::string& block, blocks)
+        {
+            strq.write(&block[0], block.length());
+        }
+        strq.close();
+        std::streamsize readlen(strq.read(&buffer[0], buffer.size()));
+        ensure_equals("read() 0", readlen, buffer.size());
+        ensure_equals("read() 0 result", std::string(buffer.begin(), buffer.end()), "abcdefghij");
+        readlen = strq.read(&buffer[0], buffer.size());
+        ensure_equals("read() 1", readlen, buffer.size());
+        ensure_equals("read() 1 result", std::string(buffer.begin(), buffer.end()), "klmnopqrst");
+        readlen = strq.read(&buffer[0], buffer.size());
+        ensure_equals("read() 2", readlen, 6);
+        ensure_equals("read() 2 result",
+                      std::string(buffer.begin(), buffer.begin() + readlen), "uvwxyz");
+        ensure_equals("read() 3", strq.read(&buffer[0], buffer.size()), -1);
+    }
+} // namespace tut
-- 
GitLab