Skip to content
Snippets Groups Projects
Commit 1c91c8a1 authored by Rider Linden's avatar Rider Linden
Browse files

Adding weak pointer support.

Event polling as a coroutine. (incomplete)
Groundwork for canceling HttpCoroutineAdapter yields.
parent fe34b3ef
No related branches found
No related tags found
No related merge requests found
...@@ -190,6 +190,7 @@ ...@@ -190,6 +190,7 @@
#include "linden_common.h" // Modifies curl/curl.h interfaces #include "linden_common.h" // Modifies curl/curl.h interfaces
#include "boost/intrusive_ptr.hpp" #include "boost/intrusive_ptr.hpp"
#include "boost/shared_ptr.hpp" #include "boost/shared_ptr.hpp"
#include "boost/weak_ptr.hpp"
#include "boost/function.hpp" #include "boost/function.hpp"
#include <string> #include <string>
......
...@@ -98,6 +98,7 @@ class HttpRequest ...@@ -98,6 +98,7 @@ class HttpRequest
typedef unsigned int priority_t; typedef unsigned int priority_t;
typedef boost::shared_ptr<HttpRequest> ptr_t; typedef boost::shared_ptr<HttpRequest> ptr_t;
typedef boost::weak_ptr<HttpRequest> wptr_t;
public: public:
/// @name PolicyMethods /// @name PolicyMethods
/// @{ /// @{
......
...@@ -317,15 +317,18 @@ HttpCoroutineAdapter::HttpCoroutineAdapter(const std::string &name, ...@@ -317,15 +317,18 @@ HttpCoroutineAdapter::HttpCoroutineAdapter(const std::string &name,
LLCore::HttpRequest::policy_t policyId, LLCore::HttpRequest::priority_t priority) : LLCore::HttpRequest::policy_t policyId, LLCore::HttpRequest::priority_t priority) :
mAdapterName(name), mAdapterName(name),
mPolicyId(policyId), mPolicyId(policyId),
mPriority(priority) mPriority(priority),
mYieldingHandle(LLCORE_HTTP_HANDLE_INVALID),
mWeakRequest()
{ {
} }
HttpCoroutineAdapter::~HttpCoroutineAdapter() HttpCoroutineAdapter::~HttpCoroutineAdapter()
{ {
} }
LLSD HttpCoroutineAdapter::postAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t &request, LLSD HttpCoroutineAdapter::postAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const LLSD & body, const std::string & url, const LLSD & body,
LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers) LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers)
{ {
...@@ -353,28 +356,17 @@ LLSD HttpCoroutineAdapter::postAndYield(LLCoros::self & self, LLCore::HttpReques ...@@ -353,28 +356,17 @@ LLSD HttpCoroutineAdapter::postAndYield(LLCoros::self & self, LLCore::HttpReques
if (hhandle == LLCORE_HTTP_HANDLE_INVALID) if (hhandle == LLCORE_HTTP_HANDLE_INVALID)
{ {
LLCore::HttpStatus status = request->getStatus(); return HttpCoroutineAdapter::buildImmediateErrorResult(request, url, httpHandler);
LL_WARNS() << "Error posting to " << url << " Status=" << status.getStatus() <<
" message = " << status.getMessage() << LL_ENDL;
// Mimic the status results returned from an http error that we had
// to wait on
LLSD httpresults = LLSD::emptyMap();
httpHandler->writeStatusCodes(status, url, httpresults);
LLSD errorres = LLSD::emptyMap();
errorres["http_result"] = httpresults;
return errorres;
} }
mYieldingHandle = hhandle;
LLSD results = waitForEventOn(self, replyPump); LLSD results = waitForEventOn(self, replyPump);
mYieldingHandle = LLCORE_HTTP_HANDLE_INVALID;
//LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL; //LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL;
return results; return results;
} }
LLSD HttpCoroutineAdapter::putAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t & request, LLSD HttpCoroutineAdapter::putAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const LLSD & body, const std::string & url, const LLSD & body,
LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers) LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers)
{ {
...@@ -402,28 +394,17 @@ LLSD HttpCoroutineAdapter::putAndYield(LLCoros::self & self, LLCore::HttpRequest ...@@ -402,28 +394,17 @@ LLSD HttpCoroutineAdapter::putAndYield(LLCoros::self & self, LLCore::HttpRequest
if (hhandle == LLCORE_HTTP_HANDLE_INVALID) if (hhandle == LLCORE_HTTP_HANDLE_INVALID)
{ {
LLCore::HttpStatus status = request->getStatus(); return HttpCoroutineAdapter::buildImmediateErrorResult(request, url, httpHandler);
LL_WARNS() << "Error posting to " << url << " Status=" << status.getStatus() <<
" message = " << status.getMessage() << LL_ENDL;
// Mimic the status results returned from an http error that we had
// to wait on
LLSD httpresults = LLSD::emptyMap();
httpHandler->writeStatusCodes(status, url, httpresults);
LLSD errorres = LLSD::emptyMap();
errorres["http_result"] = httpresults;
return errorres;
} }
mYieldingHandle = hhandle;
LLSD results = waitForEventOn(self, replyPump); LLSD results = waitForEventOn(self, replyPump);
mYieldingHandle = LLCORE_HTTP_HANDLE_INVALID;
//LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL; //LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL;
return results; return results;
} }
LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t & request, LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const std::string & url,
LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers) LLCore::HttpOptions::ptr_t options, LLCore::HttpHeaders::ptr_t headers)
{ {
...@@ -449,6 +430,19 @@ LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest ...@@ -449,6 +430,19 @@ LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest
url, options.get(), headers.get(), httpHandler.get()); url, options.get(), headers.get(), httpHandler.get());
if (hhandle == LLCORE_HTTP_HANDLE_INVALID) if (hhandle == LLCORE_HTTP_HANDLE_INVALID)
{
return HttpCoroutineAdapter::buildImmediateErrorResult(request, url, httpHandler);
}
mYieldingHandle = hhandle;
LLSD results = waitForEventOn(self, replyPump);
mYieldingHandle = LLCORE_HTTP_HANDLE_INVALID;
//LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL;
return results;
}
LLSD HttpCoroutineAdapter::buildImmediateErrorResult(const LLCore::HttpRequest::ptr_t &request,
const std::string &url, LLCoreHttpUtil::HttpCoroHandler::ptr_t &httpHandler)
{ {
LLCore::HttpStatus status = request->getStatus(); LLCore::HttpStatus status = request->getStatus();
LL_WARNS() << "Error posting to " << url << " Status=" << status.getStatus() << LL_WARNS() << "Error posting to " << url << " Status=" << status.getStatus() <<
...@@ -466,10 +460,5 @@ LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest ...@@ -466,10 +460,5 @@ LLSD HttpCoroutineAdapter::getAndYield(LLCoros::self & self, LLCore::HttpRequest
return errorres; return errorres;
} }
LLSD results = waitForEventOn(self, replyPump);
//LL_INFOS() << "Results for transaction " << transactionId << LL_ENDL;
return results;
}
} // end namespace LLCoreHttpUtil } // end namespace LLCoreHttpUtil
...@@ -210,36 +210,54 @@ class HttpRequestPumper ...@@ -210,36 +210,54 @@ class HttpRequestPumper
class HttpCoroutineAdapter class HttpCoroutineAdapter
{ {
public: public:
typedef boost::shared_ptr<HttpCoroutineAdapter> ptr_t;
typedef boost::weak_ptr<HttpCoroutineAdapter> wptr_t;
HttpCoroutineAdapter(const std::string &name, LLCore::HttpRequest::policy_t policyId, HttpCoroutineAdapter(const std::string &name, LLCore::HttpRequest::policy_t policyId,
LLCore::HttpRequest::priority_t priority = 0L); LLCore::HttpRequest::priority_t priority = 0L);
~HttpCoroutineAdapter(); ~HttpCoroutineAdapter();
/// Execute a Post transaction on the supplied URL and yield execution of /// Execute a Post transaction on the supplied URL and yield execution of
/// the coroutine until a result is available. /// the coroutine until a result is available.
LLSD postAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t & request, /// Note: the request's smart pointer is passed by value so that it will
/// not be deallocated during the yield.
LLSD postAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const LLSD & body, const std::string & url, const LLSD & body,
LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(), LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(new LLCore::HttpOptions(), false),
LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t()); LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t(new LLCore::HttpHeaders(), false));
/// Execute a Put transaction on the supplied URL and yield execution of /// Execute a Put transaction on the supplied URL and yield execution of
/// the coroutine until a result is available. /// the coroutine until a result is available.
LLSD putAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t & request, /// Note: the request's smart pointer is passed by value so that it will
/// not be deallocated during the yield.
LLSD putAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const LLSD & body, const std::string & url, const LLSD & body,
LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(), LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(),
LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t()); LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t());
/// Execute a Get transaction on the supplied URL and yield execution of /// Execute a Get transaction on the supplied URL and yield execution of
/// the coroutine until a result is available. /// the coroutine until a result is available.
LLSD getAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t & request, /// Note: the request's smart pointer is passed by value so that it will
/// not be deallocated during the yield.
LLSD getAndYield(LLCoros::self & self, LLCore::HttpRequest::ptr_t request,
const std::string & url, const std::string & url,
LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(), LLCore::HttpOptions::ptr_t options = LLCore::HttpOptions::ptr_t(),
LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t()); LLCore::HttpHeaders::ptr_t headers = LLCore::HttpHeaders::ptr_t());
///
void cancelYieldingOperation();
private: private:
static LLSD buildImmediateErrorResult(const LLCore::HttpRequest::ptr_t &request,
const std::string &url, LLCoreHttpUtil::HttpCoroHandler::ptr_t &httpHandler);
std::string mAdapterName; std::string mAdapterName;
LLCore::HttpRequest::priority_t mPriority; LLCore::HttpRequest::priority_t mPriority;
LLCore::HttpRequest::policy_t mPolicyId; LLCore::HttpRequest::policy_t mPolicyId;
LLCore::HttpHandle mYieldingHandle;
LLCore::HttpRequest::wptr_t mWeakRequest;
}; };
} // end namespace LLCoreHttpUtil } // end namespace LLCoreHttpUtil
......
...@@ -37,6 +37,9 @@ ...@@ -37,6 +37,9 @@
#include "llviewerregion.h" #include "llviewerregion.h"
#include "message.h" #include "message.h"
#include "lltrans.h" #include "lltrans.h"
#include "llcoros.h"
#include "lleventcoro.h"
#include "llcorehttputil.h"
namespace namespace
{ {
...@@ -47,6 +50,194 @@ namespace ...@@ -47,6 +50,194 @@ namespace
const F32 EVENT_POLL_ERROR_RETRY_SECONDS_INC = 5.f; // ~ half of a normal timeout. const F32 EVENT_POLL_ERROR_RETRY_SECONDS_INC = 5.f; // ~ half of a normal timeout.
const S32 MAX_EVENT_POLL_HTTP_ERRORS = 10; // ~5 minutes, by the above rules. const S32 MAX_EVENT_POLL_HTTP_ERRORS = 10; // ~5 minutes, by the above rules.
#if 1
class LLEventPollImpl
{
public:
LLEventPollImpl(const LLHost &sender);
void start(const std::string &url);
void stop();
private:
void eventPollCoro(LLCoros::self& self, std::string url);
void handleMessage(const LLSD &content);
bool mDone;
LLCore::HttpRequest::ptr_t mHttpRequest;
LLCore::HttpRequest::policy_t mHttpPolicy;
std::string mSenderIp;
int mCounter;
static int sNextCounter;
};
int LLEventPollImpl::sNextCounter = 1;
LLEventPollImpl::LLEventPollImpl(const LLHost &sender) :
mDone(false),
mHttpRequest(),
mHttpPolicy(LLCore::HttpRequest::DEFAULT_POLICY_ID),
mSenderIp(),
mCounter(sNextCounter++)
{
mHttpRequest = LLCore::HttpRequest::ptr_t(new LLCore::HttpRequest);
mSenderIp = sender.getIPandPort();
}
void LLEventPollImpl::handleMessage(const LLSD& content)
{
std::string msg_name = content["message"];
LLSD message;
message["sender"] = mSenderIp;
message["body"] = content["body"];
LLMessageSystem::dispatch(msg_name, message);
}
void LLEventPollImpl::start(const std::string &url)
{
if (!url.empty())
{
std::string coroname =
LLCoros::instance().launch("LLAccountingCostManager::accountingCostCoro",
boost::bind(&LLEventPollImpl::eventPollCoro, this, _1, url));
LL_DEBUGS() << coroname << " with url '" << url << LL_ENDL;
}
}
void LLEventPollImpl::stop()
{
mDone = true;
}
void LLEventPollImpl::eventPollCoro(LLCoros::self& self, std::string url)
{
LLCoreHttpUtil::HttpCoroutineAdapter httpAdapter("EventPoller", mHttpPolicy);
LLSD acknowledge;
int errorCount = 0;
int counter = mCounter; // saved on the stack for debugging.
LL_INFOS("LLEventPollImpl::eventPollCoro") << " <" << counter << "> entering coroutine." << LL_ENDL;
while (!mDone)
{
LLSD request;
request["ack"] = acknowledge;
request["done"] = mDone;
// LL_DEBUGS("LLEventPollImpl::eventPollCoro") << "<" << counter << "> request = "
// << LLSDXMLStreamer(request) << LL_ENDL;
LL_INFOS("LLEventPollImpl::eventPollCoro") << " <" << counter << "> posting and yielding." << LL_ENDL;
LLSD result = httpAdapter.postAndYield(self, mHttpRequest, url, request);
// LL_DEBUGS("LLEventPollImpl::eventPollCoro") << "<" << counter << "> result = "
// << LLSDXMLStreamer(result) << LL_ENDL;
if (mDone)
break;
LLSD httpResults;
httpResults = result["http_result"];
if (!httpResults["success"].asBoolean())
{
LL_WARNS("LLEventPollImpl::eventPollCoro") << "<" << counter << "> Error result from LLCoreHttpUtil::HttpCoroHandler. Code "
<< httpResults["status"] << ": '" << httpResults["message"] << "'" << LL_ENDL;
if (httpResults["status"].asInteger() == HTTP_BAD_GATEWAY)
{
// A HTTP_BAD_GATEWAY (502) error is our standard timeout response
// we get this when there are no events.
errorCount = 0;
continue;
}
LL_WARNS("LLEventPollImpl::eventPollCoro") << "<" << counter << "> " << LLSDXMLStreamer(result) << (mDone ? " -- done" : "") << LL_ENDL;
if (errorCount < MAX_EVENT_POLL_HTTP_ERRORS)
{
++errorCount;
// The 'tick' will return TRUE causing the timer to delete this.
int waitToRetry = EVENT_POLL_ERROR_RETRY_SECONDS
+ errorCount * EVENT_POLL_ERROR_RETRY_SECONDS_INC;
LL_WARNS() << "Retrying in " << waitToRetry <<
" seconds, error count is now " << errorCount << LL_ENDL;
// *TODO: Wait for a timeout here
// new LLEventPollEventTimer(
// , this);
continue;
}
else
{
// At this point we have given up and the viewer will not receive HTTP messages from the simulator.
// IMs, teleports, about land, selecting land, region crossing and more will all fail.
// They are essentially disconnected from the region even though some things may still work.
// Since things won't get better until they relog we force a disconnect now.
mDone = true;
// *NOTE:Mani - The following condition check to see if this failing event poll
// is attached to the Agent's main region. If so we disconnect the viewer.
// Else... its a child region and we just leave the dead event poll stopped and
// continue running.
if (gAgent.getRegion() && gAgent.getRegion()->getHost().getIPandPort() == mSenderIp)
{
LL_WARNS("LLEventPollImpl::eventPollCoro") << "< " << counter << "> Forcing disconnect due to stalled main region event poll." << LL_ENDL;
LLAppViewer::instance()->forceDisconnect(LLTrans::getString("AgentLostConnection"));
}
break;
}
}
LL_DEBUGS("LLEventPollImpl::eventPollCoro") << " <" << counter << ">"
<< (mDone ? " -- done" : "") << LL_ENDL;
errorCount = 0;
if (!result.isMap() ||
!result.get("events") ||
!result.get("id"))
{
LL_WARNS("LLEventPollImpl::eventPollCoro") << " <" << counter << "> received event poll with no events or id key: " << LLSDXMLStreamer(result) << LL_ENDL;
continue;
}
acknowledge = result["id"];
LLSD events = result["events"];
if (acknowledge.isUndefined())
{
LL_WARNS("LLEventPollImpl::eventPollCoro") << " id undefined" << LL_ENDL;
}
// was LL_INFOS() but now that CoarseRegionUpdate is TCP @ 1/second, it'd be too verbose for viewer logs. -MG
LL_DEBUGS() << "LLEventPollResponder::httpSuccess <" << counter << "> " << events.size() << "events (id "
<< LLSDXMLStreamer(acknowledge) << ")" << LL_ENDL;
LLSD::array_const_iterator i = events.beginArray();
LLSD::array_const_iterator end = events.endArray();
for (; i != end; ++i)
{
if (i->has("message"))
{
handleMessage(*i);
}
}
}
LL_INFOS("LLEventPollImpl::eventPollCoro") << " <" << counter << "> Leaving coroutine." << LL_ENDL;
}
#else
class LLEventPollResponder : public LLHTTPClient::Responder class LLEventPollResponder : public LLHTTPClient::Responder
{ {
LOG_CLASS(LLEventPollResponder); LOG_CLASS(LLEventPollResponder);
...@@ -288,3 +479,18 @@ LLEventPoll::~LLEventPoll() ...@@ -288,3 +479,18 @@ LLEventPoll::~LLEventPoll()
LLEventPollResponder* event_poll_responder = dynamic_cast<LLEventPollResponder*>(responderp); LLEventPollResponder* event_poll_responder = dynamic_cast<LLEventPollResponder*>(responderp);
if (event_poll_responder) event_poll_responder->stop(); if (event_poll_responder) event_poll_responder->stop();
} }
#endif
}
LLEventPoll::LLEventPoll(const std::string& poll_url, const LLHost& sender):
mImpl()
{
mImpl = boost::unique_ptr<LLEventPollImpl>(new LLEventPollImpl(sender));
mImpl->start(poll_url);
}
LLEventPoll::~LLEventPoll()
{
mImpl->stop();
}
...@@ -28,9 +28,20 @@ ...@@ -28,9 +28,20 @@
#define LL_LLEVENTPOLL_H #define LL_LLEVENTPOLL_H
#include "llhttpclient.h" #include "llhttpclient.h"
#include "boost/move/unique_ptr.hpp"
namespace boost
{
using ::boost::movelib::unique_ptr; // move unique_ptr into the boost namespace.
}
class LLHost; class LLHost;
namespace
{
class LLEventPollImpl;
}
class LLEventPoll class LLEventPoll
///< implements the viewer side of server-to-viewer pushed events. ///< implements the viewer side of server-to-viewer pushed events.
...@@ -40,11 +51,15 @@ class LLEventPoll ...@@ -40,11 +51,15 @@ class LLEventPoll
///< Start polling the URL. ///< Start polling the URL.
virtual ~LLEventPoll(); virtual ~LLEventPoll();
///< will stop polling, cancelling any poll in progress. ///< will stop polling, canceling any poll in progress.
private: private:
#if 1
boost::unique_ptr<LLEventPollImpl> mImpl;
#else
LLHTTPClient::ResponderPtr mImpl; LLHTTPClient::ResponderPtr mImpl;
#endif
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment