Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "jaus/core/transport/largedataset.h"
00041 #include <string.h>
00042
00043 using namespace JAUS;
00044
00045
00055 LargeDataSet::Key::Key(const Address& src,
00056 const UShort code,
00057 const UInt pv)
00058 {
00059 mSourceID = src;
00060 mMessageCode = code;
00061 mPresenceVector = pv;
00062 mIdentifier = 0;
00063 }
00064
00065
00071 LargeDataSet::Key::Key(const LargeDataSet::Key& key)
00072 {
00073 *this = key;
00074 }
00075
00076
00082 LargeDataSet::Key::~Key()
00083 {
00084 }
00085
00086
00092 void LargeDataSet::Key::Update() const
00093 {
00094 Packet * p = ( (Packet *)(&mKey) );
00095 p->Clear();
00096 p->Reserve(UINT_SIZE + USHORT_SIZE + ULONG_SIZE + BYTE_SIZE);
00097 p->Write(mSourceID.ToUInt());
00098 p->Write(mMessageCode);
00099 p->Write(mPresenceVector);
00100 p->Write(mIdentifier);
00101 }
00102
00103
00109 bool LargeDataSet::Key::operator<(const LargeDataSet::Key& key) const
00110 {
00111 Update();
00112 key.Update();
00113
00114 if(memcmp(mKey.Ptr(), key.mKey.Ptr(), mKey.Length()) < 0)
00115 {
00116 return true;
00117 }
00118 return false;
00119 }
00120
00121
00127 bool LargeDataSet::Key::operator<=(const LargeDataSet::Key& key) const
00128 {
00129 Update();
00130 key.Update();
00131
00132 if(memcmp(mKey.Ptr(), key.mKey.Ptr(), mKey.Length()) <= 0)
00133 {
00134 return true;
00135 }
00136 return false;
00137 }
00138
00139
00145 LargeDataSet::Key& LargeDataSet::Key::operator=(const LargeDataSet::Key& key)
00146 {
00147 if(this != &key)
00148 {
00149 mSourceID = key.mSourceID;
00150 mMessageCode = key.mMessageCode;
00151 mPresenceVector = key.mPresenceVector;
00152 mIdentifier = key.mIdentifier;
00153 mKey = key.mKey;
00154 }
00155 return *this;
00156 }
00157
00158
00164 LargeDataSet::LargeDataSet()
00165 {
00166 mCompleteFlag = false;
00167 mUpdateTimeMs = 0;
00168 mMessageCode = 0;
00169 mHaveLastFlag = false;
00170 }
00171
00172
00178 LargeDataSet::~LargeDataSet()
00179 {
00180 }
00181
00182
00188 void LargeDataSet::Clear()
00189 {
00190 mHeader = Header();
00191 mMessageCode = 0;
00192 mHaveLastFlag = false;
00193 mCompleteFlag = false;
00194 mStream.clear();
00195 mMissing.clear();
00196 mUpdateTimeMs = 0;
00197 }
00198
00199
00209 bool LargeDataSet::AddPacket(const Packet& message)
00210 {
00211 Header header;
00212 UShort messageCode;
00213 message.SetReadPos(0);
00214 if(header.Read(message) &&
00215 message.Read(messageCode))
00216 {
00217 if(header.mControlFlag != Header::DataControl::Single)
00218 {
00219 return AddPacket(header, messageCode, message);
00220 }
00221 }
00222 return false;
00223 }
00224
00225
00237 bool LargeDataSet::AddPacket(const Header& header,
00238 const UShort messageCode,
00239 const Packet& packet)
00240 {
00241 if(header.mControlFlag == Header::DataControl::Single || mCompleteFlag)
00242 {
00243 return false;
00244 }
00245 if(mStream.size() == 0)
00246 {
00247 mMessageCode = messageCode;
00248 mHeader.mSourceID = header.mSourceID;
00249 mHeader.mDestinationID = header.mDestinationID;
00250 mHeader.mSequenceNumber = header.mSequenceNumber;
00251
00252
00253
00254
00255
00256
00257 mStream[header.mSequenceNumber] = packet;
00258 mUpdateTimeMs = Time::GetUtcTimeMs();
00259 return true;
00260 }
00261
00262 if(messageCode != mMessageCode ||
00263 header.mSourceID != mHeader.mSourceID)
00264 {
00265 return false;
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 if(header.mControlFlag == Header::DataControl::Last)
00294 {
00295 mHaveLastFlag = true;
00296 }
00297
00298 mStream[header.mSequenceNumber] = packet;
00299 if(mHaveLastFlag)
00300 {
00301
00302 Stream::iterator p;
00303 UShort prevSequenceNumber = 0;
00304 bool first = true;
00305 mCompleteFlag = true;
00306 for(p = mStream.begin(); p != mStream.end(); p++)
00307 {
00308 if(first)
00309 {
00310 first = false;
00311 }
00312 else if(prevSequenceNumber + 1 != p->first)
00313 {
00314 mCompleteFlag = false;
00315 break;
00316 }
00317 prevSequenceNumber = p->first;
00318 }
00319 }
00320 mUpdateTimeMs = Time::GetUtcTimeMs();
00321 return true;
00322 }
00323
00324
00346 void LargeDataSet::CreateLargeDataSet(const Header& header,
00347 const UShort messageCode,
00348 const Packet& payload,
00349 Packet::List& stream,
00350 Header::List& streamHeaders,
00351 const Packet* transportHeader,
00352 const UShort maxPayloadSize,
00353 const UShort startingSequenceNumber)
00354 {
00355 stream.clear();
00356 streamHeaders.clear();
00357
00358 Header sHeader(header);
00359 Packet sPacket;
00360
00361 unsigned int total = 0;
00362 unsigned int toWrite = 0;
00363 const unsigned char* ptr = payload.Ptr();
00364 unsigned int transportHeaderSize = transportHeader ? transportHeader->Length() : 0;
00365
00366
00367 sPacket.Reserve(maxPayloadSize + Header::MinSize + transportHeaderSize + USHORT_SIZE);
00368 sHeader.mSequenceNumber = startingSequenceNumber;
00369
00370 while(total < payload.Length())
00371 {
00372
00373 if(sHeader.mSequenceNumber == startingSequenceNumber)
00374 {
00375 sHeader.mControlFlag = Header::DataControl::First;
00376 }
00377 else
00378 {
00379 sHeader.mControlFlag = Header::DataControl::Normal;
00380 }
00381
00382 toWrite = maxPayloadSize;
00383 if(total + toWrite > payload.Length())
00384 {
00385 toWrite = payload.Length() - total;
00386
00387
00388 if(sHeader.mSequenceNumber != 0)
00389 {
00390 sHeader.mControlFlag = Header::DataControl::Last;
00391 }
00392 else
00393 {
00394 sHeader.mControlFlag = Header::DataControl::Single;
00395 }
00396 }
00397 sHeader.mSize = (UShort)(Header::MinSize + USHORT_SIZE + toWrite);
00398
00399 sPacket.Clear();
00400 if(transportHeaderSize > 0)
00401 {
00402 sPacket.Write(*transportHeader);
00403 }
00404 sHeader.Write(sPacket);
00405 sPacket.Write(messageCode);
00406 total += (unsigned int)sPacket.Write((unsigned char *)(ptr), toWrite);
00407 stream.push_back(sPacket);
00408 streamHeaders.push_back(sHeader);
00409 ptr += toWrite;
00410 sHeader.mSequenceNumber++;
00411 }
00412 }
00413
00414
00431 bool LargeDataSet::MergeLargeDataSet(Header& header,
00432 UShort& messageCode,
00433 Packet& payload,
00434 const Packet::List& stream,
00435 const Packet* transportHeader)
00436 {
00437 Header sHeader;
00438 Packet::List::const_iterator sPacket;
00439 UShort sequenceNumber = 0;
00440 std::map<UShort, const Packet*> orderedPackets;
00441 std::map<UShort, Header> orderedHeaders;
00442
00443
00444 header.mSize = Header::MinSize;
00445 header.mAckNackFlag = Header::AckNack::None;
00446 header.mSequenceNumber = 0;
00447 header.mControlFlag = Header::DataControl::Single;
00448 payload.Clear();
00449
00450 unsigned int transportHeaderSize = transportHeader ? transportHeader->Length() : 0;
00451 for(sPacket = stream.begin(); sPacket != stream.end(); sPacket++)
00452 {
00453
00454 if(transportHeaderSize > 0)
00455 {
00456 sPacket->SetReadPos(transportHeaderSize);
00457 }
00458 else
00459 {
00460 sPacket->SetReadPos(0);
00461 }
00462 if(sHeader.Read(*sPacket) && sPacket->Read(messageCode))
00463 {
00464 orderedPackets[sHeader.mSequenceNumber] = &(*sPacket);
00465 orderedHeaders[sHeader.mSequenceNumber] = sHeader;
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484 }
00485 else
00486 {
00487 payload.Clear();
00488 break;
00489 }
00490 }
00491 if(stream.size() == orderedPackets.size())
00492 {
00493 std::map<UShort, const Packet*>::iterator data;
00494 std::map<UShort, Header>::iterator dataHeader;
00495
00496 bool first = true;
00497 UShort prevSequenceNumber = 0;
00498 for(data = orderedPackets.begin(), dataHeader = orderedHeaders.begin();
00499 data != orderedPackets.end() && dataHeader != orderedHeaders.end();
00500 data++, dataHeader++)
00501 {
00502 if(first)
00503 {
00504 if(dataHeader->second.mControlFlag != Header::DataControl::First)
00505 {
00506 break;
00507 }
00508 header.mSourceID = dataHeader->second.mSourceID;
00509 header.mDestinationID = dataHeader->second.mDestinationID;
00510 header.mPriorityFlag = dataHeader->second.mPriorityFlag;
00511 first = false;
00512 }
00513 else if(prevSequenceNumber + 1 != data->first)
00514 {
00515 break;
00516 }
00517 prevSequenceNumber = header.mSequenceNumber = data->first;
00518
00519 payload.Write((unsigned char *)(data->second->Ptr() + data->second->GetReadPos()),
00520 dataHeader->second.mSize - (Header::MinSize + USHORT_SIZE));
00521 }
00522 }
00523
00524 return payload.Length() > 0;
00525 }
00526
00527
00544 bool LargeDataSet::MergeLargeDataSet(Header& header,
00545 UShort& messageCode,
00546 Packet& payload,
00547 const Stream& stream,
00548 const Packet* transportHeader)
00549 {
00550 Header sHeader;
00551 Stream::const_iterator sPacket;
00552 UShort prevSequenceNumber = 0;
00553
00554
00555 header.mSize = Header::MinSize;
00556 header.mAckNackFlag = Header::AckNack::None;
00557 header.mSequenceNumber = 0;
00558 header.mControlFlag = Header::DataControl::Single;
00559 payload.Clear();
00560
00561 bool first = true;
00562 unsigned int transportHeaderSize = transportHeader ? transportHeader->Length() : 0;
00563 for(sPacket = stream.begin(); sPacket != stream.end(); sPacket++)
00564 {
00565
00566 if(transportHeaderSize > 0)
00567 {
00568 sPacket->second.SetReadPos(transportHeaderSize);
00569 }
00570 else
00571 {
00572 sPacket->second.SetReadPos(0);
00573 }
00574 if(sHeader.Read(sPacket->second) && sPacket->second.Read(messageCode))
00575 {
00576 if(first)
00577 {
00578 header.mSourceID = sHeader.mSourceID;
00579 header.mDestinationID = sHeader.mDestinationID;
00580 header.mPriorityFlag = sHeader.mPriorityFlag;
00581 first = false;
00582 }
00583
00584 else if(prevSequenceNumber + 1 != sHeader.mSequenceNumber)
00585 {
00586 payload.Clear();
00587 break;
00588 }
00589
00590 payload.Write((unsigned char *)(sPacket->second.Ptr() + sPacket->second.GetReadPos()),
00591 sHeader.mSize - (Header::MinSize + USHORT_SIZE));
00592 header.mSequenceNumber = prevSequenceNumber = sHeader.mSequenceNumber;
00593 }
00594 else
00595 {
00596 payload.Clear();
00597 break;
00598 }
00599 }
00600
00601 return payload.Length() > 0;
00602 }
00603
00604