From e92c3113545dd60fb76e115da201163e340c730c Mon Sep 17 00:00:00 2001
From: Nat Goodspeed <nat@lindenlab.com>
Date: Thu, 16 Feb 2012 16:05:04 -0500
Subject: [PATCH] Add LLProcess::ReadPipe::find() methods, with corresponding
 npos. If it's useful to have contains() to tell you whether incoming data
 contains a particular substring, and if it's useful for contains() and peek()
 to accept an offset within that data, then it's useful to allow you to get
 the offset of a desired substring within that data. But of course a find()
 returning offset needs something like std::string::npos for "not found";
 borrow that convention. Support both find(const std::string&) and find(char);
 the latter permits a more efficient implementation. In fact, make
 find(string) recognize a string of length 1 and leverage the find(char)
 implementation. Given that, reimplement contains(mumble) as shorthand for
 find(mumble) != npos. Implement find() overloads using std::search() and
 std::find() on boost::asio::streambuf character iterators, rather than
 copying to std::string and then using string search like previous contains()
 implementation. Reimplement WritePipeImpl::tick() and ReadPipeImpl::tick() to
 write/read directly from/to boost::asio::streambuf data, instead of copying
 to/from a temporary flat buffer. As long as ReadPipeImpl::tick() keeps
 successfully filling buffers, keep reading. Previous implementation would
 only handle a long child write over successive tick() calls. Stop on read
 error or when we come up short.

---
 indra/llcommon/llprocess.cpp            | 259 ++++++++++++++++--------
 indra/llcommon/llprocess.h              |  37 +++-
 indra/llcommon/tests/llprocess_test.cpp |   2 +
 3 files changed, 210 insertions(+), 88 deletions(-)

diff --git a/indra/llcommon/llprocess.cpp b/indra/llcommon/llprocess.cpp
index 1481bf571fa..aa22b3f8054 100644
--- a/indra/llcommon/llprocess.cpp
+++ b/indra/llcommon/llprocess.cpp
@@ -120,9 +120,12 @@ class LLProcessListener
 static LLProcessListener sProcessListener;
 
 LLProcess::BasePipe::~BasePipe() {}
+const LLProcess::BasePipe::size_type
+	  LLProcess::BasePipe::npos((std::numeric_limits<LLProcess::BasePipe::size_type>::max)());
 
 class WritePipeImpl: public LLProcess::WritePipe
 {
+    LOG_CLASS(WritePipeImpl);
 public:
 	WritePipeImpl(const std::string& desc, apr_file_t* pipe):
 		mDesc(desc),
@@ -139,30 +142,53 @@ class WritePipeImpl: public LLProcess::WritePipe
 
 	bool tick(const LLSD&)
 	{
+		typedef boost::asio::streambuf::const_buffers_type const_buffer_sequence;
 		// If there's anything to send, try to send it.
-		if (mStreambuf.size())
+		std::size_t total(mStreambuf.size()), consumed(0);
+		if (total)
 		{
-			// Copy data out from mStreambuf to a flat, contiguous buffer to
-			// write -- but only up to a certain size.
-			std::size_t total(mStreambuf.size());
-			std::size_t bufsize((std::min)(std::size_t(4096), total));
-			boost::asio::streambuf::const_buffers_type bufs = mStreambuf.data();
-			std::vector<char> buffer(
-				boost::asio::buffers_begin(bufs),
-				boost::asio::buffers_begin(bufs) + bufsize);
-			apr_size_t written(bufsize);
-			ll_apr_warn_status(apr_file_write(mPipe, &buffer[0], &written));
-			// 'written' is modified to reflect the number of bytes actually
-			// written. Since they've been sent, remove them from the
+			const_buffer_sequence bufs = mStreambuf.data();
+			// In general, our streambuf might contain a number of different
+			// physical buffers; iterate over those.
+			for (const_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end());
+				 bufi != bufend; ++bufi)
+			{
+				// http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents
+				std::size_t towrite(boost::asio::buffer_size(*bufi));
+				apr_size_t written(towrite);
+				apr_status_t err = apr_file_write(mPipe,
+												  boost::asio::buffer_cast<const void*>(*bufi),
+												  &written);
+				// EAGAIN is exactly what we want from a nonblocking pipe.
+				// Rather than waiting for data, it should return immediately.
+				if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err)))
+				{
+					LL_WARNS("LLProcess") << "apr_file_write(" << towrite << ") on " << mDesc
+										  << " got " << err << ":" << LL_ENDL;
+					ll_apr_warn_status(err);
+				}
+
+				// 'written' is modified to reflect the number of bytes actually
+				// written. Make sure we consume those later. (Don't consume them
+				// now, that would invalidate the buffer iterator sequence!)
+				consumed += written;
+				LL_DEBUGS("LLProcess") << "wrote " << written << " of " << towrite
+									   << " bytes to " << mDesc
+									   << " (original " << total << ")" << LL_ENDL;
+
+				// The parent end of this pipe is nonblocking. If we weren't able
+				// to write everything we wanted, don't keep banging on it -- that
+				// won't change until the child reads some. Wait for next tick().
+				if (written < towrite)
+					break;
+			}
+			// In all, we managed to write 'consumed' bytes. Remove them from the
 			// streambuf so we don't keep trying to send them. This could be
-			// anywhere from 0 up to mStreambuf.size(); anything we haven't
-			// yet sent, we'll try again next tick() call.
-			mStreambuf.consume(written);
-			LL_DEBUGS("LLProcess") << "wrote " << written << " of " << bufsize
-								   << " bytes to " << mDesc
-								   << " (original " << total << "), "
-								   << mStreambuf.size() << " remaining" << LL_ENDL;
+			// anywhere from 0 up to mStreambuf.size(); anything we haven't yet
+			// sent, we'll try again later.
+			mStreambuf.consume(consumed);
 		}
+
 		return false;
 	}
 
@@ -176,6 +202,7 @@ class WritePipeImpl: public LLProcess::WritePipe
 
 class ReadPipeImpl: public LLProcess::ReadPipe
 {
+    LOG_CLASS(ReadPipeImpl);
 public:
 	ReadPipeImpl(const std::string& desc, apr_file_t* pipe):
 		mDesc(desc),
@@ -184,7 +211,7 @@ class ReadPipeImpl: public LLProcess::ReadPipe
 		mStream(&mStreambuf),
 		mPump("ReadPipe"),
 		// use funky syntax to call max() to avoid blighted max() macros
-		mLimit((std::numeric_limits<size_t>::max)())
+		mLimit(npos)
 	{
 		mConnection = LLEventPumps::instance().obtain("mainloop")
 			.listen(LLEventPump::inventName("ReadPipe"),
@@ -195,79 +222,149 @@ class ReadPipeImpl: public LLProcess::ReadPipe
 	// methods with implementation data concealed from the base class.
 	virtual std::istream& get_istream() { return mStream; }
 	virtual LLEventPump& getPump() { return mPump; }
-	virtual void setLimit(size_t limit) { mLimit = limit; }
-	virtual size_t getLimit() const { return mLimit; }
-    virtual std::size_t size() { return mStreambuf.size(); }
+	virtual void setLimit(size_type limit) { mLimit = limit; }
+	virtual size_type getLimit() const { return mLimit; }
+	virtual size_type size() const { return mStreambuf.size(); }
 
-	virtual std::string peek(std::size_t offset=0,
-							 std::size_t len=(std::numeric_limits<std::size_t>::max)())
+	virtual std::string peek(size_type offset=0, size_type len=npos) const
 	{
 		// Constrain caller's offset and len to overlap actual buffer content.
-		std::size_t real_offset = (std::min)(mStreambuf.size(), offset);
-		std::size_t real_end	= (std::min)(mStreambuf.size(), real_offset + len);
+		std::size_t real_offset = (std::min)(mStreambuf.size(), std::size_t(offset));
+		std::size_t real_end	= (std::min)(mStreambuf.size(), std::size_t(real_offset + len));
 		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
 		return std::string(boost::asio::buffers_begin(cbufs) + real_offset,
 						   boost::asio::buffers_begin(cbufs) + real_end);
 	}
 
-	virtual bool contains(const std::string& seek, std::size_t offset=0)
+	virtual size_type find(const std::string& seek, size_type offset=0) const
 	{
-		// There may be a more efficient way to search mStreambuf contents,
-		// but this is far the easiest...
-		return peek(offset).find(seek) != std::string::npos;
+		// If we're passing a string of length 1, use find(char), which can
+		// use an O(n) std::find() rather than the O(n^2) std::search().
+		if (seek.length() == 1)
+		{
+			return find(seek[0], offset);
+		}
+
+		// If offset is beyond the whole buffer, can't even construct a valid
+		// iterator range; can't possibly find the string we seek.
+		if (offset > mStreambuf.size())
+		{
+			return npos;
+		}
+
+		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
+		boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type>
+			begin(boost::asio::buffers_begin(cbufs)),
+			end	 (boost::asio::buffers_end(cbufs)),
+			found(std::search(begin + offset, end, seek.begin(), seek.end()));
+		return (found == end)? npos : (found - begin);
 	}
 
-private:
-	bool tick(const LLSD&)
+	virtual size_type find(char seek, size_type offset=0) const
 	{
-		// Allocate a buffer and try, every time, to read into it.
-		std::vector<char> buffer(4096);
-		apr_size_t gotten(buffer.size());
-		apr_status_t err = apr_file_read(mPipe, &buffer[0], &gotten);
-		if (err == APR_EOF)
+		// If offset is beyond the whole buffer, can't even construct a valid
+		// iterator range; can't possibly find the char we seek.
+		if (offset > mStreambuf.size())
 		{
-			// Handle EOF specially: it's part of normal-case processing.
-			LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL;
-			// We won't need any more tick() calls.
-			mConnection.disconnect();
+			return npos;
 		}
-		else if (! ll_apr_warn_status(err)) // validate anything but EOF
+
+		boost::asio::streambuf::const_buffers_type cbufs = mStreambuf.data();
+		boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type>
+			begin(boost::asio::buffers_begin(cbufs)),
+			end	 (boost::asio::buffers_end(cbufs)),
+			found(std::find(begin + offset, end, seek));
+		return (found == end)? npos : (found - begin);
+	}
+
+private:
+	bool tick(const LLSD&)
+	{
+		typedef boost::asio::streambuf::mutable_buffers_type mutable_buffer_sequence;
+		// Try, every time, to read into our streambuf. In fact, we have no
+		// idea how much data the child might be trying to send: keep trying
+		// until we're convinced we've temporarily exhausted the pipe.
+		bool exhausted = false;
+		std::size_t committed(0);
+		do
 		{
-			// 'gotten' was modified to reflect the number of bytes actually
-			// received. If nonzero, add them to the streambuf and notify
-			// interested parties.
-			if (gotten)
+			// attempt to read an arbitrary size
+			mutable_buffer_sequence bufs = mStreambuf.prepare(4096);
+			// In general, the mutable_buffer_sequence returned by prepare() might
+			// contain a number of different physical buffers; iterate over those.
+			std::size_t tocommit(0);
+			for (mutable_buffer_sequence::const_iterator bufi(bufs.begin()), bufend(bufs.end());
+				 bufi != bufend; ++bufi)
 			{
-				boost::asio::streambuf::mutable_buffers_type mbufs = mStreambuf.prepare(gotten);
-				std::copy(buffer.begin(), buffer.begin() + gotten,
-						  boost::asio::buffers_begin(mbufs));
-				// Don't forget to "commit" the data! The sequence (prepare(),
-				// commit()) is obviously intended to allow us to allocate
-				// buffer space, then read directly into some portion of it,
-				// then commit only as much as we managed to obtain. But the
-				// only official (documented) way I can find to populate a
-				// mutable_buffers_type is to use buffers_begin(). It Would Be
-				// Nice if we were permitted to directly read into
-				// mutable_buffers_type (not to mention writing directly from
-				// const_buffers_type in WritePipeImpl; APR even supports an
-				// apr_file_writev() function for writing from discontiguous
-				// buffers) -- but as of 2012-02-14, this copying appears to
-				// be the safest tactic.
-				mStreambuf.commit(gotten);
-				LL_DEBUGS("LLProcess") << "read " << gotten << " of " << buffer.size()
-									   << " bytes from " << mDesc << ", new total "
-									   << mStreambuf.size() << LL_ENDL;
-
-				// Now that we've received new data, publish it on our
-				// LLEventPump as advertised. Constrain it by mLimit. But show
-				// listener the actual accumulated buffer size, regardless of
-				// mLimit.
-				std::size_t datasize((std::min)(mLimit, mStreambuf.size()));
-				mPump.post(LLSDMap
-						   ("data", peek(0, datasize))
-						   ("len", LLSD::Integer(mStreambuf.size())));
+				// http://www.boost.org/doc/libs/1_49_0_beta1/doc/html/boost_asio/reference/buffer.html#boost_asio.reference.buffer.accessing_buffer_contents
+				std::size_t toread(boost::asio::buffer_size(*bufi));
+				apr_size_t gotten(toread);
+				apr_status_t err = apr_file_read(mPipe,
+												 boost::asio::buffer_cast<void*>(*bufi),
+												 &gotten);
+				// EAGAIN is exactly what we want from a nonblocking pipe.
+				// Rather than waiting for data, it should return immediately.
+				if (! (err == APR_SUCCESS || APR_STATUS_IS_EAGAIN(err)))
+				{
+					// Handle EOF specially: it's part of normal-case processing.
+					if (err == APR_EOF)
+					{
+						LL_DEBUGS("LLProcess") << "EOF on " << mDesc << LL_ENDL;
+					}
+					else
+					{
+						LL_WARNS("LLProcess") << "apr_file_read(" << toread << ") on " << mDesc
+											  << " got " << err << ":" << LL_ENDL;
+						ll_apr_warn_status(err);
+					}
+					// Either way, though, we won't need any more tick() calls.
+					mConnection.disconnect();
+					exhausted = true; // also break outer retry loop
+					break;
+				}
+
+				// 'gotten' was modified to reflect the number of bytes actually
+				// received. Make sure we commit those later. (Don't commit them
+				// now, that would invalidate the buffer iterator sequence!)
+				tocommit += gotten;
+				LL_DEBUGS("LLProcess") << "read " << gotten << " of " << toread
+									   << " bytes from " << mDesc << LL_ENDL;
+
+				// The parent end of this pipe is nonblocking. If we weren't even
+				// able to fill this buffer, don't loop to try to fill the next --
+				// that won't change until the child writes more. Wait for next
+				// tick().
+				if (gotten < toread)
+				{
+					// break outer retry loop too
+					exhausted = true;
+					break;
+				}
 			}
+
+			// Don't forget to "commit" the data!
+			mStreambuf.commit(tocommit);
+			committed += tocommit;
+
+			// 'exhausted' is set when we can't fill any one buffer of the
+			// mutable_buffer_sequence established by the current prepare()
+			// call -- whether due to error or not enough bytes. That is,
+			// 'exhausted' is still false when we've filled every physical
+			// buffer in the mutable_buffer_sequence. In that case, for all we
+			// know, the child might have still more data pending -- go for it!
+		} while (! exhausted);
+
+		if (committed)
+		{
+			// If we actually received new data, publish it on our LLEventPump
+			// as advertised. Constrain it by mLimit. But show listener the
+			// actual accumulated buffer size, regardless of mLimit.
+			size_type datasize((std::min)(mLimit, size_type(mStreambuf.size())));
+			mPump.post(LLSDMap
+					   ("data", peek(0, datasize))
+					   ("len", LLSD::Integer(mStreambuf.size())));
 		}
+
 		return false;
 	}
 
@@ -277,7 +374,7 @@ class ReadPipeImpl: public LLProcess::ReadPipe
 	boost::asio::streambuf mStreambuf;
 	std::istream mStream;
 	LLEventStream mPump;
-	size_t mLimit;
+	size_type mLimit;
 };
 
 /// Need an exception to avoid constructing an invalid LLProcess object, but
@@ -472,16 +569,18 @@ LLProcess::LLProcess(const LLSDOrParams& params):
 	{
 		if (select[i] != APR_CHILD_BLOCK)
 			continue;
+		std::string desc(STRINGIZE(mDesc << ' ' << whichfile[i]));
+		apr_file_t* pipe(mProcess.*(members[i]));
 		if (i == STDIN)
 		{
-			mPipes.replace(i, new WritePipeImpl(whichfile[i], mProcess.*(members[i])));
+			mPipes.replace(i, new WritePipeImpl(desc, pipe));
 		}
 		else
 		{
-			mPipes.replace(i, new ReadPipeImpl(whichfile[i], mProcess.*(members[i])));
+			mPipes.replace(i, new ReadPipeImpl(desc, pipe));
 		}
 		LL_DEBUGS("LLProcess") << "Instantiating " << typeid(mPipes[i]).name()
-							   << "('" << whichfile[i] << "')" << LL_ENDL;
+							   << "('" << desc << "')" << LL_ENDL;
 	}
 }
 
diff --git a/indra/llcommon/llprocess.h b/indra/llcommon/llprocess.h
index bf0517600d7..2c6951b5627 100644
--- a/indra/llcommon/llprocess.h
+++ b/indra/llcommon/llprocess.h
@@ -295,6 +295,9 @@ class LL_COMMON_API LLProcess: public boost::noncopyable
 	{
 	public:
 		virtual ~BasePipe() = 0;
+
+		typedef std::size_t size_type;
+		static const size_type npos;
 	};
 
 	/// As returned by getWritePipe() or getOptWritePipe()
@@ -338,7 +341,7 @@ class LL_COMMON_API LLProcess: public boost::noncopyable
 		 * the child, but the child happens to flush "12" before emitting
 		 * "3\n", get_istream() >> myint could return 12 rather than 123!
 		 */
-		virtual std::size_t size() = 0;
+		virtual size_type size() const = 0;
 
 		/**
 		 * Peek at accumulated buffer data without consuming it. Optional
@@ -346,14 +349,32 @@ class LL_COMMON_API LLProcess: public boost::noncopyable
 		 *
 		 * @note You can discard buffer data using get_istream().ignore(n).
 		 */
-		virtual std::string peek(std::size_t offset=0,
-								 std::size_t len=(std::numeric_limits<std::size_t>::max)()) = 0;
+		virtual std::string peek(size_type offset=0, size_type len=npos) const = 0;
+
+		/**
+		 * Detect presence of a substring (or char) in accumulated buffer data
+		 * without retrieving it. Optional offset allows you to search from
+		 * specified position.
+		 */
+		template <typename SEEK>
+		bool contains(SEEK seek, size_type offset=0) const
+		{ return find(seek, offset) != npos; }
+
+		/**
+		 * Search for a substring in accumulated buffer data without
+		 * retrieving it. Returns size_type position at which found, or npos
+		 * meaning not found. Optional offset allows you to search from
+		 * specified position.
+		 */
+		virtual size_type find(const std::string& seek, size_type offset=0) const = 0;
 
 		/**
-		 * Search accumulated buffer data without retrieving it. Optional
-		 * offset allows you to start at specified position.
+		 * Search for a char in accumulated buffer data without retrieving it.
+		 * Returns size_type position at which found, or npos meaning not
+		 * found. Optional offset allows you to search from specified
+		 * position.
 		 */
-		virtual bool contains(const std::string& seek, std::size_t offset=0) = 0;
+		virtual size_type find(char seek, size_type offset=0) const = 0;
 
 		/**
 		 * Get LLEventPump& on which to listen for incoming data. The posted
@@ -377,12 +398,12 @@ class LL_COMMON_API LLProcess: public boost::noncopyable
 		 * the data posted with the LLSD event. If you don't call this method,
 		 * all pending data will be posted.
 		 */
-		virtual void setLimit(size_t limit) = 0;
+		virtual void setLimit(size_type limit) = 0;
 
 		/**
 		 * Query the current setLimit() limit.
 		 */
-		virtual size_t getLimit() const = 0;
+		virtual size_type getLimit() const = 0;
 	};
 
 	/// Exception thrown by getWritePipe(), getReadPipe() if you didn't ask to
diff --git a/indra/llcommon/tests/llprocess_test.cpp b/indra/llcommon/tests/llprocess_test.cpp
index 31bc833a1d1..d7bda349238 100644
--- a/indra/llcommon/tests/llprocess_test.cpp
+++ b/indra/llcommon/tests/llprocess_test.cpp
@@ -1131,5 +1131,7 @@ namespace tut
     // test setLimit(), getLimit()
     // test EOF -- check logging
     // test peek() with substr
+    // test contains(char)
+    // test find(string, offset), find(char, offset), offset <, =, > size()
 
 } // namespace tut
-- 
GitLab