Code owners
Assign users and groups as approvers for specific file changes. Learn more.
llpluginmessagepipe.cpp 9.00 KiB
/**
* @file llpluginmessagepipe.cpp
* @brief Classes that implement connections from the plugin system to pipes/pumps.
*
* @cond
* $LicenseInfo:firstyear=2008&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
* @endcond
*/
#include "linden_common.h"
#include "llpluginmessagepipe.h"
#include "llbufferstream.h"
#include "llapr.h"
static const char MESSAGE_DELIMITER = '\0';
LLPluginMessagePipeOwner::LLPluginMessagePipeOwner() :
mMessagePipe(NULL),
mSocketError(APR_SUCCESS)
{
}
// virtual
LLPluginMessagePipeOwner::~LLPluginMessagePipeOwner()
{
killMessagePipe();
}
// virtual
apr_status_t LLPluginMessagePipeOwner::socketError(apr_status_t error)
{
mSocketError = error;
return error;
};
//virtual
void LLPluginMessagePipeOwner::setMessagePipe(LLPluginMessagePipe *read_pipe)
{
// Save a reference to this pipe
mMessagePipe = read_pipe;
}
bool LLPluginMessagePipeOwner::canSendMessage(void)
{
return (mMessagePipe != NULL);
}
bool LLPluginMessagePipeOwner::writeMessageRaw(const std::string &message)
{
bool result = true;
if(mMessagePipe != NULL)
{
result = mMessagePipe->addMessage(message);
}
else
{
LL_WARNS("Plugin") << "dropping message: " << message << LL_ENDL;
result = false;
}
return result;
}
void LLPluginMessagePipeOwner::killMessagePipe(void)
{
if(mMessagePipe != NULL)
{
delete mMessagePipe;
mMessagePipe = NULL;
}
}
LLPluginMessagePipe::LLPluginMessagePipe(LLPluginMessagePipeOwner *owner, LLSocket::ptr_t socket):
mInputMutex(),
mOutputMutex(),
mOutputStartIndex(0),
mOwner(owner),
mSocket(socket)
{
mOwner->setMessagePipe(this);
}
LLPluginMessagePipe::~LLPluginMessagePipe()
{
if(mOwner != NULL)
{
mOwner->setMessagePipe(NULL);
}
}
bool LLPluginMessagePipe::addMessage(const std::string &message)
{
// queue the message for later output
LLMutexLock lock(&mOutputMutex);
// If we're starting to use up too much memory, clear
if (mOutputStartIndex > 1024 * 1024)
{
mOutput = mOutput.substr(mOutputStartIndex);
mOutputStartIndex = 0;
}
mOutput += message;
mOutput += MESSAGE_DELIMITER; // message separator
return true;
}
void LLPluginMessagePipe::clearOwner(void)
{
// The owner is done with this pipe. The next call to process_impl should send any remaining data and exit.
mOwner = NULL;
}
void LLPluginMessagePipe::setSocketTimeout(apr_interval_time_t timeout_usec)
{
// We never want to sleep forever, so force negative timeouts to become non-blocking.
// according to this page: http://dev.ariel-networks.com/apr/apr-tutorial/html/apr-tutorial-13.html
// blocking/non-blocking with apr sockets is somewhat non-portable.
if(timeout_usec <= 0)
{
// Make the socket non-blocking
apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1);
apr_socket_timeout_set(mSocket->getSocket(), 0);
}
else
{
// Make the socket blocking-with-timeout
apr_socket_opt_set(mSocket->getSocket(), APR_SO_NONBLOCK, 1);
apr_socket_timeout_set(mSocket->getSocket(), timeout_usec);
}
}
bool LLPluginMessagePipe::pump(F64 timeout)
{
bool result = pumpOutput();
if(result)
{
result = pumpInput(timeout);
}
return result;
}
bool LLPluginMessagePipe::pumpOutput()
{
bool result = true;
if(mSocket)
{
apr_status_t status;
apr_size_t in_size, out_size;
LLMutexLock lock(&mOutputMutex);
const char * output_data = &(mOutput.data()[mOutputStartIndex]);
if(*output_data != '\0')
{
// write any outgoing messages
in_size = (apr_size_t) (mOutput.size() - mOutputStartIndex);
out_size = in_size;
setSocketTimeout(0);
// LL_INFOS("Plugin") << "before apr_socket_send, size = " << size << LL_ENDL;
status = apr_socket_send(mSocket->getSocket(),
output_data,
&out_size);
// LL_INFOS("Plugin") << "after apr_socket_send, size = " << size << LL_ENDL;
if((status == APR_SUCCESS) || APR_STATUS_IS_EAGAIN(status))
{
// Success or Socket buffer is full...
// If we've pumped the entire string, clear it
if (out_size == in_size)
{
mOutputStartIndex = 0;
mOutput.clear();
}
else
{
llassert(in_size > out_size);
// Remove the written part from the buffer and try again later.
mOutputStartIndex += out_size;
}
}
else if(APR_STATUS_IS_EOF(status))
{
// This is what we normally expect when a plugin exits.
//LL_INFOS() << "Got EOF from plugin socket. " << LL_ENDL;
if(mOwner)
{
mOwner->socketError(status);
}
result = false;
}
else
{
// some other error
// Treat this as fatal.
ll_apr_warn_status(status);
if(mOwner)
{
mOwner->socketError(status);
}
result = false;
}
}
}
return result;
}
bool LLPluginMessagePipe::pumpInput(F64 timeout)
{
bool result = true;
if(mSocket)
{
apr_status_t status;
apr_size_t size;
// FIXME: For some reason, the apr timeout stuff isn't working properly on windows.
// Until such time as we figure out why, don't try to use the socket timeout -- just sleep here instead.
#if LL_WINDOWS
if(result)
{
if(timeout != 0.0f)
{
ms_sleep((int)(timeout * 1000.0f));
timeout = 0.0f;
}
}
#endif
// Check for incoming messages
if(result)
{
char input_buf[1024];
apr_size_t request_size;
if(timeout == 0.0f)
{
// If we have no timeout, start out with a full read.
request_size = sizeof(input_buf);
}
else
{
// Start out by reading one byte, so that any data received will wake us up.
request_size = 1;
}
// and use the timeout so we'll sleep if no data is available.
setSocketTimeout((apr_interval_time_t)(timeout * 1000000));
while(1)
{
size = request_size;
// LL_INFOS("Plugin") << "before apr_socket_recv, size = " << size << LL_ENDL;
status = apr_socket_recv(
mSocket->getSocket(),
input_buf,
&size);
// LL_INFOS("Plugin") << "after apr_socket_recv, size = " << size << LL_ENDL;
if(size > 0)
{
LLMutexLock lock(&mInputMutex);
mInput.append(input_buf, size);
}
if(status == APR_SUCCESS)
{
#if SHOW_DEBUG
LL_DEBUGS("PluginSocket") << "success, read " << size << LL_ENDL;
#endif
if(size != request_size)
{
// This was a short read, so we're done.
break;
}
}
else if(APR_STATUS_IS_TIMEUP(status))
{
#if SHOW_DEBUG
LL_DEBUGS("PluginSocket") << "TIMEUP, read " << size << LL_ENDL;
#endif
// Timeout was hit. Since the initial read is 1 byte, this should never be a partial read.
break;
}
else if(APR_STATUS_IS_EAGAIN(status))
{
#if SHOW_DEBUG
LL_DEBUGS("PluginSocket") << "EAGAIN, read " << size << LL_ENDL;
#endif
// Non-blocking read returned immediately.
break;
}
else if(APR_STATUS_IS_EOF(status))
{
// This is what we normally expect when a plugin exits.
//LL_INFOS("PluginSocket") << "Got EOF from plugin socket. " << LL_ENDL;
if(mOwner)
{
mOwner->socketError(status);
}
result = false;
break;
}
else
{
// some other error
// Treat this as fatal.
ll_apr_warn_status(status);
if(mOwner)
{
mOwner->socketError(status);
}
result = false;
break;
}
if(timeout != 0.0f)
{
// Second and subsequent reads should not use the timeout
setSocketTimeout(0);
// and should try to fill the input buffer
request_size = sizeof(input_buf);
}
}
processInput();
}
}
return result;
}
void LLPluginMessagePipe::processInput(void)
{
// Look for input delimiter(s) in the input buffer.
int delim;
mInputMutex.lock();
while((delim = mInput.find(MESSAGE_DELIMITER)) != std::string::npos)
{
// Let the owner process this message
if (mOwner)
{
// Pull the message out of the input buffer before calling receiveMessageRaw.
// It's now possible for this function to get called recursively (in the case where the plugin makes a blocking request)
// and this guarantees that the messages will get dequeued correctly.
std::string message(mInput, 0, delim);
mInput.erase(0, delim + 1);
mInputMutex.unlock();
mOwner->receiveMessageRaw(message);
mInputMutex.lock();
}
else
{
LL_WARNS("Plugin") << "!mOwner" << LL_ENDL;
}
}
mInputMutex.unlock();
}