Skip to content
Snippets Groups Projects
Commit cc6f1d61 authored by Nat Goodspeed's avatar Nat Goodspeed
Browse files

DRTVWR-476: Use shared_ptr to manage lifespan of coprocedure queue.

Since the consuming coroutine LLCoprocedurePool::coprocedureInvokerCoro() has
been observed to outlive the LLCoprocedurePool instance that owns the
CoprocQueue_t, closing that queue isn't enough to keep the coroutine from
crashing at shutdown: accessing a deleted CoprocQueue_t is fatal whether or
not it's been closed.

Make LLCoprocedurePool store a shared_ptr to a heap CoprocQueue_t instance,
and pass that shared_ptr by value to consuming coroutines. That way the
CoprocQueue_t instance is guaranteed to live as long as the last interested
party.
parent 9008124d
No related branches found
No related tags found
No related merge requests found
...@@ -94,13 +94,17 @@ class LLCoprocedurePool: private boost::noncopyable ...@@ -94,13 +94,17 @@ class LLCoprocedurePool: private boost::noncopyable
// we use a buffered_channel here rather than unbuffered_channel since we want to be able to // we use a buffered_channel here rather than unbuffered_channel since we want to be able to
// push values without blocking,even if there's currently no one calling a pop operation (due to // push values without blocking,even if there's currently no one calling a pop operation (due to
// fibber running right now) // fiber running right now)
typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t; typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t;
// Use shared_ptr to control the lifespan of our CoprocQueue_t instance
// because the consuming coroutine might outlive this LLCoprocedurePool
// instance.
typedef boost::shared_ptr<CoprocQueue_t> CoprocQueuePtr;
typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t; typedef std::map<LLUUID, LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t> ActiveCoproc_t;
std::string mPoolName; std::string mPoolName;
size_t mPoolSize; size_t mPoolSize;
CoprocQueue_t mPendingCoprocs; CoprocQueuePtr mPendingCoprocs;
ActiveCoproc_t mActiveCoprocs; ActiveCoproc_t mActiveCoprocs;
LLTempBoundListener mStatusListener; LLTempBoundListener mStatusListener;
...@@ -109,7 +113,8 @@ class LLCoprocedurePool: private boost::noncopyable ...@@ -109,7 +113,8 @@ class LLCoprocedurePool: private boost::noncopyable
CoroAdapterMap_t mCoroMapping; CoroAdapterMap_t mCoroMapping;
void coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter); void coprocedureInvokerCoro(CoprocQueuePtr pendingCoprocs,
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter);
}; };
//========================================================================= //=========================================================================
...@@ -214,7 +219,7 @@ void LLCoprocedureManager::close(const std::string &pool) ...@@ -214,7 +219,7 @@ void LLCoprocedureManager::close(const std::string &pool)
LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
mPoolName(poolName), mPoolName(poolName),
mPoolSize(size), mPoolSize(size),
mPendingCoprocs(DEFAULT_QUEUE_SIZE), mPendingCoprocs(boost::make_shared<CoprocQueue_t>(DEFAULT_QUEUE_SIZE)),
mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID), mHTTPPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID),
mCoroMapping() mCoroMapping()
{ {
...@@ -222,7 +227,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): ...@@ -222,7 +227,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
// destroyed, we implicitly disconnect from this LLEventPump // destroyed, we implicitly disconnect from this LLEventPump
mStatusListener = LLEventPumps::instance().obtain("LLApp").listen( mStatusListener = LLEventPumps::instance().obtain("LLApp").listen(
poolName, poolName,
[this, poolName](const LLSD& status) [pendingCoprocs=mPendingCoprocs, poolName](const LLSD& status)
{ {
auto& statsd = status["status"]; auto& statsd = status["status"];
if (statsd.asString() != "running") if (statsd.asString() != "running")
...@@ -232,7 +237,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): ...@@ -232,7 +237,7 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
<< LL_ENDL; << LL_ENDL;
// This should ensure that all waiting coprocedures in this // This should ensure that all waiting coprocedures in this
// pool will wake up and terminate. // pool will wake up and terminate.
mPendingCoprocs.close(); pendingCoprocs->close();
} }
return false; return false;
}); });
...@@ -241,8 +246,10 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size): ...@@ -241,8 +246,10 @@ LLCoprocedurePool::LLCoprocedurePool(const std::string &poolName, size_t size):
{ {
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy)); LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter(new LLCoreHttpUtil::HttpCoroutineAdapter( mPoolName + "Adapter", mHTTPPolicy));
std::string pooledCoro = LLCoros::instance().launch("LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro", std::string pooledCoro = LLCoros::instance().launch(
boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this, httpAdapter)); "LLCoprocedurePool("+mPoolName+")::coprocedureInvokerCoro",
boost::bind(&LLCoprocedurePool::coprocedureInvokerCoro, this,
mPendingCoprocs, httpAdapter));
mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter)); mCoroMapping.insert(CoroAdapterMap_t::value_type(pooledCoro, httpAdapter));
} }
...@@ -259,14 +266,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced ...@@ -259,14 +266,16 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
{ {
LLUUID id(LLUUID::generateNewID()); LLUUID id(LLUUID::generateNewID());
mPendingCoprocs.push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc))); mPendingCoprocs->push(QueuedCoproc::ptr_t(new QueuedCoproc(name, id, proc)));
LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL; LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueued with id=" << id.asString() << " in pool \"" << mPoolName << "\"" << LL_ENDL;
return id; return id;
} }
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter) void LLCoprocedurePool::coprocedureInvokerCoro(
CoprocQueuePtr pendingCoprocs,
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
{ {
QueuedCoproc::ptr_t coproc; QueuedCoproc::ptr_t coproc;
boost::fibers::channel_op_status status; boost::fibers::channel_op_status status;
...@@ -274,7 +283,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap ...@@ -274,7 +283,7 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap
{ {
{ {
LLCoros::TempStatus st("waiting for work for 10s"); LLCoros::TempStatus st("waiting for work for 10s");
status = mPendingCoprocs.pop_wait_for(coproc, std::chrono::seconds(10)); status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10));
} }
if (status == boost::fibers::channel_op_status::closed) if (status == boost::fibers::channel_op_status::closed)
{ {
...@@ -313,5 +322,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap ...@@ -313,5 +322,5 @@ void LLCoprocedurePool::coprocedureInvokerCoro(LLCoreHttpUtil::HttpCoroutineAdap
void LLCoprocedurePool::close() void LLCoprocedurePool::close()
{ {
mPendingCoprocs.close(); mPendingCoprocs->close();
} }
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "llcoros.h" #include "llcoros.h"
#include "llcorehttputil.h" #include "llcorehttputil.h"
#include "lluuid.h" #include "lluuid.h"
#include <boost/smart_ptr/shared_ptr.hpp>
class LLCoprocedurePool; class LLCoprocedurePool;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment