/**
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
*/
#include "linden_common.h"
#include "llqueuedthread.h"
#include "lltimer.h" // ms_sleep()
#include "lltracethreadrecorder.h"
//============================================================================
// MAIN THREAD
Xiaohong Bao
committed
LLQueuedThread::LLQueuedThread(const std::string& name, bool threaded, bool should_pause) :
mStarted(false),
mIdleThread(true),
mRequestQueueSize(0),
mNextHandle(0)
Xiaohong Bao
committed
if(should_pause)
{
pause() ; //call this before start the thread.
}
start();
}
}
// MAIN THREAD
// Tears the queue down. A non-threaded queue has no run loop, so its
// endThread() hook is invoked here before shutdown().
LLQueuedThread::~LLQueuedThread()
{
    if (!mThreaded)
    {
        endThread();
    }
    shutdown();
    // ~LLThread() will be called here
}
void LLQueuedThread::shutdown()
{
setQuitting();
unpause(); // MAIN THREAD
if (mThreaded)
{
S32 timeout = 100;
for ( ; timeout>0; timeout--)
{
if (isStopped())
{
break;
}
ms_sleep(100);
LLThread::yield();
}
if (timeout == 0)
{
LL_WARNS() << "~LLQueuedThread (" << mName << ") timed out!" << LL_ENDL;
}
}
else
{
mStatus = STOPPED;
}
QueuedRequest* req;
while ( (req = (QueuedRequest*)mRequestHash.pop_element()) )
{
if (req->getStatus() == STATUS_QUEUED || req->getStatus() == STATUS_INPROGRESS)
{
++active_count;
req->setStatus(STATUS_ABORTED); // avoid assert in deleteRequest
if (active_count)
{
LL_WARNS() << "~LLQueuedThread() called with active requests: " << active_count << LL_ENDL;
}
//----------------------------------------------------------------------------
// MAIN THREAD
// Per-frame entry point. On the first call for a non-threaded queue the
// startThread() hook is run once; afterwards the work is delegated to
// updateQueue(). Returns whatever updateQueue() returns.
S32 LLQueuedThread::update(F32 max_time_ms)
{
    if (!mStarted)
    {
        if (!mThreaded)
        {
            startThread();
            mStarted = true; // make sure the hook only runs once
        }
    }
    return updateQueue(max_time_ms);
}
// Threaded: reads the pending count and unpauses the worker when there is
// work. Non-threaded: processes requests inline until none are pending or
// max_time_ms has elapsed (a max_time_ms of 0 means no time limit).
// Returns the number of pending requests.
S32 LLQueuedThread::updateQueue(F32 max_time_ms)
{
    F64 max_time = (F64)max_time_ms * .001;
    LLTimer timer;
    S32 pending = 1;

    // Frame Update
    if (mThreaded)
    {
        pending = getPending();
        if (pending > 0)
        {
            unpause();
        }
    }
    else
    {
        while (pending > 0)
        {
            pending = processNextRequest();
            if (max_time && timer.getElapsedTimeF64() > max_time)
            {
                break;
            }
        }
    }
    return pending;
}
void LLQueuedThread::incQueue()
{
// Something has been added to the queue
if (!isPaused())
wake(); // Wake the thread up if necessary.
}
}
}
// MAIN thread
// Spins until mIdleThread is set, yielding between polls when the queue runs
// on its own thread.
void LLQueuedThread::waitOnPending()
{
    while (!mIdleThread)
    {
        if (mThreaded)
        {
            yield();
        }
    }
}
// MAIN thread
void LLQueuedThread::printQueueStats()
{
lockData();
if (!mRequestQueue.empty())
{
QueuedRequest *req = *mRequestQueue.begin();
LL_INFOS() << llformat("Pending Requests:%d Current status:%d", mRequestQueue.size(), req->getStatus()) << LL_ENDL;
LL_INFOS() << "Queued Thread Idle" << LL_ENDL;
}
unlockData();
}
// MAIN thread
// Returns a handle that is neither the null handle nor currently present in
// the request hash, and advances mNextHandle past it.
LLQueuedThread::handle_t LLQueuedThread::generateHandle()
{
    lockData();
    while ((mNextHandle == nullHandle()) || (mRequestHash.find(mNextHandle)))
    {
        mNextHandle++;
    }
    const LLQueuedThread::handle_t res = mNextHandle++;
    unlockData(); // release the lock taken above before returning
    return res;
}
// MAIN thread
// Marks req as queued and inserts it into both the priority queue and the
// handle hash. Returns false, without touching req, when the thread is
// quitting; true otherwise.
bool LLQueuedThread::addRequest(QueuedRequest* req)
{
    if (mStatus == QUITTING)
    {
        return false;
    }

    lockData();
    req->setStatus(STATUS_QUEUED);
    mRequestQueue.insert(req);
    mRequestHash.insert(req);
#if _DEBUG
//  LL_INFOS() << llformat("LLQueuedThread::Added req [%08d]",handle) << LL_ENDL;
#endif
    mRequestQueueSize = mRequestQueue.size();
    unlockData(); // release the lock taken above before returning

    return true;
}
// MAIN thread
// Blocks until the request identified by handle reaches STATUS_COMPLETE or is
// no longer present in the request hash.
//   auto_complete - when true, a completed request is removed from the hash
//                   and deleted before returning.
// Returns true if the request was found complete, false if it did not exist.
// If the thread was paused on entry it is paused again on exit.
bool LLQueuedThread::waitForResult(LLQueuedThread::handle_t handle, bool auto_complete)
{
    llassert (handle != nullHandle());
    bool res = false;
    bool waspaused = isPaused();
    bool done = false;
    while (!done)
    {
        update(0); // unpauses
        lockData();
        QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
        if (!req)
        {
            done = true; // request does not exist
        }
        else if (req->getStatus() == STATUS_COMPLETE)
        {
            res = true;
            if (auto_complete)
            {
                mRequestHash.erase(handle);
                req->deleteRequest();
//              check();
            }
            done = true;
        }
        unlockData();

        if (!done && mThreaded)
        {
            yield();
        }
    }
    if (waspaused)
    {
        pause();
    }
    return res;
}
// MAIN thread
// Looks up the request registered under handle. Returns NULL for the null
// handle (without taking the lock) or when no such request is in the hash.
LLQueuedThread::QueuedRequest* LLQueuedThread::getRequest(handle_t handle)
{
    if (handle == nullHandle())
    {
        return NULL;
    }
    lockData();
    QueuedRequest* res = (QueuedRequest*)mRequestHash.find(handle);
    unlockData();
    return res;
}
// Reports the status of the request registered under handle, or
// STATUS_EXPIRED when the hash holds no such request.
LLQueuedThread::status_t LLQueuedThread::getRequestStatus(handle_t handle)
{
    lockData();
    QueuedRequest* found = (QueuedRequest*)mRequestHash.find(handle);
    status_t status = STATUS_EXPIRED;
    if (found != NULL)
    {
        status = found->getStatus();
    }
    unlockData();
    return status;
}
// Flags the request registered under handle for abort. When autocomplete is
// true, FLAG_AUTO_COMPLETE is set as well. Unknown handles are ignored.
void LLQueuedThread::abortRequest(handle_t handle, bool autocomplete)
{
    lockData();
    QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
    if (req)
    {
        req->setFlags(FLAG_ABORT | (autocomplete ? FLAG_AUTO_COMPLETE : 0));
    }
    unlockData();
}
// Forwards flags to the request registered under handle; does nothing when
// the handle is not in the hash.
void LLQueuedThread::setFlags(handle_t handle, U32 flags)
{
    lockData();
    if (QueuedRequest* target = (QueuedRequest*)mRequestHash.find(handle))
    {
        target->setFlags(flags);
    }
    unlockData();
}
// Changes the priority of the request registered under handle. A queued
// request is removed from the queue and re-inserted around the change; an
// in-progress request is not in the queue, so only its priority is updated.
// Unknown handles and requests in any other state are left alone.
void LLQueuedThread::setPriority(handle_t handle, U32 priority)
{
    lockData();
    QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
    if (req)
    {
        if (req->getStatus() == STATUS_INPROGRESS)
        {
            // not in list
            req->setPriority(priority);
        }
        else if (req->getStatus() == STATUS_QUEUED)
        {
            // remove from list then re-insert
            llverify(mRequestQueue.erase(req) == 1);
            req->setPriority(priority);
            mRequestQueue.insert(req);
        }
    }
    unlockData();
}
// Removes the request registered under handle from the hash and deletes it.
// The request must be neither queued nor in progress (asserted).
// Returns true if a request was found and removed, false otherwise.
bool LLQueuedThread::completeRequest(handle_t handle)
{
    bool res = false;
    lockData();
    QueuedRequest* req = (QueuedRequest*)mRequestHash.find(handle);
    if (req)
    {
        llassert_always(req->getStatus() != STATUS_QUEUED);
        llassert_always(req->getStatus() != STATUS_INPROGRESS);
#if _DEBUG
//      LL_INFOS() << llformat("LLQueuedThread::Completed req [%08d]",handle) << LL_ENDL;
#endif
        mRequestHash.erase(handle);
        req->deleteRequest();
//      check();
        res = true;
    }
    unlockData();
    return res;
}
// Debug consistency check. The hash walk is compiled out, so this currently
// always returns true.
bool LLQueuedThread::check()
{
#if 0 // not a reliable check once mNextHandle wraps, just for quick and dirty debugging
    for (int bucket = 0; bucket < REQUEST_HASH_SIZE; ++bucket)
    {
        for (LLSimpleHashEntry<handle_t>* node = mRequestHash.get_element_at_index(bucket);
             node != NULL;
             node = node->getNextEntry())
        {
            if (node->getHashKey() > mNextHandle)
            {
                return false;
            }
        }
    }
#endif
    return true;
}
//============================================================================
// Runs on its OWN thread
// Takes the next request off the queue and runs it.
//  - Requests flagged FLAG_ABORT, or any request once the thread is quitting,
//    are finished with finishRequest(false) and skipped; those carrying
//    FLAG_AUTO_COMPLETE are also removed from the hash and deleted.
//  - A request whose processRequest() returns true is marked complete; one
//    that returns false is put back on the queue.
// Returns the value of getPending() after the pass.
S32 LLQueuedThread::processNextRequest()
{
    QueuedRequest* req;

    // Get next request from the queue
    lockData();
    while (1)
    {
        req = NULL;
        if (mRequestQueue.empty())
        {
            break;
        }
        req = *mRequestQueue.begin();
        mRequestQueue.erase(mRequestQueue.begin());
        if ((req->getFlags() & FLAG_ABORT) || (mStatus == QUITTING))
        {
            // The request is no longer in the queue; move it out of
            // STATUS_QUEUED so a later completeRequest() does not assert.
            req->setStatus(STATUS_ABORTED);
            req->finishRequest(false);
            if (req->getFlags() & FLAG_AUTO_COMPLETE)
            {
                mRequestHash.erase(req);
                req->deleteRequest();
//              check();
            }
            continue;
        }
        llassert_always(req->getStatus() == STATUS_QUEUED);
        break;
    }

    // Update queue size after processing
    mRequestQueueSize = mRequestQueue.size();

    U32 start_priority = 0;
    if (req)
    {
        req->setStatus(STATUS_INPROGRESS);
        start_priority = req->getPriority();
    }
    unlockData();

    // This is the only place we will call req->setStatus() after
    // it has initially been set to STATUS_QUEUED, so it is
    // safe to access req.
    if (req)
    {
        bool complete = req->processRequest();
        if (complete)
        {
            lockData();
            req->setStatus(STATUS_COMPLETE);
            req->finishRequest(true);
            if (req->getFlags() & FLAG_AUTO_COMPLETE)
            {
                mRequestHash.erase(req);
                req->deleteRequest();
//              check();
            }
            unlockData();
        }
        else
        {
            // Not finished yet: put it back on the queue for another pass.
            lockData();
            req->setStatus(STATUS_QUEUED);
            mRequestQueue.insert(req);
            mRequestQueueSize = mRequestQueue.size();
            unlockData();
            if (mThreaded && start_priority < PRIORITY_NORMAL)
            {
                ms_sleep(1); // sleep the thread a little
            }
        }
        LLTrace::get_thread_recorder()->pushToParent();
    }

    S32 pending = getPending();
    return pending;
}
// Returns false only when the queue size is zero (or less) and the thread is
// idle; true otherwise.
bool LLQueuedThread::runCondition()
{
    // mRunCondition must be locked here
    return !((mRequestQueueSize <= 0) && mIdleThread);
}
// call checPause() immediately so we don't try to do anything before the class is fully constructed
checkPause();
startThread();
while (1)
{
// this will block on the condition until runCondition() returns true, the thread is unpaused, or the thread leaves the RUNNING state.
checkPause();
if (isQuitting())
{
LLTrace::get_thread_recorder()->pushToParent();
mIdleThread = false;
threadedUpdate();
int pending_work = processNextRequest();
if (pending_work == 0)
mIdleThread = true;
//LLThread::yield(); // thread should yield after each request
LL_INFOS() << "LLQueuedThread " << mName << " EXITING." << LL_ENDL;
// virtual
// Start-up hook; the default implementation does nothing.
void LLQueuedThread::startThread()
{
    // intentionally empty
}
// virtual
// Tear-down hook (the destructor calls it for a non-threaded queue); the
// default implementation does nothing.
void LLQueuedThread::endThread()
{
    // intentionally empty
}
// virtual
// Per-iteration hook; the default implementation does nothing.
void LLQueuedThread::threadedUpdate()
{
    // intentionally empty
}
//============================================================================
// Creates a request keyed by handle in the hash-entry base, with the given
// priority and flags. The status starts as STATUS_UNKNOWN.
LLQueuedThread::QueuedRequest::QueuedRequest(LLQueuedThread::handle_t handle, U32 priority, U32 flags)
    : LLSimpleHashEntry<LLQueuedThread::handle_t>(handle)
    , mStatus(STATUS_UNKNOWN)
    , mPriority(priority)
    , mFlags(flags)
{
}
// A request may only be destroyed once its status is STATUS_DELETE.
LLQueuedThread::QueuedRequest::~QueuedRequest()
{
    llassert_always(mStatus == STATUS_DELETE);
}
// Called with true after a request completes and with false when it is
// aborted; this implementation ignores the argument and does nothing.
void LLQueuedThread::QueuedRequest::finishRequest(bool completed)
{
    // intentionally empty
}
//virtual
void LLQueuedThread::QueuedRequest::deleteRequest()
{
llassert_always(mStatus != STATUS_INPROGRESS);