From b57b0d97bb2fb880084cbcca1b915f8e67b442a5 Mon Sep 17 00:00:00 2001
From: Rider Linden <rider@lindenlab.com>
Date: Tue, 28 Jul 2015 15:29:51 -0700
Subject: [PATCH] Named pools of coroutines.

---
 indra/newview/app_settings/settings.xml |  18 ++
 indra/newview/llcoproceduremanager.cpp  | 288 +++++++++++++++++++++---
 indra/newview/llcoproceduremanager.h    |  56 ++---
 indra/newview/llviewerassetupload.cpp   |   3 +-
 4 files changed, 291 insertions(+), 74 deletions(-)

diff --git a/indra/newview/app_settings/settings.xml b/indra/newview/app_settings/settings.xml
index 2180a7f1a15..b4a4e41884d 100755
--- a/indra/newview/app_settings/settings.xml
+++ b/indra/newview/app_settings/settings.xml
@@ -14340,6 +14340,24 @@
       <key>Value</key>
       <integer>1</integer>
     </map>
+    <key>PoolSizeAIS</key>
+        <map>
+        <key>Comment</key>
+            <string>Coroutine Pool size for AIS</string>
+        <key>Type</key>
+            <string>U32</string>
+        <key>Value</key>
+            <integer>25</integer>
+        </map>
+    <key>PoolSizeUpload</key>
+        <map>
+        <key>Comment</key>
+            <string>Coroutine Pool size for Upload</string>
+        <key>Type</key>
+            <string>U32</string>
+        <key>Value</key>
+            <real>1</real>
+        </map>
 
     <!-- Settings below are for back compatibility only.
     They are not used in current viewer anymore. But they can't be removed to avoid
diff --git a/indra/newview/llcoproceduremanager.cpp b/indra/newview/llcoproceduremanager.cpp
index d3168985f87..e22a8b8013e 100644
--- a/indra/newview/llcoproceduremanager.cpp
+++ b/indra/newview/llcoproceduremanager.cpp
@@ -1,5 +1,5 @@
 /**
-* @file llcoproceduremanager.cpp
+* @file LLCoprocedurePool.cpp
 * @author Rider Linden
 * @brief Singleton class for managing asset uploads to the sim.
 *
@@ -33,43 +33,270 @@
 #include "llcoproceduremanager.h"
 
 //=========================================================================
-#define COROCOUNT 1
+// Map of pool sizes for known pools
+static std::map<std::string, U32> DefaultPoolSizes;
+
+// *TODO$: When C++11 this can be initialized here as follows:
+// = {{"AIS", 25}, {"Upload", 1}}
+
+#define DEFAULT_POOL_SIZE 5
+
+//=========================================================================
+class LLCoprocedurePool: private boost::noncopyable
+{
+public:
+    typedef LLCoprocedureManager::CoProcedure_t CoProcedure_t;
+
+    LLCoprocedurePool(const std::string &name, size_t size);
+    virtual ~LLCoprocedurePool();
+
+    /// Places the coprocedure on the queue for processing. 
+    /// 
+    /// @param name Is used for debugging and should identify this coroutine.
+    /// @param proc Is a bound function to be executed 
+    /// 
+    /// @return This method returns a UUID that can be used later to cancel execution.
+    LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
+
+    /// Cancel a coprocedure. If the coprocedure is already being actively executed 
+    /// this method calls cancelYieldingOperation() on the associated HttpAdapter
+    /// If it has not yet been dequeued it is simply removed from the queue.
+    bool cancelCoprocedure(const LLUUID &id);
+
+    /// Requests a shutdown of the upload manager. Passing 'true' will perform 
+    /// an immediate kill on the upload coroutine.
+    void shutdown(bool hardShutdown = false);
+
+    /// Returns the number of coprocedures in the queue awaiting processing.
+    ///
+    inline size_t countPending() const
+    {
+        return mPendingCoprocs.size();
+    }
+
+    /// Returns the number of coprocedures actively being processed.
+    ///
+    inline size_t countActive() const
+    {
+        return mActiveCoprocs.size();
+    }
+
+    /// Returns the total number of coprocedures either queued or in active processing.
+    ///
+    inline size_t count() const
+    {
+        return countPending() + countActive();
+    }
+
+private:
+    struct QueuedCoproc
+    {
+        typedef boost::shared_ptr<QueuedCoproc> ptr_t;
+
+        QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc) :
+            mName(name),
+            mId(id),
+            mProc(proc)
+        {}
+
+        std::string mName;
+        LLUUID mId;
+        CoProcedure_t mProc;
+    };
+
+    // we use a deque here rather than std::queue since we want to be able to 
+    // iterate through the queue and potentially erase an entry from the middle.
+    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t;
+    typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
+
+    std::string     mPoolName;
+    size_t          mPoolSize;
+    CoprocQueue_t   mPendingCoprocs;
+    ActiveCoproc_t  mActiveCoprocs;
+    bool            mShutdown;
+    LLEventStream   mWakeupTrigger;
+
+    typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
+    LLCore::HttpRequest::policy_t mHTTPPolicy;
+
+    CoroAdapterMap_t mCoroMapping;
+
+    void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
+
+};
 
 //=========================================================================
-LLCoprocedureManager::LLCoprocedureManager():
-    LLSingleton<LLCoprocedureManager>(),
+LLCoprocedureManager::LLCoprocedureManager()
+{
+    DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("Upload", 1));
+    DefaultPoolSizes.insert(std::map<std::string, U32>::value_type("AIS", 25));
+}
+
+LLCoprocedureManager::~LLCoprocedureManager()
+{
+
+}
+
+LLCoprocedureManager::poolPtr_t LLCoprocedureManager::initializePool(const std::string &poolName)
+{
+    // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and
+    // clamp to a "reasonable" number.
+    std::string keyName = "PoolSize" + poolName;
+    int size = 5;
+
+    size = gSavedSettings.getU32(keyName);
+    if (size == 0)
+    {
+        std::map<std::string, U32>::iterator it = DefaultPoolSizes.find(poolName);
+        if (it == DefaultPoolSizes.end())
+            size = DEFAULT_POOL_SIZE;
+        else
+            size = (*it).second;
+        gSavedSettings.declareU32(keyName, size, "Coroutine Pool size for " + poolName, LLControlVariable::PERSIST_ALWAYS);
+        LL_WARNS() << "LLCoprocedureManager: No setting for \"" << keyName << "\" setting pool size to default of " << size << LL_ENDL;
+    }
+
+    poolPtr_t pool = poolPtr_t(new LLCoprocedurePool(poolName, size));
+    mPoolMap.insert(poolMap_t::value_type(poolName, pool));
+
+    return pool;
+}
+
+//-------------------------------------------------------------------------
+LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc)
+{
+    poolPtr_t targetPool;
+    poolMap_t::iterator it = mPoolMap.find(pool);
+
+    if (it == mPoolMap.end())
+    {
+        targetPool = initializePool(pool);
+    }
+    else
+    {
+        targetPool = (*it).second;
+    }
+
+    if (!targetPool)
+    {
+        LL_WARNS() << "LLCoprocedureManager unable to create coprocedure pool named \"" << pool << "\"" << LL_ENDL;
+        return LLUUID::null;
+    }
+
+    return targetPool->enqueueCoprocedure(name, proc);
+}
+
+void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
+{
+    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+    {
+        if ((*it).second->cancelCoprocedure(id))
+            return;
+    }
+    LL_INFOS() << "Coprocedure not found." << LL_ENDL;
+}
+
+void LLCoprocedureManager::shutdown(bool hardShutdown)
+{
+    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+    {
+        (*it).second->shutdown(hardShutdown);
+    }
+    mPoolMap.clear();
+}
+
+//-------------------------------------------------------------------------
+size_t LLCoprocedureManager::countPending() const
+{
+    size_t count = 0;
+    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+    {
+        count += (*it).second->countPending();
+    }
+    return count;
+}
+
+size_t LLCoprocedureManager::countPending(const std::string &pool) const
+{
+    poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+    if (it == mPoolMap.end())
+        return 0;
+    return (*it).second->countPending();
+}
+
+size_t LLCoprocedureManager::countActive() const
+{
+    size_t count = 0;
+    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+    {
+        count += (*it).second->countActive();
+    }
+    return count;
+}
+
+size_t LLCoprocedureManager::countActive(const std::string &pool) const
+{
+    poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+    if (it == mPoolMap.end())
+        return 0;
+    return (*it).second->countActive();
+}
+
+size_t LLCoprocedureManager::count() const
+{
+    size_t count = 0;
+    for (poolMap_t::const_iterator it = mPoolMap.begin(); it != mPoolMap.end(); ++it)
+    {
+        count += (*it).second->count();
+    }
+    return count;
+}
+
+size_t LLCoprocedureManager::count(const std::string &pool) const
+{
+    poolMap_t::const_iterator it = mPoolMap.find(pool);
+
+    if (it == mPoolMap.end())
+        return 0;
+    return (*it).second->count();
+}
+
+//=========================================================================
+LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
+    mPoolName(poolName),
+    mPoolSize(size),
     mPendingCoprocs(),
     mShutdown(false),
-    mWakeupTrigger("CoprocedureManager", true),
+    mWakeupTrigger("CoprocedurePool" + poolName, true),
     mCoroMapping(),
     mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID)
 {
-
-    // *TODO: Retrieve the actual number of concurrent coroutines fro gSavedSettings and
-    // clamp to a "reasonable" number.
-    for (int count = 0; count < COROCOUNT; ++count)
+    for (size_t count = 0; count < mPoolSize; ++count)
     {
         LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter =
             LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t(
-            new LLCoreHttpUtil::HttpCoroutineAdapter("uploadPostAdapter", mHTTPPolicy));
+            new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));
 
-        std::string uploadCoro = LLCoros::instance().launch("LLCoprocedureManager::coprocedureInvokerCoro",
-            boost::bind(&LLCoprocedureManager::coprocedureInvokerCoro, this, httpAdapter));
+        std::string uploadCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
+            boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter));
 
         mCoroMapping.insert(CoroAdapterMap_t::value_type(uploadCoro, httpAdapter));
     }
 
+    LL_INFOS() << "Created coprocedure pool named \"" << mPoolName << "\" with " << size << " items." << LL_ENDL;
+
     mWakeupTrigger.post(LLSD());
 }
 
-LLCoprocedureManager::~LLCoprocedureManager() 
+LLCoprocedurePool::~LLCoprocedurePool() 
 {
     shutdown();
 }
 
-//=========================================================================
-
-void LLCoprocedureManager::shutdown(bool hardShutdown)
+//-------------------------------------------------------------------------
+void LLCoprocedurePool::shutdown(bool hardShutdown)
 {
     CoroAdapterMap_t::iterator it;
 
@@ -93,46 +320,47 @@ void LLCoprocedureManager::shutdown(bool hardShutdown)
     mPendingCoprocs.clear();
 }
 
-//=========================================================================
-LLUUID LLCoprocedureManager::enqueueCoprocedure(const std::string &name, LLCoprocedureManager::CoProcedure_t proc)
+//-------------------------------------------------------------------------
+LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoprocedurePool::CoProcedure_t proc)
 {
     LLUUID id(LLUUID::generateNewID());
 
     mPendingCoprocs.push_back(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
-    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << LL_ENDL;
+    LL_INFOS() << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
 
     mWakeupTrigger.post(LLSD());
 
     return id;
 }
 
-void LLCoprocedureManager::cancelCoprocedure(const LLUUID &id)
+bool LLCoprocedurePool::cancelCoprocedure(const LLUUID &id)
 {
     // first check the active coroutines.  If there, remove it and return.
     ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(id);
     if (itActive != mActiveCoprocs.end())
     {
-        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << LL_ENDL;
+        LL_INFOS() << "Found and canceling active coprocedure with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
         (*itActive).second->cancelYieldingOperation();
         mActiveCoprocs.erase(itActive);
-        return;
+        return true;
     }
 
     for (CoprocQueue_t::iterator it = mPendingCoprocs.begin(); it != mPendingCoprocs.end(); ++it)
     {
         if ((*it)->mId == id)
         {
-            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << LL_ENDL;
+            LL_INFOS() << "Found and removing queued coroutine(" << (*it)->mName << ") with Id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
             mPendingCoprocs.erase(it);
-            return;
+            return true;
         }
     }
 
-    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << LL_ENDL;
+    LL_INFOS() << "Coprocedure with Id=" << id.asString() << " was not found." << " in pool \"" << mPoolName << "\"" << LL_ENDL;
+    return false;
 }
 
-//=========================================================================
-void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
+//-------------------------------------------------------------------------
+void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
 {
     LLCore::HttpRequest::ptr_t httpRequest(new LLCore::HttpRequest);
 
@@ -148,7 +376,7 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA
             mPendingCoprocs.pop_front();
             mActiveCoprocs.insert(ActiveCoproc_t::value_type(coproc->mId, httpAdapter));
 
-            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << LL_ENDL;
+            LL_INFOS() << "Dequeued and invoking coprocedure(" << coproc->mName << ") with id=" << coproc->mId.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
 
             try
             {
@@ -161,10 +389,10 @@ void LLCoprocedureManager::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineA
             }
             catch (...)
             {
-                LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << LL_ENDL;
+                LL_WARNS() << "A non std::exception was thrown from " << coproc->mName << " with id=" << coproc->mId << "." << " in pool \"" << mPoolName << "\"" << LL_ENDL;
             }
 
-            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << LL_ENDL;
+            LL_INFOS() << "Finished coprocedure(" << coproc->mName << ")" << " in pool \"" << mPoolName << "\"" << LL_ENDL;
 
             ActiveCoproc_t::iterator itActive = mActiveCoprocs.find(coproc->mId);
             if (itActive != mActiveCoprocs.end())
diff --git a/indra/newview/llcoproceduremanager.h b/indra/newview/llcoproceduremanager.h
index 6ba3891e879..d7f74af76b2 100644
--- a/indra/newview/llcoproceduremanager.h
+++ b/indra/newview/llcoproceduremanager.h
@@ -33,6 +33,8 @@
 #include "llcorehttputil.h"
 #include "lluuid.h"
 
+class LLCoprocedurePool;
+
 class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager >
 {
 public:
@@ -47,7 +49,7 @@ class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager >
     /// @param proc Is a bound function to be executed 
     /// 
     /// @return This method returns a UUID that can be used later to cancel execution.
-    LLUUID enqueueCoprocedure(const std::string &name, CoProcedure_t proc);
+    LLUUID enqueueCoprocedure(const std::string &pool, const std::string &name, CoProcedure_t proc);
 
     /// Cancel a coprocedure. If the coprocedure is already being actively executed 
     /// this method calls cancelYieldingOperation() on the associated HttpAdapter
@@ -60,58 +62,26 @@ class LLCoprocedureManager : public LLSingleton < LLCoprocedureManager >
 
     /// Returns the number of coprocedures in the queue awaiting processing.
     ///
-    inline size_t countPending() const
-    {
-        return mPendingCoprocs.size();
-    }
+    size_t countPending() const;
+    size_t countPending(const std::string &pool) const;
 
     /// Returns the number of coprocedures actively being processed.
     ///
-    inline size_t countActive() const
-    {
-        return mActiveCoprocs.size();
-    }
+    size_t countActive() const;
+    size_t countActive(const std::string &pool) const;
 
     /// Returns the total number of coprocedures either queued or in active processing.
     ///
-    inline size_t count() const
-    {
-        return countPending() + countActive();
-    }
+    size_t count() const;
+    size_t count(const std::string &pool) const;
 
 private:
-    struct QueuedCoproc
-    {
-        typedef boost::shared_ptr<QueuedCoproc> ptr_t;
-
-        QueuedCoproc(const std::string &name, const LLUUID &id, CoProcedure_t proc):
-            mName(name),
-            mId(id),
-            mProc(proc)
-        {}
-
-        std::string mName;
-        LLUUID mId;
-        CoProcedure_t mProc;
-    };
-    
-    // we use a deque here rather than std::queue since we want to be able to 
-    // iterate through the queue and potentially erase an entry from the middle.
-    typedef std::deque<QueuedCoproc::ptr_t>  CoprocQueue_t;  
-    typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
-
-    CoprocQueue_t   mPendingCoprocs;
-    ActiveCoproc_t  mActiveCoprocs;
-    bool            mShutdown;
-    LLEventStream   mWakeupTrigger;
-
-
-    typedef std::map<std::string, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> CoroAdapterMap_t;
-    LLCore::HttpRequest::policy_t mHTTPPolicy;
+    typedef boost::shared_ptr<LLCoprocedurePool> poolPtr_t;
+    typedef std::map<std::string, poolPtr_t> poolMap_t;
 
-    CoroAdapterMap_t mCoroMapping;
+    poolMap_t mPoolMap;
 
-    void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
+    poolPtr_t initializePool(const std::string &poolName);
 };
 
 #endif
diff --git a/indra/newview/llviewerassetupload.cpp b/indra/newview/llviewerassetupload.cpp
index 4ef398d3144..6c6d3a4f332 100644
--- a/indra/newview/llviewerassetupload.cpp
+++ b/indra/newview/llviewerassetupload.cpp
@@ -666,7 +666,8 @@ LLUUID LLViewerAssetUpload::EnqueueInventoryUpload(const std::string &url, const
 {
     std::string procName("LLViewerAssetUpload::AssetInventoryUploadCoproc(");
     
-    LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure(procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")",
+    LLUUID queueId = LLCoprocedureManager::getInstance()->enqueueCoprocedure("Upload", 
+        procName + LLAssetType::lookup(uploadInfo->getAssetType()) + ")",
         boost::bind(&LLViewerAssetUpload::AssetInventoryUploadCoproc, _1, _2, url, uploadInfo));
 
     return queueId;
-- 
GitLab