From 43ad4b5db4fdfd82e73e705e2d46bb339768bee3 Mon Sep 17 00:00:00 2001
From: Rye Mutt <rye@alchemyviewer.org>
Date: Wed, 14 Jul 2021 17:42:25 -0400
Subject: [PATCH] Port thread pooled image workers from FS which was from
 CoolVL

---
 indra/llcommon/llqueuedthread.cpp             |   8 +-
 indra/llcommon/llqueuedthread.h               |   3 +-
 indra/llimage/llimageworker.cpp               | 182 +++++++++++++++++-
 indra/llimage/llimageworker.h                 |  13 +-
 .../newview/app_settings/settings_alchemy.xml |  11 ++
 indra/newview/llappviewer.cpp                 |   3 +-
 .../newview/skins/default/xui/en/strings.xml  |   1 +
 7 files changed, 206 insertions(+), 15 deletions(-)

diff --git a/indra/llcommon/llqueuedthread.cpp b/indra/llcommon/llqueuedthread.cpp
index 2f967304d0d..c418ba143bc 100644
--- a/indra/llcommon/llqueuedthread.cpp
+++ b/indra/llcommon/llqueuedthread.cpp
@@ -437,7 +437,13 @@ S32 LLQueuedThread::processNextRequest()
 	// safe to access req.
 	if (req)
 	{
-		// process request		
+		if (req->getFlags() & FLAG_ASYNC)
+		{
+			req->processRequest();
+			return getPending();
+		}
+
+		// process request
 		bool complete = req->processRequest();
 
 		if (complete)
diff --git a/indra/llcommon/llqueuedthread.h b/indra/llcommon/llqueuedthread.h
index 05dcaf10eb6..7b76b90dc21 100644
--- a/indra/llcommon/llqueuedthread.h
+++ b/indra/llcommon/llqueuedthread.h
@@ -66,7 +66,8 @@ class LL_COMMON_API LLQueuedThread : public LLThread
 	enum flags_t {
 		FLAG_AUTO_COMPLETE = 1,
 		FLAG_AUTO_DELETE = 2, // child-class dependent
-		FLAG_ABORT = 4
+		FLAG_ABORT = 4,
+		FLAG_ASYNC = 8
 	};
 
 	typedef U32 handle_t;
diff --git a/indra/llimage/llimageworker.cpp b/indra/llimage/llimageworker.cpp
index c70dbdefba1..6f2d9fef4f2 100644
--- a/indra/llimage/llimageworker.cpp
+++ b/indra/llimage/llimageworker.cpp
@@ -29,14 +29,98 @@
 #include "llimageworker.h"
 #include "llimagedxt.h"
 
+std::atomic< U32 > sImageThreads;
+
+class PoolWorkerThread : public LLThread // Decode worker fed through a single atomic request slot.
+{
+public:
+	PoolWorkerThread(std::string name) : LLThread(name),
+		mCurrentRequest(NULL)
+	{
+	}
+	virtual void run() // Thread body: keep draining the slot until the thread is asked to quit.
+	{
+		while (!isQuitting())
+		{
+			auto *pReq = mCurrentRequest.exchange(nullptr);
+			// The exchange empties the slot before decoding starts, so setRequest() can post the next request meanwhile.
+			if (pReq)
+				pReq->processRequestIntern();
+			checkPause();
+		}
+	}
+	bool isBusy() // True only while a posted request still sits in the slot with status QUEUED or INPROGRESS.
+	{
+		auto *pReq = mCurrentRequest.load();
+		if (!pReq)
+			return false;
+		// NOTE(review): run() may take and finish this request concurrently; confirm pReq stays valid for getStatus().
+		auto status = pReq->getStatus();
+
+		return status  == LLQueuedThread::STATUS_QUEUED || status == LLQueuedThread::STATUS_INPROGRESS;
+	}
+	// True while a request is posted. Presumably the LLThread wake predicate evaluated by checkPause() -- confirm.
+	bool runCondition()
+	{
+		return mCurrentRequest != NULL;
+	}
+	// Posts req only if the slot is empty; wake() is called either way. Returns true when req was stored.
+	bool setRequest(LLImageDecodeThread::ImageRequest* req)
+	{
+		LLImageDecodeThread::ImageRequest* pOld{ nullptr };
+		bool bSuccess = mCurrentRequest.compare_exchange_strong(pOld, req);
+		wake();
+
+		return bSuccess;
+	}
+
+private:
+	std::atomic< LLImageDecodeThread::ImageRequest * > mCurrentRequest; // nullptr when no request is posted
+};
+
 //----------------------------------------------------------------------------
 
 // MAIN THREAD
-LLImageDecodeThread::LLImageDecodeThread(bool threaded)
+LLImageDecodeThread::LLImageDecodeThread(bool threaded, U32 pool_size)
 	: LLQueuedThread("imagedecode", threaded)
 	, mCreationListSize(0)
 {
 	mCreationMutex = new LLMutex();
+	// pool_size: 0 = size the decode pool from the CPU count, 1 = no pool, >1 = use exactly that many workers.
+	if (pool_size == 0)
+	{
+		pool_size = std::thread::hardware_concurrency();
+		if (pool_size == 0)
+			pool_size = 4U;  // hardware_concurrency() returned 0 (unknown): assume 4 cores
+		if (pool_size >= 8U)
+		{
+			// Using number of (virtual) cores minus 3 for:
+			// - main image worker
+			// - viewer main loop thread
+			// - mesh repo thread
+			// further bound to a maximum of 16 threads (more than that is totally useless, even
+			// when flying over main land with 512m draw distance).
+			pool_size = llmin(pool_size - 3U, 16U);
+		}
+		else if (pool_size > 2U)
+		{
+			// Using number of (virtual) cores - 1 (for the main image worker
+			// thread).
+			--pool_size;
+		}
+	}
+	else if (pool_size == 1)  // Disable if only 1
+		pool_size = 0;
+
+	sImageThreads = pool_size;
+
+	LL_INFOS() << "Initializing with " << sImageThreads << " image decode threads" << LL_ENDL;
+
+	for (U32 i = 0; i < pool_size; ++i)
+	{
+		mThreadPool.push_back(std::make_unique<PoolWorkerThread>(fmt::format("Image Decode Thread {}", i)));
+		mThreadPool[i]->start();
+	}
 }
 
 //virtual 
@@ -56,14 +140,18 @@ S32 LLImageDecodeThread::update(F32 max_time_ms)
 			 iter != mCreationList.end(); ++iter)
 		{
 			creation_info& info = *iter;
+		// ImageRequest* req = new ImageRequest(info.handle, info.image,
+		//				     info.priority, info.discard, info.needs_aux,
+		//				     info.responder);
 			ImageRequest* req = new ImageRequest(info.handle, info.image,
 				info.priority, info.discard, info.needs_aux,
-				info.responder);
+				info.responder, this);
 
 			bool res = addRequest(req);
 			if (!res)
 			{
-				LL_ERRS() << "request added after LLLFSThread::cleanupClass()" << LL_ENDL;
+				LL_WARNS() << "request added after LLLFSThread::cleanupClass()" << LL_ENDL;
+				return 0;
 			}
 		}
 		mCreationList.clear();
@@ -76,10 +164,28 @@ S32 LLImageDecodeThread::update(F32 max_time_ms)
 LLImageDecodeThread::handle_t LLImageDecodeThread::decodeImage(LLImageFormatted* image, 
 	U32 priority, S32 discard, BOOL needs_aux, Responder* responder)
 {
-	LLMutexLock lock(mCreationMutex);
 	handle_t handle = generateHandle();
-	mCreationList.push_back(creation_info(handle, image, priority, discard, needs_aux, responder));
-	mCreationListSize = mCreationList.size();
+	// With a decode pool configured, queue the request directly instead of going through mCreationList.
+	// Note: addRequest could cause the handling to take place on the fetch thread; this is unlikely to be an issue.
+	// If it does turn out to be a problem, move the fallback here and put the unfulfilled request into the legacy queue.
+    if (sImageThreads > 0)
+	{
+		ImageRequest* req = new ImageRequest(handle, image,
+			priority, discard, needs_aux,
+			responder, this);
+		bool res = addRequest(req);
+		if (!res)
+		{
+			LL_WARNS() << "Decode request not added because we are exiting." << LL_ENDL;
+			return 0; // NOTE(review): req is not released on this path; confirm who owns it when addRequest() fails.
+		}
+	}
+	else
+	{
+		LLMutexLock lock(mCreationMutex); // no pool: legacy path, the entry waits on mCreationList under the mutex
+		mCreationList.push_back(creation_info(handle, image, priority, discard, needs_aux, responder));
+		mCreationListSize = mCreationList.size();
+	}
 	return handle;
 }
 
@@ -96,15 +202,19 @@ S32 LLImageDecodeThread::tut_size()
 
 LLImageDecodeThread::ImageRequest::ImageRequest(handle_t handle, LLImageFormatted* image, 
 												U32 priority, S32 discard, BOOL needs_aux,
-												LLImageDecodeThread::Responder* responder)
+												LLImageDecodeThread::Responder* responder,
+												LLImageDecodeThread *aQueue) // decode thread used to reach the worker pool and complete the request
 	: LLQueuedThread::QueuedRequest(handle, priority, FLAG_AUTO_COMPLETE),
 	  mFormattedImage(image),
 	  mDiscardLevel(discard),
 	  mNeedsAux(needs_aux),
 	  mDecodedRaw(FALSE),
 	  mDecodedAux(FALSE),
-	  mResponder(responder)
+	  mResponder(responder),
+      mQueue( aQueue )
 {
+    if (sImageThreads > 0) // a decode pool exists
+		mFlags |= FLAG_ASYNC; // processRequest() will try to hand this request to a pool worker
 }
 
 LLImageDecodeThread::ImageRequest::~ImageRequest()
@@ -120,7 +230,24 @@ LLImageDecodeThread::ImageRequest::~ImageRequest()
 // Returns true when done, whether or not decode was successful.
 bool LLImageDecodeThread::ImageRequest::processRequest()
 {
-	const F32 decode_time_slice = .1f;
+	// Synchronous request (FLAG_ASYNC not set): decode right here on the calling thread.
+	if ((mFlags & FLAG_ASYNC) == 0)
+		return processRequestIntern();
+
+	// Async request: offer it to a pool worker; if no worker accepts it, decode on this thread instead.
+	if (!mQueue->enqueRequest(this))
+		return processRequestIntern();
+	return true; // handed off: the pool worker runs processRequestIntern(), which completes the request itself
+}
+
+bool LLImageDecodeThread::ImageRequest::processRequestIntern()
+{
+	F32 decode_time_slice = .1f;
+	if(mFlags & FLAG_ASYNC)
+	{
+		decode_time_slice = 10.0f;// long time out as this is not an issue with async
+	}
+
 	bool done = true;
 	if (!mDecodedRaw && mFormattedImage.notNull())
 	{
@@ -144,7 +271,16 @@ bool LLImageDecodeThread::ImageRequest::processRequest()
 											  mFormattedImage->getHeight(),
 											  mFormattedImage->getComponents());
 		}
-		done = mFormattedImage->decode(mDecodedImageRaw, decode_time_slice); // 1ms
+
+		if( mDecodedImageRaw->getData() )
+			done = mFormattedImage->decode(mDecodedImageRaw, decode_time_slice); // 1ms
+		else
+		{
+			LL_WARNS() << "No memory for LLImageRaw of size " << (U32)mFormattedImage->getWidth() << "x" << (U32)mFormattedImage->getHeight() << "x"
+					   << (U32)mFormattedImage->getComponents() << LL_ENDL;
+			done = false;
+		}
+		
 		// some decoders are removing data when task is complete and there were errors
 		mDecodedRaw = done && mDecodedImageRaw->getData();
 	}
@@ -161,6 +297,19 @@ bool LLImageDecodeThread::ImageRequest::processRequest()
 		mDecodedAux = done && mDecodedImageAux->getData();
 	}
 
+	if(!done)
+	{
+		LL_WARNS("ImageDecode") << "Image decoding failed to complete with time slice=" << decode_time_slice << LL_ENDL;
+	}
+
+	if (mFlags & FLAG_ASYNC)
+	{
+		setStatus(STATUS_COMPLETE);
+		finishRequest(true);
+		// always autocomplete
+		mQueue->completeRequest(mHashKey);
+	}
+
 	return done;
 }
 
@@ -180,3 +329,16 @@ bool LLImageDecodeThread::ImageRequest::tut_isOK()
 {
 	return mResponder.notNull();
 }
+
+// Offers req to the pool workers in order; returns true as soon as one of them stores it.
+bool LLImageDecodeThread::enqueRequest(ImageRequest * req)
+{
+	for (auto& worker : mThreadPool)
+	{
+		// setRequest() is only attempted on a worker that does not report itself busy.
+		if (!worker->isBusy() && worker->setRequest(req))
+			return true;
+	}
+	// No worker took the request.
+	return false;
+}
diff --git a/indra/llimage/llimageworker.h b/indra/llimage/llimageworker.h
index 019331bc0a7..3d70b7f2933 100644
--- a/indra/llimage/llimageworker.h
+++ b/indra/llimage/llimageworker.h
@@ -31,6 +31,9 @@
 #include "llpointer.h"
 #include "llworkerthread.h"
 
+ class PoolWorkerThread;
+class LLImageFormatted;
+
 class LLImageDecodeThread : public LLQueuedThread
 {
 public:
@@ -50,9 +53,10 @@ class LLImageDecodeThread : public LLQueuedThread
 	public:
 		ImageRequest(handle_t handle, LLImageFormatted* image,
 					 U32 priority, S32 discard, BOOL needs_aux,
-					 LLImageDecodeThread::Responder* responder);
+					 LLImageDecodeThread::Responder* responder, LLImageDecodeThread *aQueue);
 
 		/*virtual*/ bool processRequest();
+		bool processRequestIntern();
 		/*virtual*/ void finishRequest(bool completed);
 
 		// Used by unit tests to check the consitency of the request instance
@@ -66,13 +70,15 @@ class LLImageDecodeThread : public LLQueuedThread
 		// output
 		LLPointer<LLImageRaw> mDecodedImageRaw;
 		LLPointer<LLImageRaw> mDecodedImageAux;
+		LLImageDecodeThread* mQueue;
 		BOOL mDecodedRaw;
 		BOOL mDecodedAux;
 		LLPointer<LLImageDecodeThread::Responder> mResponder;
 	};
 	
 public:
-	LLImageDecodeThread(bool threaded = true);
+	LLImageDecodeThread(bool threaded = true, U32 pool_size = 0 );
+
 	virtual ~LLImageDecodeThread();
 
 	handle_t decodeImage(LLImageFormatted* image,
@@ -101,6 +107,9 @@ class LLImageDecodeThread : public LLQueuedThread
 	creation_list_t mCreationList;
 	std::atomic<S32> mCreationListSize;
 	LLMutex* mCreationMutex;
+
+	std::vector<std::unique_ptr<PoolWorkerThread>> mThreadPool;
+	bool enqueRequest(ImageRequest*);
 };
 
 #endif
diff --git a/indra/newview/app_settings/settings_alchemy.xml b/indra/newview/app_settings/settings_alchemy.xml
index 11ec05dc31f..ab08a6d4650 100644
--- a/indra/newview/app_settings/settings_alchemy.xml
+++ b/indra/newview/app_settings/settings_alchemy.xml
@@ -288,6 +288,17 @@
       <key>Value</key>
       <real>8.0</real>
     </map>
+    <key>AlchemyImageDecodeThreads</key>
+    <map>
+      <key>Comment</key>
+      <string>Number of threads to use for image decoding. 0 = autodetect, 1 = off, >1 = number of threads. Needs restart.</string>
+      <key>Persist</key>
+      <integer>1</integer>
+      <key>Type</key>
+      <string>U32</string>
+      <key>Value</key>
+      <integer>0</integer>
+    </map>
     <key>AlchemyMoonWalk</key>
       <map>
       <key>Comment</key>
diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp
index 239915c2264..84416e17dd7 100644
--- a/indra/newview/llappviewer.cpp
+++ b/indra/newview/llappviewer.cpp
@@ -2182,7 +2182,7 @@ bool LLAppViewer::initThreads()
 	LLLFSThread::initClass(enable_threads && false);
 
 	// Image decoding
-	LLAppViewer::sImageDecodeThread = new LLImageDecodeThread(enable_threads && true);
+    LLAppViewer::sImageDecodeThread = new LLImageDecodeThread(enable_threads && true, gSavedSettings.getU32("AlchemyImageDecodeThreads"));
 	LLAppViewer::sTextureCache = new LLTextureCache(enable_threads && true);
 	LLAppViewer::sTextureFetch = new LLTextureFetch(LLAppViewer::getTextureCache(),
 													sImageDecodeThread,
@@ -3200,6 +3200,7 @@ LLSD LLAppViewer::getViewerInfo() const
 	info["CPU"] = gSysCPU.getCPUString();
 	info["MEMORY_MB"] = LLSD::Integer(gSysMemory.getPhysicalMemoryKB().valueInUnits<LLUnits::Megabytes>());
 	// Moved hack adjustment to Windows memory size into llsys.cpp
+    info["CONCURRENCY"] = LLSD::Integer((S32) std::thread::hardware_concurrency());
 	info["OS_VERSION"] = LLOSInfo::instance().getOSString();
 	info["GRAPHICS_CARD_VENDOR"] = ll_safe_string((const char*)(glGetString(GL_VENDOR)));
 	info["GRAPHICS_CARD"] = ll_safe_string((const char*)(glGetString(GL_RENDERER)));
diff --git a/indra/newview/skins/default/xui/en/strings.xml b/indra/newview/skins/default/xui/en/strings.xml
index 2ad93683524..7d6da274448 100644
--- a/indra/newview/skins/default/xui/en/strings.xml
+++ b/indra/newview/skins/default/xui/en/strings.xml
@@ -48,6 +48,7 @@ You are in [REGION]
 	<string name="AboutSystem">
 CPU: [CPU]
 Memory: [MEMORY_MB] MB
+Concurrency: [CONCURRENCY]
 OS Version: [OS_VERSION]
 Graphics Card Vendor: [GRAPHICS_CARD_VENDOR]
 Graphics Card: [GRAPHICS_CARD]
-- 
GitLab