Skip to content
Snippets Groups Projects
Commit 3f7b060f authored by Rye Mutt's avatar Rye Mutt :bread:
Browse files

Pull in Kitty's changes to de-slow the queued thread system

parent 2b22097d
No related branches found
No related tags found
No related merge requests found
......@@ -36,9 +36,10 @@
LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool should_pause) :
LLThread(name),
mThreaded(threaded),
mIdleThread(TRUE),
mNextHandle(0),
mStarted(FALSE)
mStarted(false),
mIdleThread(true),
mRequestQueueSize(0),
mNextHandle(0)
{
if (mThreaded)
{
......@@ -117,7 +118,7 @@ S32 LLQueuedThread::update(F32 max_time_ms)
if (!mThreaded)
{
startThread();
mStarted = TRUE;
mStarted = true;
}
}
return updateQueue(max_time_ms);
......@@ -162,17 +163,6 @@ void LLQueuedThread::incQueue()
}
}
//virtual
// May be called from any thread
S32 LLQueuedThread::getPending()
{
S32 res;
lockData();
res = mRequestQueue.size();
unlockData();
return res;
}
// MAIN thread
void LLQueuedThread::waitOnPending()
{
......@@ -236,6 +226,7 @@ bool LLQueuedThread::addRequest(QueuedRequest* req)
#if _DEBUG
// LL_INFOS() << llformat("LLQueuedThread::Added req [%08d]",handle) << LL_ENDL;
#endif
mRequestQueueSize = mRequestQueue.size();
unlockData();
incQueue();
......@@ -289,7 +280,7 @@ LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
{
if (handle == nullHandle())
{
return 0;
return nullptr;
}
lockData();
QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
......@@ -429,6 +420,10 @@ S32 LLQueuedThread::processNextRequest()
llassert_always(req->getStatus() == STATUS_QUEUED);
break;
}
// Update queue size after processing
mRequestQueueSize = mRequestQueue.size();
U32 start_priority = 0 ;
if (req)
{
......@@ -463,6 +458,7 @@ S32 LLQueuedThread::processNextRequest()
lockData();
req->setStatus(STATUS_QUEUED);
mRequestQueue.insert(req);
mRequestQueueSize = mRequestQueue.size();
unlockData();
if (mThreaded && start_priority < PRIORITY_NORMAL)
{
......@@ -481,10 +477,7 @@ S32 LLQueuedThread::processNextRequest()
bool LLQueuedThread::runCondition()
{
// mRunCondition must be locked here
if (mRequestQueue.empty() && mIdleThread)
return false;
else
return true;
return !((mRequestQueueSize <= 0) && mIdleThread);
}
// virtual
......@@ -493,7 +486,7 @@ void LLQueuedThread::run()
// call checPause() immediately so we don't try to do anything before the class is fully constructed
checkPause();
startThread();
mStarted = TRUE;
mStarted = true;
while (1)
{
......@@ -507,7 +500,7 @@ void LLQueuedThread::run()
break;
}
mIdleThread = FALSE;
mIdleThread = false;
threadedUpdate();
......@@ -515,7 +508,7 @@ void LLQueuedThread::run()
if (pending_work == 0)
{
mIdleThread = TRUE;
mIdleThread = true;
ms_sleep(1);
}
//LLThread::yield(); // thread should yield after each request
......
......@@ -153,11 +153,10 @@ class LL_COMMON_API LLQueuedThread : public LLThread
virtual ~LLQueuedThread();
virtual void shutdown();
private:
// No copy constructor or copy assignment
LLQueuedThread(const LLQueuedThread&);
LLQueuedThread& operator=(const LLQueuedThread&);
LLQueuedThread(const LLQueuedThread&) = delete;
LLQueuedThread& operator=(const LLQueuedThread&) = delete;
private:
virtual bool runCondition(void);
virtual void run(void);
virtual void startThread(void);
......@@ -179,8 +178,8 @@ class LL_COMMON_API LLQueuedThread : public LLThread
void waitOnPending();
void printQueueStats();
virtual S32 getPending();
bool getThreaded() { return mThreaded ? true : false; }
virtual S32 getPending() const { return mRequestQueueSize; } // May be called from any thread
bool getThreaded() const { return mThreaded; }
// Request accessors
status_t getRequestStatus(handle_t handle);
......@@ -196,12 +195,13 @@ class LL_COMMON_API LLQueuedThread : public LLThread
bool check();
protected:
BOOL mThreaded; // if false, run on main thread and do updates during update()
BOOL mStarted; // required when mThreaded is false to call startThread() from update()
bool mThreaded; // if false, run on main thread and do updates during update()
bool mStarted; // required when mThreaded is false to call startThread() from update()
LLAtomicBool mIdleThread; // request queue is empty (or we are quitting) and the thread is idle
typedef std::set<QueuedRequest*, queued_request_less> request_queue_t;
request_queue_t mRequestQueue;
std::atomic<S32> mRequestQueueSize;
enum { REQUEST_HASH_SIZE = 512 }; // must be power of 2
typedef LLSimpleHash<handle_t, REQUEST_HASH_SIZE> request_hash_t;
......
......@@ -35,7 +35,8 @@
// Run on MAIN thread
LLWorkerThread::LLWorkerThread(const std::string& name, bool threaded, bool should_pause) :
LLQueuedThread(name, threaded, should_pause)
LLQueuedThread(name, threaded, should_pause),
mDeleteListSize(0)
{
mDeleteMutex = new LLMutex();
......@@ -76,6 +77,7 @@ void LLWorkerThread::clearDeleteList()
delete *iter ;
}
mDeleteList.clear() ;
mDeleteListSize = mDeleteList.size();
mDeleteMutex->unlock() ;
}
}
......@@ -87,26 +89,30 @@ S32 LLWorkerThread::update(F32 max_time_ms)
// Delete scheduled workers
std::vector<LLWorkerClass*> delete_list;
std::vector<LLWorkerClass*> abort_list;
mDeleteMutex->lock();
for (delete_list_t::iterator iter = mDeleteList.begin();
iter != mDeleteList.end(); )
if (mDeleteListSize)
{
delete_list_t::iterator curiter = iter++;
LLWorkerClass* worker = *curiter;
if (worker->deleteOK())
mDeleteMutex->lock();
for (delete_list_t::iterator iter = mDeleteList.begin();
iter != mDeleteList.end(); )
{
if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED))
{
delete_list.push_back(worker);
mDeleteList.erase(curiter);
}
else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED))
delete_list_t::iterator curiter = iter++;
LLWorkerClass* worker = *curiter;
if (worker->deleteOK())
{
abort_list.push_back(worker);
if (worker->getFlags(LLWorkerClass::WCF_WORK_FINISHED))
{
delete_list.push_back(worker);
mDeleteList.erase(curiter);
}
else if (!worker->getFlags(LLWorkerClass::WCF_ABORT_REQUESTED))
{
abort_list.push_back(worker);
}
}
}
mDeleteListSize = mDeleteList.size();
mDeleteMutex->unlock();
}
mDeleteMutex->unlock();
// abort and delete after releasing mutex
for (std::vector<LLWorkerClass*>::iterator iter = abort_list.begin();
iter != abort_list.end(); ++iter)
......@@ -154,6 +160,7 @@ void LLWorkerThread::deleteWorker(LLWorkerClass* workerclass)
{
mDeleteMutex->lock();
mDeleteList.push_back(workerclass);
mDeleteListSize = mDeleteList.size();
mDeleteMutex->unlock();
}
......
......@@ -81,6 +81,7 @@ class LL_COMMON_API LLWorkerThread : public LLQueuedThread
private:
typedef std::list<LLWorkerClass*> delete_list_t;
delete_list_t mDeleteList;
std::atomic<S32> mDeleteListSize;
LLMutex* mDeleteMutex;
public:
......@@ -91,7 +92,7 @@ class LL_COMMON_API LLWorkerThread : public LLQueuedThread
handle_t addWorkRequest(LLWorkerClass* workerclass, S32 param, U32 priority = PRIORITY_NORMAL);
S32 getNumDeletes() { return (S32)mDeleteList.size(); } // debug
S32 getNumDeletes() const { return mDeleteListSize; } // debug
private:
void deleteWorker(LLWorkerClass* workerclass); // schedule for deletion
......
......@@ -34,6 +34,7 @@
// MAIN THREAD
LLImageDecodeThread::LLImageDecodeThread(bool threaded)
: LLQueuedThread("imagedecode", threaded)
, mCreationListSize(0)
{
mCreationMutex = new LLMutex();
}
......@@ -48,22 +49,26 @@ LLImageDecodeThread::~LLImageDecodeThread()
// virtual
S32 LLImageDecodeThread::update(F32 max_time_ms)
{
LLMutexLock lock(mCreationMutex);
for (creation_list_t::iterator iter = mCreationList.begin();
iter != mCreationList.end(); ++iter)
if (mCreationListSize > 0)
{
creation_info& info = *iter;
ImageRequest* req = new ImageRequest(info.handle, info.image,
info.priority, info.discard, info.needs_aux,
info.responder);
bool res = addRequest(req);
if (!res)
LLMutexLock lock(mCreationMutex);
for (creation_list_t::iterator iter = mCreationList.begin();
iter != mCreationList.end(); ++iter)
{
LL_ERRS() << "request added after LLLFSThread::cleanupClass()" << LL_ENDL;
creation_info& info = *iter;
ImageRequest* req = new ImageRequest(info.handle, info.image,
info.priority, info.discard, info.needs_aux,
info.responder);
bool res = addRequest(req);
if (!res)
{
LL_ERRS() << "request added after LLLFSThread::cleanupClass()" << LL_ENDL;
}
}
mCreationList.clear();
mCreationListSize = 0;
}
mCreationList.clear();
S32 res = LLQueuedThread::update(max_time_ms);
return res;
}
......@@ -74,6 +79,7 @@ LLImageDecodeThread::handle_t LLImageDecodeThread::decodeImage(LLImageFormatted*
LLMutexLock lock(mCreationMutex);
handle_t handle = generateHandle();
mCreationList.push_back(creation_info(handle, image, priority, discard, needs_aux, responder));
mCreationListSize = mCreationList.size();
return handle;
}
......
......@@ -86,18 +86,20 @@ class LLImageDecodeThread : public LLQueuedThread
private:
struct creation_info
{
handle_t handle;
LLPointer<LLImageFormatted> image;
LLPointer<Responder> responder;
handle_t handle;
U32 priority;
S32 discard;
BOOL needs_aux;
LLPointer<Responder> responder;
creation_info(handle_t h, LLImageFormatted* i, U32 p, S32 d, BOOL aux, Responder* r)
: handle(h), image(i), priority(p), discard(d), needs_aux(aux), responder(r)
: image(i), responder(r), handle(h), priority(p), discard(d), needs_aux(aux)
{}
};
typedef std::list<creation_info> creation_list_t;
creation_list_t mCreationList;
std::atomic<S32> mCreationListSize;
LLMutex* mCreationMutex;
};
......
......@@ -831,6 +831,8 @@ LLTextureCache::LLTextureCache(bool threaded)
mListMutex(),
mFastCacheMutex(),
mHeaderAPRFile(NULL),
mPrioritizeWriteListEmpty(true),
mCompletedListEmpty(true),
mReadOnly(TRUE), //do not allow to change the texture cache until setReadOnly() is called.
mTexturesSizeTotal(0),
mDoPurge(FALSE),
......@@ -862,35 +864,41 @@ S32 LLTextureCache::update(F32 max_time_ms)
S32 res;
res = LLWorkerThread::update(max_time_ms);
mListMutex.lock();
handle_list_t priorty_list = mPrioritizeWriteList; // copy list
mPrioritizeWriteList.clear();
responder_list_t completed_list = mCompletedList; // copy list
mCompletedList.clear();
mListMutex.unlock();
lockWorkers();
handle_list_t priorty_list;
responder_list_t completed_list;
if ((!mPrioritizeWriteListEmpty) || (!mCompletedListEmpty))
{
LLMutexLock lock(&mListMutex);
priorty_list.swap(mPrioritizeWriteList); // copy list
mPrioritizeWriteList.clear();
mPrioritizeWriteListEmpty = mPrioritizeWriteList.empty();
completed_list.swap(mCompletedList); // copy list
mCompletedList.clear();
mCompletedListEmpty = mCompletedList.empty();
}
for (handle_list_t::iterator iter1 = priorty_list.begin();
iter1 != priorty_list.end(); ++iter1)
if (!priorty_list.empty())
{
handle_t handle = *iter1;
handle_map_t::iterator iter2 = mWriters.find(handle);
if(iter2 != mWriters.end())
lockWorkers();
for (handle_t handle : priorty_list)
{
LLTextureCacheWorker* worker = iter2->second;
worker->setPriority(LLWorkerThread::PRIORITY_HIGH | worker->mPriority);
handle_map_t::iterator iter2 = mWriters.find(handle);
if (iter2 != mWriters.end())
{
LLTextureCacheWorker* worker = iter2->second;
worker->setPriority(LLWorkerThread::PRIORITY_HIGH | worker->mPriority);
}
}
}
unlockWorkers();
unlockWorkers();
}
// call 'completed' with workers list unlocked (may call readComplete() or writeComplete()
for (responder_list_t::iterator iter1 = completed_list.begin();
iter1 != completed_list.end(); ++iter1)
{
Responder *responder = iter1->first;
bool success = iter1->second;
for (auto& iter1 : completed_list)
{
Responder *responder = iter1.first;
bool success = iter1.second;
responder->completed(success);
}
......@@ -917,7 +925,7 @@ std::string LLTextureCache::getLocalFileName(const LLUUID& id)
std::string LLTextureCache::getTextureFileName(const LLUUID& id)
{
std::string idstr = id.asString();
std::string delem = gDirUtilp->getDirDelimiter();
const std::string& delem = gDirUtilp->getDirDelimiter();
std::string filename = mTexturesDirName + delem + idstr[0] + delem + idstr + ".texture";
return filename;
}
......@@ -993,8 +1001,6 @@ const char* fast_cache_filename = "FastCache.cache";
void LLTextureCache::setDirNames(ELLPath location)
{
std::string delem = gDirUtilp->getDirDelimiter();
mHeaderEntriesFileName = gDirUtilp->getExpandedFilename(location, textures_dirname, entries_filename);
mHeaderDataFileName = gDirUtilp->getExpandedFilename(location, textures_dirname, cache_filename);
mTexturesDirName = gDirUtilp->getExpandedFilename(location, textures_dirname);
......@@ -2300,12 +2306,14 @@ void LLTextureCache::prioritizeWrite(handle_t handle)
// which could create a deadlock
LLMutexLock lock(&mListMutex);
mPrioritizeWriteList.push_back(handle);
mPrioritizeWriteListEmpty = mPrioritizeWriteList.empty();
}
void LLTextureCache::addCompleted(Responder* responder, bool success)
{
LLMutexLock lock(&mListMutex);
mCompletedList.push_back(std::make_pair(responder,success));
mCompletedListEmpty = mCompletedList.empty();
}
//////////////////////////////////////////////////////////////////////////////
......
......@@ -210,9 +210,11 @@ class LLTextureCache : public LLWorkerThread
typedef std::vector<handle_t> handle_list_t;
handle_list_t mPrioritizeWriteList;
std::atomic<bool> mPrioritizeWriteListEmpty;
typedef std::vector<std::pair<LLPointer<Responder>, bool> > responder_list_t;
responder_list_t mCompletedList;
std::atomic<bool> mCompletedListEmpty;
BOOL mReadOnly;
......
......@@ -2569,8 +2569,9 @@ LLTextureFetch::LLTextureFetch(LLTextureCache* cache, LLImageDecodeThread* image
mTextureCache(cache),
mImageDecodeThread(imagedecodethread),
mTextureBandwidth(0),
mHTTPTextureBits(0),
mHTTPTextureBits((U32Bits)0),
mTotalHTTPRequests(0),
mCommandsSize(0),
mQAMode(qa_mode),
mHttpRequest(NULL),
mHttpOptions(),
......@@ -2632,6 +2633,7 @@ LLTextureFetch::~LLTextureFetch()
mCommands.erase(mCommands.begin());
delete req;
}
mCommandsSize = 0;
mHttpWaitResource.clear();
......@@ -2814,7 +2816,7 @@ void LLTextureFetch::removeFromHTTPQueue(const LLUUID& id, S32Bytes received_siz
{
LLMutexLock lock(&mNetworkQueueMutex); // +Mfnq
mHTTPTextureQueue.erase(id);
mHTTPTextureBits += received_size; // Approximate - does not include header bits
mHTTPTextureBits = U32Bits(mHTTPTextureBits) + received_size;
} // -Mfnq
// NB: If you change deleteRequest() you should probably make
......@@ -3013,54 +3015,12 @@ bool LLTextureFetch::updateRequestPriority(const LLUUID& id, F32 priority)
return res;
}
// Replicates and expands upon the base class's
// getPending() implementation. getPending() and
// runCondition() replicate one another's logic to
// an extent and are sometimes used for the same
// function (deciding whether or not to sleep/pause
// a thread). So the implementations need to stay
// in step, at least until this can be refactored and
// the redundancy eliminated.
//
// Threads: T*
//virtual
S32 LLTextureFetch::getPending()
{
S32 res;
lockData(); // +Ct
{
LLMutexLock lock(&mQueueMutex); // +Mfq
res = mRequestQueue.size();
res += mCommands.size();
} // -Mfq
unlockData(); // -Ct
return res;
}
// Locks: Ct
// virtual
bool LLTextureFetch::runCondition()
{
// Caller is holding the lock on LLThread's condition variable.
// LLQueuedThread, unlike its base class LLThread, makes this a
// private method which is unfortunate. I want to use it directly
// but I'm going to have to re-implement the logic here (or change
// declarations, which I don't want to do right now).
//
// Changes here may need to be reflected in getPending().
bool have_no_commands(false);
{
LLMutexLock lock(&mQueueMutex); // +Mfq
have_no_commands = mCommands.empty();
} // -Mfq
return ! (have_no_commands
&& (mRequestQueue.empty() && mIdleThread)); // From base class
return ! ( !mCommandsSize && (!mRequestQueueSize && mIdleThread) ); // From base class
}
//////////////////////////////////////////////////////////////////////////////
......@@ -3104,16 +3064,12 @@ void LLTextureFetch::commonUpdate()
//virtual
S32 LLTextureFetch::update(F32 max_time_ms)
{
static LLCachedControl<F32> band_width(gSavedSettings,"ThrottleBandwidthKBPS", 3000.0);
static LLCachedControl<F32> band_width(gSavedSettings,"ThrottleBandwidthKBPS", 3000.0f);
{
mNetworkQueueMutex.lock(); // +Mfnq
mMaxBandwidth = band_width();
add(LLStatViewer::TEXTURE_NETWORK_DATA_RECEIVED, mHTTPTextureBits);
mHTTPTextureBits = (U32Bits)0;
mNetworkQueueMutex.unlock(); // -Mfnq
add(LLStatViewer::TEXTURE_NETWORK_DATA_RECEIVED, mHTTPTextureBits.exchange((U32Bits)0));
}
S32 res = LLWorkerThread::update(max_time_ms);
......@@ -3288,10 +3244,9 @@ void LLTextureFetch::sendRequestListToSimulators()
}
} // -Mfnq
for (work_request_map_t::iterator iter1 = requests.begin();
iter1 != requests.end(); ++iter1)
{
LLHost host = iter1->first;
for (auto& request : requests)
{
LLHost host = request.first;
// invalid host = use agent host
if (host.isInvalid())
{
......@@ -3300,10 +3255,8 @@ void LLTextureFetch::sendRequestListToSimulators()
S32 sim_request_count = 0;
for (request_list_t::iterator iter2 = iter1->second.begin();
iter2 != iter1->second.end(); ++iter2)
{
LLTextureFetchWorker* req = *iter2;
for (LLTextureFetchWorker* req : request.second)
{
if (gMessageSystem)
{
if (req->mSentRequest != LLTextureFetchWorker::SENT_SIM)
......@@ -3932,6 +3885,7 @@ void LLTextureFetch::cmdEnqueue(TFRequest * req)
{
lockQueue(); // +Mfq
mCommands.push_back(req);
mCommandsSize = mCommands.size();
unlockQueue(); // -Mfq
unpause();
......@@ -3948,6 +3902,7 @@ LLTextureFetch::TFRequest * LLTextureFetch::cmdDequeue()
ret = mCommands.front();
mCommands.pop_front();
}
mCommandsSize = mCommands.size();
unlockQueue(); // -Mfq
return ret;
......
......@@ -135,7 +135,7 @@ class LLTextureFetch : public LLWorkerThread
U32 getTotalNumHTTPRequests();
// Threads: T*
S32 getPending();
S32 getPending() const override { return mCommandsSize + mRequestQueueSize; }
// Threads: T*
void lockQueue() { mQueueMutex.lock(); }
......@@ -332,12 +332,12 @@ class LLTextureFetch : public LLWorkerThread
typedef std::map<LLHost,std::set<LLUUID> > cancel_queue_t;
cancel_queue_t mCancelQueue; // Mfnq
F32 mTextureBandwidth; // <none>
F32 mMaxBandwidth; // Mfnq
std::atomic<F32> mMaxBandwidth;
LLTextureInfo mTextureInfo;
LLTextureInfo mTextureInfoMainThread;
// XXX possible delete
U32Bits mHTTPTextureBits; // Mfnq
std::atomic<U32Bits> mHTTPTextureBits;
// XXX possible delete
//debug use
......@@ -349,6 +349,7 @@ class LLTextureFetch : public LLWorkerThread
// same locks.
typedef std::deque<TFRequest *> command_queue_t;
command_queue_t mCommands; // Mfq
std::atomic<S32> mCommandsSize;
// If true, modifies some behaviors that help with QA tasks.
const bool mQAMode;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment