Newer
Older
/**
* @file llcircuit.cpp
* @brief Class to track UDP endpoints for the message system.
*
* $LicenseInfo:firstyear=2002&license=viewerlgpl$
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
*/
#include "linden_common.h"
#if LL_WINDOWS
#include <process.h>
#else
#if LL_LINUX
#include <dlfcn.h> // RTLD_LAZY
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#endif
#if !defined(USE_CIRCUIT_LIST)
#include <algorithm>
#endif
#include <sstream>
#include <iterator>
#include <stack>
#include "llcircuit.h"
#include "message.h"
#include "llrand.h"
#include "llstl.h"
#include "lltransfermanager.h"
// File-local tuning constants for circuit ping/blocking and bookkeeping.
const S32 PING_START_BLOCK = 3; // How many pings behind we have to be to consider ourself blocked.
const S32 PING_RELEASE_BLOCK = 2; // How many pings behind we have to be to consider ourself unblocked.
// NOTE(review): presumably the minimum length of one bandwidth-accounting period; it is not
// referenced anywhere in the visible part of this file - confirm against checkPeriodTime().
const F32Seconds TARGET_PERIOD_LENGTH(5.f);
// NOTE(review): the trailing comment on the next line is cut off mid-sentence; restore the
// remainder from version control.
const F32Seconds LL_DUPLICATE_SUPPRESSION_TIMEOUT(60.f); //this can be long, as time-based cleanup is
/**
 * Construct the bookkeeping record for a single UDP circuit.
 *
 * @param host                        Remote endpoint this circuit talks to.
 * @param in_id                       Initial value for both the next expected incoming
 *                                    packet ID and the highest packet ID seen.
 * @param circuit_heartbeat_interval  Interval between pings; also used to spread the
 *                                    first ping randomly within one interval.
 * @param circuit_timeout             Heartbeat timeout stored in mHeartbeatTimeout.
 *
 * NOTE(review): mUnackedPacketCount, mUnackedPacketBytes, mCurrentResendCount,
 * mPeakBPSIn and mPeakBPSOut are read or modified elsewhere in this file but are not
 * initialized here - confirm they have in-class initializers, otherwise add them to
 * this list in declaration order.
 */
LLCircuitData::LLCircuitData(const LLHost &host, TPACKETID in_id,
                             const F32Seconds circuit_heartbeat_interval, const F32Seconds circuit_timeout)
:   mHost (host),
    mWrapID(0),
    mPacketsOutID(0),
    mPacketsInID(in_id),
    mHighestPacketID(in_id),
    mTrusted(FALSE),
    mbAllowTimeout(TRUE),
    mbAlive(TRUE),
    mBlocked(FALSE),
    mPingTime(0.0),
    mLastPingSendTime(0.0),
    mLastPingReceivedTime(0.0),
    mNextPingSendTime(0.0),
    mPingsInTransit(0),
    mLastPingID(0),
    mPingDelay(INITIAL_PING_VALUE_MSEC),
    mPingDelayAveraged(INITIAL_PING_VALUE_MSEC),
    mLastPacketInTime(0.0),
    mLocalEndPointID(),
    mPacketsOut(0),
    mPacketsIn(0),
    mPacketsLost(0),
    mBytesIn(0),
    mBytesOut(0),
    mLastPeriodLength(-1.f),
    mBytesInLastPeriod(0),
    mBytesOutLastPeriod(0),
    mBytesInThisPeriod(0),
    mBytesOutThisPeriod(0),
    mAckCreationTime(0.f),
    mHeartbeatInterval(circuit_heartbeat_interval),
    mHeartbeatTimeout(circuit_timeout)
{
    // Need to guarantee that this time is up to date, we may be creating a circuit even
    // though we haven't been running a message system loop.
    F64Seconds mt_sec = LLMessageSystem::getMessageTimeSeconds(TRUE);

    // Stagger the first ping by a random fraction of one heartbeat interval.
    F32 distribution_offset = ll_frand();
    mLastPingSendTime = mt_sec + mHeartbeatInterval * distribution_offset;

    // Schedule the next ping 95%..105% of an interval after that, same jitter as
    // LLCircuit::updateWatchDogTimers applies.
    mNextPingSendTime = mLastPingSendTime + 0.95*mHeartbeatInterval + F32Seconds(ll_frand(0.1f*mHeartbeatInterval.value()));

    // The first accounting period starts now.
    mPeriodTime = mt_sec;

    mLocalEndPointID.generate();
}
/**
 * Tear down the circuit: cancel pending transfers for the host and abort every
 * reliable packet that is still waiting for an ack (both the normal resend list and
 * the final-retry list), notifying each packet's callback with LL_ERR_CIRCUIT_GONE.
 */
LLCircuitData::~LLCircuitData()
{
    // Clean up all pending transfers.
    gTransferManager.cleanupConnection(mHost);

    // IDs of aborted packets, collected only when verbose logging is on.
    std::vector<TPACKETID> doomed;

    // Abort every packet in [it, last): count the failure, notify the owner,
    // update the unacked statistics and free the packet.
    auto abort_pending = [this, &doomed](reliable_iter it, reliable_iter last)
    {
        for (; it != last; ++it)
        {
            LLReliablePacket *packetp = it->second;
            gMessageSystem->mFailedResendPackets++;
            if (gMessageSystem->mVerboseLog)
            {
                doomed.push_back(packetp->mPacketID);
            }
            if (packetp->mCallback)
            {
                packetp->mCallback(packetp->mCallbackData, LL_ERR_CIRCUIT_GONE);
            }

            // Update stats
            mUnackedPacketCount--;
            mUnackedPacketBytes -= packetp->mBufferLength;

            delete packetp;
        }
    };

    // remove all pending reliable messages on this circuit
    abort_pending(mUnackedPackets.begin(), mUnackedPackets.end());
    mUnackedPackets.clear();

    // remove all pending final retry reliable messages on this circuit
    abort_pending(mFinalRetryPackets.begin(), mFinalRetryPackets.end());
    mFinalRetryPackets.clear();

    // log aborted reliable packets for this circuit.
    if (gMessageSystem->mVerboseLog && !doomed.empty())
    {
        std::ostringstream str;
        std::ostream_iterator<TPACKETID> append(str, " ");
        str << "MSG: -> " << mHost << "\tABORTING RELIABLE:\t";
        std::copy(doomed.begin(), doomed.end(), append);
        // The message used to be assembled and then dropped; actually emit it.
        LL_INFOS() << str.str() << LL_ENDL;
    }
}
/**
 * Handle an ack for a reliable packet we sent on this circuit.
 *
 * Looks the packet up first in the normal resend list and then in the final-retry
 * list. When found, the packet's callback (if any) is invoked, the unacked statistics
 * are updated and the packet is freed and removed. An ack for an unknown packet is
 * silently ignored (it may be a duplicate ack).
 *
 * @param packet_num  ID of the packet being acknowledged.
 */
void LLCircuitData::ackReliablePacket(TPACKETID packet_num)
{
    // Shared completion path. The callback must run whether or not verbose logging
    // is enabled, and only when one was actually registered.
    auto complete_ack = [this](LLReliablePacket *packetp)
    {
        if (gMessageSystem->mVerboseLog)
        {
            std::ostringstream str;
            str << "MSG: <- " << packetp->mHost << "\tRELIABLE ACKED:\t"
                << packetp->mPacketID;
            LL_INFOS() << str.str() << LL_ENDL;
        }

        if (packetp->mCallback)
        {
            if (packetp->mTimeout < F32Seconds(0.f))   // negative timeout will always return timeout even for successful ack, for debugging
            {
                packetp->mCallback(packetp->mCallbackData, LL_ERR_TCP_TIMEOUT);
            }
            else
            {
                packetp->mCallback(packetp->mCallbackData, LL_ERR_NOERR);
            }
        }

        // Update stats
        mUnackedPacketCount--;
        mUnackedPacketBytes -= packetp->mBufferLength;

        // Cleanup
        delete packetp;
    };

    reliable_iter iter = mUnackedPackets.find(packet_num);
    if (iter != mUnackedPackets.end())
    {
        complete_ack(iter->second);
        mUnackedPackets.erase(iter);
        return;
    }

    iter = mFinalRetryPackets.find(packet_num);
    if (iter != mFinalRetryPackets.end())
    {
        complete_ack(iter->second);
        mFinalRetryPackets.erase(iter);
    }
    else
    {
        // Couldn't find this packet on either of the unacked lists.
        // maybe it's a duplicate ack?
    }
}
/**
 * Resend reliable packets whose retry timer has expired, and fail packets whose final
 * retry has expired.
 *
 * Pass 1 walks mUnackedPackets. If the resend throttle has overflowed, resending
 * stops; while more than 512000 unacked bytes are queued, expired packets are moved
 * straight to the final-retry list instead of being resent. Otherwise each expired
 * packet is resent, its expiration is pushed out, and it is moved to the final-retry
 * list once it has no retries left.
 *
 * Pass 2 walks mFinalRetryPackets and drops every expired packet, invoking its
 * callback with LL_ERR_TCP_TIMEOUT.
 *
 * @param now  Current message-system time used for all expiration checks.
 * @return     Number of packets still unacked on this circuit (mUnackedPacketCount).
 */
S32 LLCircuitData::resendUnackedPackets(const F64Seconds now)
{
    LLReliablePacket *packetp;

    //
    // Theoretically we should search through the list for the packet with the oldest
    // packet ID, as otherwise when we WRAP we will resend reliable packets out of order.
    // Since resends are ALREADY out of order, and wrapping is highly rare (16+million packets),
    // I'm not going to worry about this for now - djs
    //

    reliable_iter iter;
    BOOL have_resend_overflow = FALSE;
    for (iter = mUnackedPackets.begin(); iter != mUnackedPackets.end();)
    {
        packetp = iter->second;

        // Only check overflow if we haven't had one yet.
        if (!have_resend_overflow)
        {
            have_resend_overflow = mThrottles.checkOverflow(TC_RESEND, 0);
        }

        if (have_resend_overflow)
        {
            // We've exceeded our bandwidth for resends.
            // Time to stop trying to send them.

            // If we have too many unacked packets, we need to start dropping expired ones.
            if (mUnackedPacketBytes > 512000)
            {
                if (now > packetp->mExpirationTime)
                {
                    // This circuit has overflowed.  Do not retry.  Do not pass go.
                    packetp->mRetries = 0;
                    // Remove it from this list and add it to the final list.
                    // Post-increment keeps iter valid across the erase.
                    mUnackedPackets.erase(iter++);
                    mFinalRetryPackets[packetp->mPacketID] = packetp;
                }
                else
                {
                    ++iter;
                }
                // Move on to the next unacked packet.
                continue;
            }

            if (mUnackedPacketBytes > 256000 && !(getPacketsOut() % 1024))
            {
                // Warn if we've got a lot of resends waiting.
                LL_WARNS() << mHost << " has " << mUnackedPacketBytes
                        << " bytes of reliable messages waiting" << LL_ENDL;
            }
            // Stop resending.  There are no more than 512000 unacked bytes queued.
            break;
        }

        if (now > packetp->mExpirationTime)
        {
            packetp->mRetries--;

            // retry
            mCurrentResendCount++;

            gMessageSystem->mResentPackets++;

            if (gMessageSystem->mVerboseLog)
            {
                std::ostringstream str;
                str << "MSG: -> " << packetp->mHost
                    << "\tRESENDING RELIABLE:\t" << packetp->mPacketID;
                LL_INFOS() << str.str() << LL_ENDL;
            }

            packetp->mBuffer[0] |= LL_RESENT_FLAG;  // tag packet id as being a resend

            gMessageSystem->mPacketRing.sendPacket(packetp->mSocket,
                                                   (char *)packetp->mBuffer, packetp->mBufferLength,
                                                   packetp->mHost);

            mThrottles.throttleOverflow(TC_RESEND, packetp->mBufferLength * 8.f);

            // The new method, retry time based on ping
            if (packetp->mPingBasedRetry)
            {
                packetp->mExpirationTime = now + llmax(LL_MINIMUM_RELIABLE_TIMEOUT_SECONDS, F32Seconds(LL_RELIABLE_TIMEOUT_FACTOR * getPingDelayAveraged()));
            }
            else
            {
                // custom, constant retry time
                packetp->mExpirationTime = now + packetp->mTimeout;
            }

            if (!packetp->mRetries)
            {
                // Last resend, remove it from this list and add it to the final list.
                mUnackedPackets.erase(iter++);
                mFinalRetryPackets[packetp->mPacketID] = packetp;
            }
            else
            {
                // Don't remove it yet, it still gets to try to resend at least once.
                ++iter;
            }
        }
        else
        {
            // Don't need to do anything with this packet, keep iterating.
            ++iter;
        }
    }

    for (iter = mFinalRetryPackets.begin(); iter != mFinalRetryPackets.end();)
    {
        packetp = iter->second;
        if (now > packetp->mExpirationTime)
        {
            // fail (too many retries)
            gMessageSystem->mFailedResendPackets++;

            if (gMessageSystem->mVerboseLog)
            {
                std::ostringstream str;
                str << "MSG: -> " << packetp->mHost << "\tABORTING RELIABLE:\t"
                    << packetp->mPacketID;
                LL_INFOS() << str.str() << LL_ENDL;
            }

            if (packetp->mCallback)
            {
                packetp->mCallback(packetp->mCallbackData, LL_ERR_TCP_TIMEOUT);
            }

            // Update stats
            mUnackedPacketCount--;
            mUnackedPacketBytes -= packetp->mBufferLength;

            mFinalRetryPackets.erase(iter++);
            delete packetp;
        }
        else
        {
            ++iter;
        }
    }

    return mUnackedPacketCount;
}
/**
 * Create an empty circuit table.
 *
 * @param circuit_heartbeat_interval  Ping interval handed to every circuit created later.
 * @param circuit_timeout             Heartbeat timeout handed to every circuit created later.
 */
LLCircuit::LLCircuit(const F32Seconds circuit_heartbeat_interval, const F32Seconds circuit_timeout)
:   mLastCircuit(nullptr),
    mHeartbeatInterval(circuit_heartbeat_interval),
    mHeartbeatTimeout(circuit_timeout)
{
    // No circuits exist yet, so there is no cached "last found" circuit.
}
/**
 * Destroy the circuit table and every LLCircuitData it owns.
 *
 * addCircuitData() allocates each circuit with new and stores the raw pointer in
 * mCircuitData, so the pointers have to be released here.
 */
LLCircuit::~LLCircuit()
{
    // Drop every secondary index first so nothing refers to a circuit while it is
    // being destroyed.
    mLastCircuit = NULL;
    mPingSet.clear();
    mUnackedCircuitMap.clear();
    mSendAckMap.clear();

    // Detach the owning map before deleting: LLCircuitData's destructor fires packet
    // callbacks, and those must not be able to look up a half-destroyed circuit.
    circuit_data_map doomed;
    doomed.swap(mCircuitData);

    // delete pointers in the map.
    for (circuit_data_map::iterator it = doomed.begin(); it != doomed.end(); ++it)
    {
        delete it->second;
    }

    // A callback may have re-populated the lookup cache; make sure it ends up empty.
    mLastCircuit = NULL;
}
/**
 * Allocate a new circuit for a host and register it in the circuit map and ping set.
 *
 * @param host   Remote endpoint of the new circuit.
 * @param in_id  Initial incoming packet ID for the circuit.
 * @return       The newly allocated circuit, which also becomes the cached last circuit.
 */
LLCircuitData *LLCircuit::addCircuitData(const LLHost &host, TPACKETID in_id)
{
    // This should really validate if one already exists
    LL_INFOS() << "LLCircuit::addCircuitData for " << host << LL_ENDL;

    LLCircuitData * const new_circuit =
        new LLCircuitData(host, in_id, mHeartbeatInterval, mHeartbeatTimeout);

    // Register the circuit by host and in the ping schedule.
    const circuit_data_map::value_type entry(host, new_circuit);
    mCircuitData.insert(entry);
    mPingSet.insert(new_circuit);

    // Prime the lookup cache with the circuit we just created.
    mLastCircuit = new_circuit;
    return new_circuit;
}
/**
 * Remove and destroy the circuit for a host, if one exists.
 *
 * The circuit is unlinked from the circuit map, the ping set and the unacked/send-ack
 * maps before it is deleted. The cached last circuit is cleared both before and after.
 *
 * @param host  Remote endpoint whose circuit should be removed.
 */
void LLCircuit::removeCircuitData(const LLHost &host)
{
    LL_INFOS() << "LLCircuit::removeCircuitData for " << host << LL_ENDL;
    mLastCircuit = NULL;

    circuit_data_map::iterator it = mCircuitData.find(host);
    if (it != mCircuitData.end())
    {
        LLCircuitData *cdp = it->second;
        mCircuitData.erase(it);

        LLCircuit::ping_set_t::iterator psit = mPingSet.find(cdp);
        if (psit != mPingSet.end())
        {
            mPingSet.erase(psit);
        }
        else
        {
            LL_WARNS() << "Couldn't find entry for next ping in ping set!" << LL_ENDL;
        }

        // Clean up from optimization maps
        mUnackedCircuitMap.erase(host);
        mSendAckMap.erase(host);

        // Must come last: `host` may refer to cdp->mHost (see updateWatchDogTimers),
        // so it cannot be used once the circuit is gone.
        delete cdp;
    }

    // This also has to happen AFTER we nuke the circuit, because various
    // callbacks for the circuit may result in messages being sent to
    // this circuit, and the setting of mLastCircuit.  We don't check
    // if the host matches, but we don't really care because mLastCircuit
    // is an optimization, and this happens VERY rarely.
    mLastCircuit = NULL;
}
/**
 * Mark the circuit alive or dead.
 *
 * A change of state resets both packet ID counters. Marking the circuit alive
 * (whether or not the state changed) also refreshes the last-ping-received time,
 * clears the in-transit ping count and unblocks the circuit.
 *
 * @param b_alive  New alive state.
 */
void LLCircuitData::setAlive(BOOL b_alive)
{
    const bool state_changed = (mbAlive != b_alive);
    if (state_changed)
    {
        mPacketsOutID = 0;
        mPacketsInID = 0;
        mbAlive = b_alive;
    }

    if (!b_alive)
    {
        return;
    }

    // Alive: treat this moment as a fresh ping response.
    mLastPingReceivedTime = LLMessageSystem::getMessageTimeSeconds();
    mPingsInTransit = 0;
    mBlocked = FALSE;
}
/**
 * Enable or disable timeouts on this circuit.
 *
 * @param allow  TRUE to allow timeouts again; doing so also marks the circuit alive.
 */
void LLCircuitData::setAllowTimeout(BOOL allow)
{
    mbAllowTimeout = allow;

    if (!allow)
    {
        return;
    }

    // resuming circuit
    // make sure it's alive
    setAlive(TRUE);
}
// Reset per-period counters if necessary.
void LLCircuitData::checkPeriodTime()
{
F64Seconds mt_sec = LLMessageSystem::getMessageTimeSeconds();
F64Seconds period_length = mt_sec - mPeriodTime;
F32 bps_in = F32Bits(mBytesInThisPeriod).value() / period_length.value();
if (bps_in > mPeakBPSIn)
{
mPeakBPSIn = bps_in;
}
F32 bps_out = F32Bits(mBytesOutThisPeriod).value() / period_length.value();
if (bps_out > mPeakBPSOut)
{
mPeakBPSOut = bps_out;
}
mBytesInLastPeriod = mBytesInThisPeriod;
mBytesOutLastPeriod = mBytesOutThisPeriod;
mBytesInThisPeriod = S32Bytes(0);
mBytesOutThisPeriod = S32Bytes(0);
Graham Linden
committed
mLastPeriodLength = F32Seconds::convert(period_length);
/**
 * Account for bytes received on this circuit.
 *
 * @param bytes  Number of bytes to add to both the current-period and lifetime totals.
 */
void LLCircuitData::addBytesIn(S32Bytes bytes)
{
    // Current accounting period.
    mBytesInThisPeriod += bytes;
    // Lifetime total.
    mBytesIn += bytes;
}
/**
 * Account for bytes sent on this circuit.
 *
 * @param bytes  Number of bytes to add to both the lifetime and current-period totals.
 */
void LLCircuitData::addBytesOut(S32Bytes bytes)
{
    mBytesOut += bytes;
    mBytesOutThisPeriod += bytes;
}
/**
 * Start tracking a reliable packet that was just sent on this circuit.
 *
 * A packet with retries remaining goes on the normal resend list; a packet with no
 * params or with zero retries goes straight onto the final-retry list.
 *
 * @param mSocket  Socket the packet was sent on.
 * @param buf_ptr  Packet bytes.
 * @param buf_len  Length of the packet in bytes.
 * @param params   Reliable-send parameters; may be NULL.
 */
void LLCircuitData::addReliablePacket(S32 mSocket, U8 *buf_ptr, S32 buf_len, LLReliablePacketParams *params)
{
    LLReliablePacket * const tracked = new LLReliablePacket(mSocket, buf_ptr, buf_len, params);

    // Update the unacked statistics.
    mUnackedPacketCount++;
    mUnackedPacketBytes += tracked->mBufferLength;

    const bool no_retries_left = !params || !params->mRetries;
    if (no_retries_left)
    {
        mFinalRetryPackets[tracked->mPacketID] = tracked;
    }
    else
    {
        mUnackedPackets[tracked->mPacketID] = tracked;
    }
}
/**
 * Run the resend pass on every circuit in mUnackedCircuitMap and total the results.
 *
 * @param[out] unacked_list_length  Sum of the packet counts returned by each circuit's
 *                                  resendUnackedPackets().
 * @param[out] unacked_list_size    Sum of each circuit's unacked byte count.
 */
void LLCircuit::resendUnackedPackets(S32& unacked_list_length, S32& unacked_list_size)
{
    // One timestamp for the whole pass so every circuit sees the same "now".
    F64Seconds now = LLMessageSystem::getMessageTimeSeconds();

    unacked_list_length = 0;
    unacked_list_size = 0;

    circuit_data_map::iterator end = mUnackedCircuitMap.end();
    for (circuit_data_map::iterator it = mUnackedCircuitMap.begin(); it != end; ++it)
    {
        LLCircuitData* circ = it->second;
        unacked_list_length += circ->resendUnackedPackets(now);
        unacked_list_size += circ->getUnackedPacketBytes();
    }
}
/**
 * Check whether a packet ID is in the recently-received reliable packet list.
 *
 * @param packetnum  Packet ID to look up.
 * @return           TRUE if the ID is present, FALSE otherwise.
 */
BOOL LLCircuitData::isDuplicateResend(TPACKETID packetnum)
{
    const bool already_seen =
        mRecentlyReceivedReliablePackets.end() != mRecentlyReceivedReliablePackets.find(packetnum);
    return already_seen;
}
void LLCircuit::dumpResends()
{
circuit_data_map::iterator end = mCircuitData.end();
for(circuit_data_map::iterator it = mCircuitData.begin(); it != end; ++it)
{
(*it).second->dumpResendCountAndReset();
}
}
/**
 * Look up the circuit for a host.
 *
 * The most recently found circuit is cached in mLastCircuit and checked first.
 *
 * @param host  Remote endpoint to look up.
 * @return      The matching circuit, or NULL if none exists. The cache is only
 *              updated on a successful lookup.
 */
LLCircuitData* LLCircuit::findCircuit(const LLHost& host) const
{
    // An optimization on finding the previously found circuit.
    const bool cache_hit = mLastCircuit && (mLastCircuit->mHost == host);
    if (!cache_hit)
    {
        circuit_data_map::const_iterator found = mCircuitData.find(host);
        if (found == mCircuitData.end())
        {
            return NULL;
        }
        mLastCircuit = found->second;
    }
    return mLastCircuit;
}
/**
 * Report whether a circuit exists for the host and is marked alive.
 *
 * @param host  Remote endpoint to check.
 * @return      The circuit's mbAlive flag, or FALSE when no circuit is found.
 */
BOOL LLCircuit::isCircuitAlive(const LLHost& host) const
{
    LLCircuitData *circuit = findCircuit(host);
    if (!circuit)
    {
        return FALSE;
    }
    return circuit->mbAlive;
}
/**
 * Register the timeout callback for this circuit.
 *
 * @param callback_func  Function stored in mTimeoutCallback.
 * @param user_data      Opaque pointer stored in mTimeoutUserData.
 */
void LLCircuitData::setTimeoutCallback(void (*callback_func)(const LLHost &host, void *user_data), void *user_data)
{
    // Store the context first, then the function that will receive it.
    mTimeoutUserData = user_data;
    mTimeoutCallback = callback_func;
}
// Track an incoming packet ID on this circuit: update the highest ID seen, record the
// arrival time, and maintain the expected next ID (mPacketsInID) and the map of
// potentially lost packets (mPotentialLostPackets).
//
// id             - packet ID of the packet that just arrived.
// receive_resent - when set, an unexpected ID is not treated as a gap (see the
//                  "out-of-order reliable resends" branch below).
//
// NOTE(review): this function is damaged in this copy of the file. It contains stray
// non-code lines (author names / "committed"), its braces do not balance, and it has
// no closing brace before the next definition. Restore it from version control before
// making any code change here; the code below is left exactly as found.
void LLCircuitData::checkPacketInID(TPACKETID id, BOOL receive_resent)
{
// Done as floats so we don't have to worry about running out of room
// with U32 getting poked into an S32.
F32 delta = (F32)mHighestPacketID - (F32)id;
if (delta > (0.5f*LL_MAX_OUT_PACKET_ID))
{
// We've almost definitely wrapped, reset mHighestPacketID to be low again.
mHighestPacketID = id;
}
else if (delta < (-0.5f*LL_MAX_OUT_PACKET_ID))
{
// This is almost definitely an old packet coming in after a wrap, ignore it.
}
else
{
mHighestPacketID = llmax(mHighestPacketID, id);
}
Dave SIMmONs
committed
// Save packet arrival time
mLastPacketInTime = LLMessageSystem::getMessageTimeSeconds();
// Have we received anything on this circuit yet?
if (0 == mPacketsIn)
{
// Must be first packet from unclosed circuit.
mPacketsIn++;
// The next expected ID is the one after this packet, wrapped at LL_MAX_OUT_PACKET_ID.
setPacketInID((id + 1) % LL_MAX_OUT_PACKET_ID);
return;
}
mPacketsIn++;
// now, check to see if we've got a gap
callum_linden
committed
if (mPacketsInID == id)
{
// nope! bump and wrap the counter, then return
mPacketsInID++;
mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
}
else if (id < mWrapID)
{
// id < mWrapID will happen if the first few packets are out of order. . .
// at that point we haven't marked anything "potentially lost" and
// the out-of-order packet will cause a full wrap marking all the IDs "potentially lost"
// do nothing
}
else
{
// we have a gap! if that id is in the map, remove it from the map, leave mCurrentCircuit->mPacketsInID
// alone
// otherwise, walk from mCurrentCircuit->mPacketsInID to id with wrapping, adding the values to the map
// and setting mPacketsInID to id + 1 % LL_MAX_OUT_PACKET_ID
// babbage: all operands in expression are unsigned, so modular
// arithmetic will always find correct gap, regardless of wrap arounds.
const U8 width = 24;
// NOTE(review): `gap` has no visible declaration and is not read again in the visible code.
gap = LLModularMath::subtract<width>(mPacketsInID, id);
if (mPotentialLostPackets.find(id) != mPotentialLostPackets.end())
{
if(gMessageSystem->mVerboseLog)
{
std::ostringstream str;
str << "MSG: <- " << mHost << "\tRECOVERING LOST:\t" << id;
// NOTE(review): `str` is built but never emitted, and as written the erase below only
// runs when verbose logging is enabled - lines appear to be missing here.
// LL_INFOS() << "removing potential lost: " << id << LL_ENDL;
mPotentialLostPackets.erase(id);
}
else if (!receive_resent) // don't freak out over out-of-order reliable resends
{
U64Microseconds time = LLMessageSystem::getMessageTimeUsecs();
TPACKETID index = mPacketsInID;
S32 gap_count = 0;
// Only gaps of fewer than 16 IDs ahead of the expected ID are recorded one by one.
if ((index < id) && ((id - index) < 16))
{
while (index != id)
{
if(gMessageSystem->mVerboseLog)
{
std::ostringstream str;
str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
<< index;
// NOTE(review): same problem as above - the bookkeeping below is textually inside
// the verbose-log branch, so the loop would not advance with logging off.
// LL_INFOS() << "adding potential lost: " << index << LL_ENDL;
mPotentialLostPackets[index] = time;
index++;
index = index % LL_MAX_OUT_PACKET_ID;
gap_count++;
}
}
else
{
LL_INFOS() << "packet_out_of_order - got packet " << id << " expecting " << index << " from " << mHost << LL_ENDL;
if(gMessageSystem->mVerboseLog)
{
std::ostringstream str;
str << "MSG: <- " << mHost << "\tPACKET GAP:\t"
<< id << " expected " << index;
}
}
// Resynchronise: the next expected ID is the one after the packet that just arrived.
mPacketsInID = id + 1;
mPacketsInID = (mPacketsInID) % LL_MAX_OUT_PACKET_ID;
// NOTE(review): both warnings sit under the same condition here; presumably the second
// one had its own, lower threshold - confirm against version control.
if (gap_count > 128)
{
LL_WARNS() << "Packet loss gap filler running amok!" << LL_ENDL;
LL_WARNS() << "Sustaining large amounts of packet loss!" << LL_ENDL;
}
/**
 * Service the ping schedule: ping every circuit whose next ping time has arrived and
 * remove circuits whose watchdog check fails.
 *
 * mPingSet is processed from its first element; each circuit handled is re-inserted
 * with a new mNextPingSendTime. At most as many circuits are processed as were in the
 * set on entry, and processing stops at the first live circuit that is not yet due.
 *
 * @param msgsys  Message system handed to each circuit's updateWatchDogTimers().
 */
void LLCircuit::updateWatchDogTimers(LLMessageSystem *msgsys)
{
    F64Seconds cur_time = LLMessageSystem::getMessageTimeSeconds();
    S32 count = static_cast<S32>(mPingSet.size());
    S32 cur = 0;

    // Only process each circuit once at most, stop processing if no circuits
    while ((cur < count) && !mPingSet.empty())
    {
        cur++;

        LLCircuit::ping_set_t::iterator psit = mPingSet.begin();
        LLCircuitData *cdp = *psit;

        if (!cdp->mbAlive)
        {
            // We suspect that this case should never happen, given how
            // the alive status is set.
            // Skip over dead circuits, just add the ping interval and push it to the back
            // Always remember to remove it from the set before changing the sorting
            // key (mNextPingSendTime)
            mPingSet.erase(psit);
            cdp->mNextPingSendTime = cur_time + mHeartbeatInterval;
            mPingSet.insert(cdp);
            continue;
        }

        // Check to see if this needs a ping
        if (cur_time < cdp->mNextPingSendTime)
        {
            // This circuit doesn't need a ping, break out because
            // we have a sorted list, thus no more circuits need pings
            break;
        }

        // Update watchdog timers
        if (cdp->updateWatchDogTimers(msgsys))
        {
            // Randomize our pings a bit by doing some up to 5% early or late
            F64Seconds dt = 0.95f*mHeartbeatInterval + F32Seconds(ll_frand(0.1f*mHeartbeatInterval.value()));

            // Remove it, and reinsert it with the new next ping time.
            // Always remove before changing the sorting key.
            mPingSet.erase(psit);
            cdp->mNextPingSendTime = cur_time + dt;
            mPingSet.insert(cdp);

            // Update our throttles
            cdp->mThrottles.dynamicAdjust();

            // Update some stats, this is not terribly important
            cdp->checkPeriodTime();
        }
        else
        {
            // This mPingSet.erase isn't necessary, because removing the circuit will
            // remove the ping set.
            // Pass a copy of the host: removeCircuitData() deletes cdp, so a reference
            // to cdp->mHost would not outlive the call.
            const LLHost doomed_host = cdp->mHost;
            removeCircuitData(doomed_host);
        }
    }
}
/**
 * Per-circuit watchdog tick: check for timeout, send a StartPingCheck carrying the
 * oldest unacked reliable packet ID, and age out potentially lost packets.
 *
 * @param msgsys  Message system used to build and send the ping.
 * @return        FALSE if checkCircuitTimeout() failed and the caller should clean
 *                this circuit up; TRUE otherwise.
 */
BOOL LLCircuitData::updateWatchDogTimers(LLMessageSystem *msgsys)
{
    F64Seconds cur_time = LLMessageSystem::getMessageTimeSeconds();
    mLastPingSendTime = cur_time;

    if (!checkCircuitTimeout())
    {
        // Pass this back to the calling LLCircuit, this circuit needs to be cleaned up.
        return FALSE;
    }

    // WARNING!
    // Duplicate suppression can FAIL if packets are delivered out of
    // order, although it's EXTREMELY unlikely.  It would require
    // that the ping get delivered out of order enough that the ACK
    // for the packet that it was out of order with was received BEFORE
    // the ping was sent.

    // Find the current oldest reliable packetID
    // This is to handle the case if we actually manage to wrap our
    // packet IDs - the oldest will actually have a higher packet ID
    // than the current.
    BOOL wrapped = FALSE;
    reliable_iter iter;
    iter = mUnackedPackets.upper_bound(getPacketOutID());
    if (iter == mUnackedPackets.end())
    {
        // Nothing AFTER this one, so we want the lowest packet ID
        // then.
        iter = mUnackedPackets.begin();
        wrapped = TRUE;
    }

    TPACKETID packet_id = 0;

    // Check against the "final" packets
    BOOL wrapped_final = FALSE;
    reliable_iter iter_final;
    iter_final = mFinalRetryPackets.upper_bound(getPacketOutID());
    if (iter_final == mFinalRetryPackets.end())
    {
        iter_final = mFinalRetryPackets.begin();
        wrapped_final = TRUE;
    }

    if (wrapped != wrapped_final)
    {
        // One of the "unacked" or "final" lists hasn't wrapped.  Whichever one
        // hasn't has the oldest packet. The non-wrapped iterator came from a
        // successful upper_bound(), so it is safe to dereference.
        if (!wrapped)
        {
            // Hasn't wrapped, so the one on the
            // unacked packet list is older
            packet_id = iter->first;
        }
        else
        {
            // The final-retry list is the one that hasn't wrapped. Without this
            // branch packet_id would be left at 0.
            packet_id = iter_final->first;
        }
    }
    else
    {
        // They both wrapped (or neither did), we can just use the minimum of the two.
        if ((iter == mUnackedPackets.end()) && (iter_final == mFinalRetryPackets.end()))
        {
            // Wow!  No unacked packets at all!
            // Send the ID of the last packet we sent out.
            // This will flush all of the destination's
            // unacked packets, theoretically.
            packet_id = getPacketOutID();
        }
        else
        {
            BOOL had_unacked = FALSE;
            if (iter != mUnackedPackets.end())
            {
                // Unacked list has the lowest so far
                packet_id = iter->first;
                had_unacked = TRUE;
            }

            if (iter_final != mFinalRetryPackets.end())
            {
                // Use the lowest of the unacked list and the final list
                if (had_unacked)
                {
                    // Both had a packet, use the lowest.
                    packet_id = llmin(packet_id, iter_final->first);
                }
                else
                {
                    // Only the final had a packet, use it.
                    packet_id = iter_final->first;
                }
            }
        }
    }

    // Send off the another ping.
    pingTimerStart();
    msgsys->newMessageFast(_PREHASH_StartPingCheck);
    msgsys->nextBlock(_PREHASH_PingID);
    msgsys->addU8Fast(_PREHASH_PingID, nextPingID());
    msgsys->addU32Fast(_PREHASH_OldestUnacked, packet_id);
    msgsys->sendMessage(mHost);

    // Also do lost packet accounting.
    // Check to see if anything on our lost list is old enough to
    // be considered lost

    LLCircuitData::packet_time_map::iterator it;
    U64Microseconds timeout = llmin(LL_MAX_LOST_TIMEOUT, F32Seconds(getPingDelayAveraged()) * LL_LOST_TIMEOUT_FACTOR);

    U64Microseconds mt_usec = LLMessageSystem::getMessageTimeUsecs();
    for (it = mPotentialLostPackets.begin(); it != mPotentialLostPackets.end(); )
    {
        U64Microseconds delta_t_usec = mt_usec - (*it).second;
        if (delta_t_usec > timeout)
        {
            // let's call this one a loss!
            mPacketsLost++;
            gMessageSystem->mDroppedPackets++;
            if (gMessageSystem->mVerboseLog)
            {
                std::ostringstream str;
                str << "MSG: <- " << mHost << "\tLOST PACKET:\t"
                    << (*it).first;
                LL_INFOS() << str.str() << LL_ENDL;
            }
            // The erase must happen regardless of logging, otherwise the loop
            // would never move past an expired entry.
            mPotentialLostPackets.erase(it++);
        }
        else
        {
            ++it;
        }
    }

    return TRUE;
}
void LLCircuitData::clearDuplicateList(TPACKETID oldest_id)
{
// purge old data from the duplicate suppression queue
// we want to KEEP all x where oldest_id <= x <= last incoming packet, and delete everything else.
//LL_INFOS() << mHost << ": clearing before oldest " << oldest_id << LL_ENDL;