00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include "jaus/core/transport/transport.h"
00042 #include "jaus/core/events/event.h"
00043 #include <stdlib.h>
00044 #include <stdio.h>
00045 #include <iostream>
00046 #include <iomanip>
00047 #include <cxutils/fileio.h>
00048
00049
00050 #ifdef WIN32
00051
00052 #else
00053 #include <sys/sem.h>
00054 #include <sys/types.h>
00055 #include <sys/stat.h>
00056 #include <unistd.h>
00057 #endif
00058
00059 using namespace JAUS;
00060
00061 const std::string Transport::Name = "urn:jaus:jss:core:Transport";
00062
00068 Transport::Receipt::Receipt()
00069 {
00070 mPendingFlag = true;
00071 mpMessage = NULL;
00072 mSendTimeMs = mUpdateTimeMs = 0;
00073 mpResponses = NULL;
00074 }
00075
00076
00082 Transport::Receipt::~Receipt()
00083 {
00084 }
00085
00086
00092 Transport::Transport() : Service(Service::ID(Transport::Name, 1.0), Service::ID())
00093 {
00094 mLoggingFlag = false;
00095 mStopMessageProcessingFlag = false;
00096 mMessageProcessingDelayMs = 1;
00097 mNumberOfMessageProcessingThreads = 1;
00098 mJausMessageQueue.Reserve(MaxQueueSize);
00099 mSequenceNumber = 0;
00100 }
00101
00102
00108 Transport::~Transport()
00109 {
00110 mStopMessageProcessingFlag = true;
00111 CxUtils::SleepMs(50);
00112 mMessageProcessingThreads.StopAllThreads();
00113 mPriorityJausMessagesThread.StopThread();
00114 std::map<UShort, Message*>::iterator m;
00115 for(m = mMessageTemplates.begin(); m != mMessageTemplates.end(); m++)
00116 {
00117 delete m->second;
00118 }
00119 mMessageTemplates.clear();
00120 }
00121
00122
00132 void Transport::SetMaxMessageProcessingThreads(const unsigned int maxThreads)
00133 {
00134 if(IsInitialized() == false && maxThreads >= 1)
00135 {
00136 mNumberOfMessageProcessingThreads = maxThreads;
00137 }
00138 }
00139
00140
00155 void Transport::AddPriorityMessage(const UShort messageCode)
00156 {
00157 if(IsInitialized() == false)
00158 {
00159 mPriorityJausMessageTypes.insert(messageCode);
00160 if(mPriorityJausMessagesThread.IsThreadActive() == false)
00161 {
00162 mPriorityJausMessagesThread.CreateThread(PriorityMessageProcessingThread, this);
00163 char buffer[512];
00164 sprintf(buffer, "%s_PRIORITY_THREAD_%d", mComponentID.ToString().c_str(), mMessageProcessingThreads.GetNumThreads() + 1);
00165 }
00166 }
00167 }
00168
00169
00182 void Transport::SetMessagePollingDelayMs(const unsigned int delayTimeMs)
00183 {
00184 mMessageProcessingDelayMs = delayTimeMs;
00185 }
00186
00187
00199 Message* Transport::CreateMessage(const UShort messageCode) const
00200 {
00201
00202 Message* message = CreateMessageFromService(messageCode, this);
00203 if(message == NULL)
00204 {
00205 message = GetMessageFromTemplate(messageCode);
00206 }
00207 return message;
00208 }
00209
00210
00224 bool Transport::Send(const Message* message,
00225 const int broadcastFlags) const
00226 {
00227 Packet::List stream;
00228 Header::List streamHeaders;
00229 Packet::List::iterator packet;
00230 Header::List::iterator header;
00231
00232 if(message->GetDestinationID() == mComponentID)
00233 {
00234 return false;
00235 }
00236
00237 ( (Message * ) message )->SetSourceID(mComponentID);
00238 mSequenceNumberMutex.Lock();
00239 if(SerializeMessage(message, stream, streamHeaders, mSequenceNumber, (Byte)broadcastFlags))
00240 {
00241 *((UShort *)(&mSequenceNumber)) += (UShort)stream.size();
00242 mSequenceNumberMutex.Unlock();
00243 for(packet = stream.begin(), header = streamHeaders.begin();
00244 packet != stream.end() && header != streamHeaders.end();
00245 packet++, header++)
00246 {
00247 if(SendPacket(*packet, *header, broadcastFlags) == false)
00248 {
00249 return false;
00250 }
00251 }
00252 return true;
00253 }
00254 mSequenceNumberMutex.Unlock();
00255 return false;
00256 }
00257
00258
00272 bool Transport::Send(const Message* message,
00273 Message* response,
00274 const unsigned int waitTimeMs) const
00275 {
00276 Message::List possibleResponses;
00277 possibleResponses.push_back(response);
00278 return Send(message, possibleResponses, waitTimeMs);
00279 }
00280
00281
00304 bool Transport::Send(const Message* message,
00305 Message::List& possibleResponses,
00306 const unsigned int waitTimeMs) const
00307 {
00308 Receipt receipt;
00309 Receipt::Set::iterator ritr;
00310
00311 if(message == NULL || possibleResponses.size() == 0 || waitTimeMs == 0 || message->GetDestinationID().IsBroadcast())
00312 {
00313 return false;
00314 }
00315
00316 mPendingReceiptsMutex.Lock();
00317 receipt.mPendingFlag = true;
00318 receipt.mpMessage = message;
00319 receipt.mpResponses = &possibleResponses;
00320 receipt.mSendTimeMs = receipt.mUpdateTimeMs = Time::GetUtcTimeMs();
00321 Receipt::Set* pending = ((Receipt::Set *)&mPendingReceipts);
00322 pending->insert(&receipt);
00323 mPendingReceiptsMutex.Unlock();
00324
00325 if(Send(message))
00326 {
00327 Time::Stamp timeMs = Time::GetUtcTimeMs();
00328 while(timeMs - receipt.mSendTimeMs < waitTimeMs && receipt.mPendingFlag == true)
00329 {
00330 timeMs = Time::GetUtcTimeMs();
00331
00332 #ifdef WIN32
00333 CxUtils::SleepMs(1);
00334 #else
00335 usleep(1);
00336 #endif
00337 }
00338 }
00339
00340
00341
00342 mPendingReceiptsMutex.Lock();
00343 ritr = pending->find(&receipt);
00344 if(ritr != pending->end())
00345 {
00346 pending->erase(ritr);
00347 }
00348 mPendingReceiptsMutex.Unlock();
00349
00350 return !receipt.mPendingFlag;
00351 }
00352
00353
00368 void Transport::RegisterCallback(const UShort messageCode, Transport::Callback* callback)
00369 {
00370 Mutex::ScopedLock lock(&mCallbacksMutex);
00371 mMessageCallbacks[messageCode].insert(callback);
00372 }
00373
00374
00384 void Transport::EnableLogging(const bool flag)
00385 {
00386 Mutex::ScopedLock lock(&mLogFileMutex);
00387 mLoggingFlag = flag;
00388 if(flag)
00389 {
00390 if(mLogFile.is_open() == false)
00391 {
00392
00393 CxUtils::FileIO::CreateDir("logs");
00394 std::string fname = "logs/JAUS++ Transport Receive Log - ";
00395 fname += JAUS::Time::GetUtcTime().ToString();
00396 fname += ".csv";
00397 mLogFile.open(fname.c_str(), std::ios_base::out);
00398 if(mLogFile.is_open())
00399 {
00400 mLogFile << "Time (UTC), Message Code, Message Name, Source ID, Destination ID, Size (Bytes), Sequence Number, Message Body\n";
00401 mLogFile.flush();
00402 }
00403 }
00404 }
00405 else
00406 {
00407 if(mLogFile.is_open())
00408 {
00409 mLogFile.close();
00410 }
00411 }
00412 }
00413
00414
00426 void Transport::ProcessPacket(const Packet& packet, const Header& header)
00427 {
00428
00429 if(Address::DestinationMatch(header.mDestinationID, mComponentID) == false)
00430 {
00431 return;
00432 }
00433
00434 if(mLoggingFlag)
00435 {
00436 WriteToLog(packet, header);
00437 }
00438
00439 if(mMessageProcessingThreads.GetNumThreads() < mNumberOfMessageProcessingThreads)
00440 {
00441 char buffer[512];
00442 sprintf(buffer, "%s_MP_THREAD_%d", mComponentID.ToString().c_str(), mMessageProcessingThreads.GetNumThreads() + 1);
00443 mMessageProcessingThreads.CreateThread(buffer, Transport::MessageProcessingThread, this);
00444 }
00445
00446 UShort messageCode = 0;
00447
00448 packet.SetReadPos(Header::PayloadOffset);
00449 packet.Read(messageCode);
00450 packet.SetReadPos(0);
00451
00452
00453 if(header.mAckNackFlag == Header::AckNack::Request)
00454 {
00455 Header ackHeader = header;
00456 ackHeader.mSourceID = mComponentID;
00457 ackHeader.mDestinationID = header.mSourceID;
00458 ackHeader.mSize = Header::MinSize;
00459 ackHeader.mAckNackFlag = Header::AckNack::Ack;
00460 ackHeader.mSequenceNumber = header.mSequenceNumber;
00461 Packet ackPacket;
00462 if(ackHeader.Write(ackPacket))
00463 {
00464 SendPacket(ackPacket, ackHeader);
00465 if(mLoggingFlag)
00466 {
00467 WriteToLog(ackPacket, ackHeader);
00468 }
00469 }
00470 }
00471
00472 if(header.mControlFlag == Header::DataControl::Single)
00473 {
00474
00475 if(CheckPendingReceipts(header, messageCode, packet) == false)
00476 {
00477
00478 mPendingReceiptsMutex.Unlock();
00479 Mutex::ScopedLock lock(&mJausMessageQueueMutex);
00480 mJausMessageQueue.PushBack(packet);
00481 }
00482 }
00483 else
00484 {
00485
00486 UInt presenceVector = 0;
00487 LargeDataSet::Key key(header.mSourceID,
00488 messageCode,
00489 presenceVector);
00490
00491
00492 Mutex::ScopedLock lockLargeData(&mLargeDataSetsMutex);
00493 LargeDataSet::Map::iterator ld;
00494 ld = mLargeDataSets.find(key);
00495 bool added = false;
00496
00497 for(ld = mLargeDataSets.begin();
00498 ld != mLargeDataSets.end() && added == false;
00499 ld++)
00500 {
00501 if(ld->second->mCompleteFlag == false &&
00502 ld->first.mMessageCode == key.mMessageCode &&
00503 ld->first.mSourceID == key.mSourceID)
00504 {
00505
00506
00507 if(header.mControlFlag == Header::DataControl::First)
00508 {
00509 delete ld->second;
00510 mLargeDataSets.erase(ld);
00511 }
00512 else if(ld->second->AddPacket(header, messageCode, packet))
00513 {
00514 added = true;
00515 }
00516 break;
00517 }
00518 }
00519
00520 if(added == false)
00521 {
00522 LargeDataSet* newStream = new LargeDataSet();
00523 if(newStream->AddPacket(header, messageCode, packet))
00524 {
00525 mLargeDataSets[key] = newStream;
00526 }
00527 else
00528 {
00529 delete newStream;
00530 }
00531 newStream = NULL;
00532 }
00533
00534
00535 ld = mLargeDataSets.begin();
00536 while(ld != mLargeDataSets.end())
00537 {
00538 if(ld->second->mCompleteFlag == true)
00539 {
00540 if(CheckPendingReceipts(ld->second))
00541 {
00542 delete ld->second;
00543 ld = mLargeDataSets.begin();
00544 continue;
00545 }
00546 }
00547 ld++;
00548 }
00549 }
00550 }
00551
00552
00565 Message* Transport::CreateMessageFromService(const UShort messageCode, const Service *service) const
00566 {
00567 Message* message = NULL;
00568
00569 if(mStopMessageProcessingFlag)
00570 {
00571 return message;
00572 }
00573
00574 const Transport* transport = dynamic_cast<const Transport*>(service);
00575 if(transport)
00576 {
00577 message = GetMessageFromTemplate(messageCode);
00578 }
00579
00580 if(message == NULL && this != service)
00581 {
00582 message = service->CreateMessage(messageCode);
00583 }
00584 if(message == NULL)
00585 {
00586 Service::Map children;
00587
00588 {
00589 Mutex::ScopedLock lock(&service->mJausServiceMutex);
00590 children = service->mJausChildServices;
00591 }
00592 Service::Map::const_iterator child;
00593 for(child = children.begin();
00594 child != children.end();
00595 child++)
00596 {
00597 if( (message = CreateMessageFromService(messageCode, child->second)) != NULL )
00598 {
00599 break;
00600 }
00601 }
00602 }
00603
00604 return message;
00605 }
00606
00607
00621 Message* Transport::CreateMessageFromPacket(const Packet& packet) const
00622 {
00623
00624 packet.SetReadPos(0);
00625 UShort messageCode = 0;
00626 packet.Read(messageCode);
00627 Message* message = CreateMessageFromService(messageCode, this);
00628 if(message)
00629 {
00630 Packet::Wrapper wrapper((unsigned char *)(packet.Ptr() + USHORT_SIZE), packet.Length() - USHORT_SIZE);
00631 if(message->ReadMessageBody(*wrapper.GetData()) < 0)
00632 {
00633 delete message;
00634 message = NULL;
00635 if(mDebugMessagesFlag)
00636 {
00637 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00638 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Received Malformed Packet [0x" << std::setbase(16) << messageCode << std::setbase(10) << "]\n";
00639 }
00640 }
00641 }
00642
00643 packet.SetReadPos(0);
00644 return message;
00645 }
00646
00647
00653 void Transport::TriggerMessageCallbacks(const Message* message)
00654 {
00655 Mutex::ScopedLock lock(&mCallbacksMutex);
00656 Callback::Map::iterator cb;
00657 cb = mMessageCallbacks.find(message->GetMessageCode());
00658 if(cb != mMessageCallbacks.end())
00659 {
00660 Callback::Set::iterator cb2;
00661 for(cb2 = cb->second.begin();
00662 cb2 != cb->second.end();
00663 cb2++)
00664 {
00665 (*cb2)->ProcessMessage(message);
00666 }
00667 }
00668 }
00669
00670
00680 void Transport::AddMessageTemplate(Message* message)
00681 {
00682 Mutex::ScopedLock lock(&mMessageTemplatesMutex);
00683 if(mMessageTemplates.find(message->GetMessageCode()) != mMessageTemplates.end())
00684 {
00685 delete mMessageTemplates[message->GetMessageCode()];
00686 }
00687 mMessageTemplates[message->GetMessageCode()] = message;
00688 }
00689
00690
00701 Message* Transport::GetMessageFromTemplate(const UShort messageCode) const
00702 {
00703 Message* message = NULL;
00704 {
00705 Mutex::ScopedLock lock(&mMessageTemplatesMutex);
00706 std::map<UShort, Message*>::const_iterator m;
00707 m = mMessageTemplates.find(messageCode);
00708 if(m != mMessageTemplates.end())
00709 {
00710 message = m->second->Clone();
00711 }
00712 }
00713 return message;
00714 }
00715
00716
00723 bool Transport::CheckPendingReceipts(const Header& header,
00724 const UShort messageCode,
00725 const Packet& packet)
00726 {
00727 bool foundReceipt = false;
00728 Receipt::Set::iterator receipt;
00729 mPendingReceiptsMutex.Lock();
00730 for(receipt = mPendingReceipts.begin();
00731 receipt != mPendingReceipts.end() && false == foundReceipt;
00732 receipt++)
00733 {
00734 Message::List::iterator responses;
00735 for(responses = (*receipt)->mpResponses->begin();
00736 responses != (*receipt)->mpResponses->end();
00737 responses++)
00738 {
00739 if((*responses)->GetMessageCode() == messageCode &&
00740 (*receipt)->mpMessage->GetDestinationID() == header.mSourceID)
00741 {
00742 if( (*responses)->Read(packet) && (*responses)->IsResponseToMessage((*receipt)->mpMessage))
00743 {
00744 foundReceipt = true;
00745 (*receipt)->mPendingFlag = false;
00746 (*receipt)->mUpdateTimeMs = Time::GetUtcTimeMs();
00747 Message* final = (*responses);
00748
00749
00750 responses = (*receipt)->mpResponses->begin();
00751 while(responses != (*receipt)->mpResponses->end())
00752 {
00753 if( (*responses) != final )
00754 {
00755 responses = (*receipt)->mpResponses->erase(responses); continue;
00756 }
00757 responses++;
00758 }
00759 mPendingReceipts.erase(receipt);
00760 break;
00761 }
00762 }
00763 }
00764 if(foundReceipt)
00765 break;
00766 }
00767 mPendingReceiptsMutex.Unlock();
00768
00769 return foundReceipt;
00770 }
00771
00772
00779 bool Transport::CheckPendingReceipts(const LargeDataSet* stream)
00780 {
00781 bool foundReceipt = false;
00782 Receipt::Set::iterator receipt;
00783 mPendingReceiptsMutex.Lock();
00784 for(receipt = mPendingReceipts.begin();
00785 receipt != mPendingReceipts.end() && false == foundReceipt;
00786 receipt++)
00787 {
00788 Message::List::iterator responses;
00789 for(responses = (*receipt)->mpResponses->begin();
00790 responses != (*receipt)->mpResponses->end();
00791 responses++)
00792 {
00793 if((*responses)->GetMessageCode() == stream->mMessageCode &&
00794 (*receipt)->mpMessage->GetDestinationID() == stream->mHeader.mSourceID)
00795 {
00796 if( (*responses)->ReadLargeDataSet(stream->mStream) && (*responses)->IsResponseToMessage((*receipt)->mpMessage))
00797 {
00798 foundReceipt = true;
00799 (*receipt)->mPendingFlag = false;
00800 (*receipt)->mUpdateTimeMs = Time::GetUtcTimeMs();
00801 Message* final = (*responses);
00802
00803
00804 responses = (*receipt)->mpResponses->begin();
00805 while(responses != (*receipt)->mpResponses->end())
00806 {
00807 if( (*responses) != final )
00808 {
00809 responses = (*receipt)->mpResponses->erase(responses); continue;
00810 }
00811 responses++;
00812 }
00813 mPendingReceipts.erase(receipt);
00814 break;
00815 }
00816 }
00817 }
00818 if(foundReceipt)
00819 break;
00820 }
00821 mPendingReceiptsMutex.Unlock();
00822
00823 return foundReceipt;
00824 }
00825
00826
00832 void Transport::MessageProcessingThread(void* args)
00833 {
00834 Transport* transport = (Transport *)args;
00835 Packet packetToProcess;
00836 bool dataToProcess = false;
00837 unsigned int loopCounter = 0;
00838
00839 while(transport && transport->mStopMessageProcessingFlag == false)
00840 {
00841 dataToProcess = false;
00842 packetToProcess.Clear();
00843
00844
00845
00846 transport->mJausMessageQueueMutex.Lock();
00847 dataToProcess = transport->mJausMessageQueue.PopFront(&packetToProcess);
00848 transport->mJausMessageQueueMutex.Unlock();
00849
00850 if(dataToProcess && transport->mStopMessageProcessingFlag == false)
00851 {
00852 Header header;
00853 UShort messageCode = 0;
00854 Message* message = NULL;
00855
00856 packetToProcess.SetReadPos(0);
00857 if(header.Read(packetToProcess) > 0 &&
00858 packetToProcess.Read(messageCode) > 0)
00859 {
00860 bool foundReceipt = false;
00861 packetToProcess.SetReadPos(0);
00862
00863 message = transport->CreateMessageFromService(messageCode, transport);
00864
00865 if(message && message->Read(packetToProcess) > 0)
00866 {
00867 if(transport->mDebugMessagesFlag)
00868 {
00869 Mutex::ScopedLock lock(&transport->mDebugMessagesMutex);
00870 std::cout << "[" << transport->GetServiceID().ToString() << "-" << transport->mComponentID.ToString() << "] - Processing " << message->GetMessageName() << " Message\n";
00871 }
00872 if(transport->mStopMessageProcessingFlag == false)
00873 {
00874 if(transport->IsPriority(&message) == false)
00875 {
00876 transport->PushMessageToChildren(message);
00877 }
00878 }
00879 }
00880 else if(message == NULL && transport->mDebugMessagesFlag)
00881 {
00882 Mutex::ScopedLock lock(&transport->mDebugMessagesMutex);
00883 std::cout << "[" << transport->GetServiceID().ToString() << "-" << transport->mComponentID.ToString() << "] - Received Unsupported Message Type [0x" << std::setbase(16) << messageCode << std::setbase(10) << "]\n";
00884 }
00885 }
00886
00887
00888 if(message)
00889 {
00890 delete message;
00891 }
00892 }
00893
00894 if(transport->mStopMessageProcessingFlag == true)
00895 {
00896 break;
00897 }
00898
00899
00900 LargeDataSet* stream = NULL;
00901 LargeDataSet::Map::iterator lds;
00902 transport->mLargeDataSetsMutex.Lock();
00903 lds = transport->mLargeDataSets.begin();
00904 while(lds != transport->mLargeDataSets.end())
00905 {
00906
00907 if(lds->second->mCompleteFlag)
00908 {
00909 stream = lds->second;
00910 transport->mLargeDataSets.erase(lds);
00911 break;
00912 }
00913
00914 else if(Time::GetUtcTimeMs() - lds->second->mUpdateTimeMs > 500)
00915 {
00916 delete lds->second;
00917 transport->mLargeDataSets.erase(lds);
00918 lds = transport->mLargeDataSets.begin();
00919 continue;
00920 }
00921 lds++;
00922 }
00923 transport->mLargeDataSetsMutex.Unlock();
00924
00925
00926 if(stream)
00927 {
00928 Message* message = NULL;
00929 message = transport->CreateMessageFromService(stream->mMessageCode, transport);
00930 if(message && message->ReadLargeDataSet(stream->mStream) && transport->mStopMessageProcessingFlag == false)
00931 {
00932 if(transport->IsPriority(&message) == false)
00933 {
00934 transport->PushMessageToChildren(message);
00935 }
00936 }
00937
00938 if(message)
00939 {
00940 delete message;
00941 }
00942 delete stream;
00943 stream = NULL;
00944 }
00945
00946 if(transport->mMessageProcessingDelayMs == 0)
00947 {
00948
00949
00950 #ifdef WIN32
00951 if( ++loopCounter == 10 )
00952 {
00953 loopCounter = 0;
00954 CxUtils::SleepMs(1);
00955 }
00956 #else
00957 loopCounter = 0;
00958 usleep(500);
00959 #endif
00960 }
00961 else
00962 {
00963 CxUtils::SleepMs(transport->mMessageProcessingDelayMs);
00964 }
00965 }
00966
00967 packetToProcess.Destroy();
00968 }
00969
00970
00971
00977 void Transport::PriorityMessageProcessingThread(void* args)
00978 {
00979 Transport* transport = (Transport *)args;
00980 Message::List toProcess;
00981 Message::List::iterator message;
00982
00983 unsigned int loopCounter = 0;
00984
00985 while(transport && transport->mStopMessageProcessingFlag == false)
00986 {
00987 std::map<Address, Message::Map>::iterator src;
00988 Message::Map::iterator m;
00989 transport->mPriorityJausMessageMutex.Lock();
00990 for(src = transport->mPriorityJausMessages.begin();
00991 src != transport->mPriorityJausMessages.end();
00992 src++)
00993 {
00994 for(m = src->second.begin();
00995 m != src->second.end();
00996 m++)
00997 {
00998 toProcess.push_back(m->second);
00999 }
01000 src->second.clear();
01001 }
01002 transport->mPriorityJausMessageMutex.Unlock();
01003
01004 for(message = toProcess.begin();
01005 message != toProcess.end();
01006 message++)
01007 {
01008 transport->PushMessageToChildren(*message);
01009 delete (*message);
01010 }
01011 toProcess.clear();
01012
01013 if(transport->mMessageProcessingDelayMs == 0)
01014 {
01015
01016
01017 #ifdef WIN32
01018 if( ++loopCounter == 10 )
01019 {
01020 loopCounter = 0;
01021 CxUtils::SleepMs(1);
01022 }
01023 #else
01024 loopCounter = 0;
01025 usleep(500);
01026 #endif
01027 }
01028 else
01029 {
01030 CxUtils::SleepMs(transport->mMessageProcessingDelayMs);
01031 }
01032 }
01033
01034 std::map<Address, Message::Map>::iterator src;
01035 Message::Map::iterator m;
01036 transport->mPriorityJausMessageMutex.Lock();
01037 for(src = transport->mPriorityJausMessages.begin();
01038 src != transport->mPriorityJausMessages.end();
01039 src++)
01040 {
01041 for(m = src->second.begin();
01042 m != src->second.end();
01043 m++)
01044 {
01045 delete m->second;
01046 }
01047 }
01048 transport->mPriorityJausMessages.clear();
01049 transport->mPriorityJausMessageMutex.Unlock();
01050
01051 for(message = toProcess.begin();
01052 message != toProcess.end();
01053 message++)
01054 {
01055 delete (*message);
01056 }
01057 }
01058
01059
01065 void Transport::WriteToLog(const Packet& packet,
01066 const Header& header)
01067 {
01068 if(mLoggingFlag)
01069 {
01070 Mutex::ScopedLock lock(&mLogFileMutex);
01071 if(mLogFile.is_open() == false) { return; }
01072
01073 std::string messageName = "Unknown";
01074 mLogFile << Time::GetUtcTime().ToString() << ", ";
01075 UShort messageCode = 0;
01076 packet.Read(messageCode, Header::PayloadOffset);
01077 mLogFile << "0x" << std::hex << std::uppercase << std::setfill('0') << std::setw(4) << messageCode << ", ";
01078
01079
01080 std::map<UShort, std::string>::iterator n;
01081 n = mMessageNames.find(messageCode);
01082 if(n == mMessageNames.end())
01083 {
01084 Message* message = CreateMessageFromService(messageCode, this);
01085 if(message)
01086 {
01087 messageName = mMessageNames[messageCode] = message->GetMessageName();
01088 delete message;
01089 }
01090 else
01091 {
01092 messageName = "Unknown";
01093 }
01094 }
01095 else
01096 {
01097 messageName = n->second;
01098 }
01099 mLogFile << messageName << ", ";
01100
01101 mLogFile << header.mSourceID.ToString() << ", " << header.mDestinationID.ToString() << ", ";
01102 mLogFile << std::dec << header.mSize << ", " << header.mSequenceNumber << ", ";
01103 unsigned char* ptr = (unsigned char *)(packet.Ptr() + Header::PayloadOffset + USHORT_SIZE);
01104 for(unsigned int i = 0; i < packet.Size() - (Header::PayloadOffset + USHORT_SIZE); i++)
01105 {
01106 mLogFile << "0x" << std::hex << std::uppercase << std::setfill('0') << std::setw(2) << (unsigned short)*ptr << ", ";
01107 ptr++;
01108 }
01109 mLogFile << std::endl;
01110 mLogFile.flush();
01111 }
01112 }
01113
01114
01126 bool Transport::IsPriority(Message** message)
01127 {
01128 bool result = false;
01129
01130 JAUS::Event* e = dynamic_cast<JAUS::Event*>(*message);
01131 UShort messageCode = (*message)->GetMessageCode();
01132 if(e)
01133 {
01134 messageCode = e->GetReportMessageCode();
01135 }
01136
01137 mPriorityJausMessageMutex.Lock();
01138
01139 if(mPriorityJausMessageTypes.find(messageCode) != mPriorityJausMessageTypes.end())
01140 {
01141 result = true;
01142 std::map<Address, Message::Map >::iterator src;
01143 src = mPriorityJausMessages.find((*message)->GetSourceID());
01144 if(src == mPriorityJausMessages.end())
01145 {
01146 mPriorityJausMessages[(*message)->GetSourceID()][(*message)->GetMessageCode()] = (*message);
01147 }
01148 else
01149 {
01150 Message::Map::iterator msg;
01151 msg = src->second.find((*message)->GetMessageCode());
01152 if(msg == src->second.end())
01153 {
01154 src->second[(*message)->GetMessageCode()] = (*message);
01155 }
01156 else
01157 {
01158 delete msg->second;
01159 msg->second = (*message);
01160 }
01161 }
01162
01163 (*message) = NULL;
01164 }
01165 mPriorityJausMessageMutex.Unlock();
01166 return result;
01167 }
01168
01169
01170