00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "jaus/core/transport/judp.h"
00041 #include "jaus/core/management/management.h"
00042 #include "jaus/core/component.h"
00043 #include <iostream>
00044 #include <tinyxml/tinyxml.h>
00045
00046 using namespace JAUS;
00047
00048
00054 JUDP::JUDP()
00055 {
00056 mDisconnectTimeMs = 5000;
00057 mMulticastIP.SetAddress("239.255.0.1");
00058 mTimeToLive = 16;
00059 mTransportHeader.Write(Version);
00060 mMaxPayloadSize = 1500 - OverheadSizeBytes - USHORT_SIZE;
00061 mDelayTimeMs = 0;
00062 mUseBroadcastingFlag = false;
00063 mInternalDiscoveryPort = 49152;
00064 }
00065
00066
00072 JUDP::~JUDP()
00073 {
00074 Shutdown();
00075 mPrimaryThread.StopThread();
00076 mSecondaryThread.StopThread();
00077 mInput.Shutdown();
00078 mMulticast.Shutdown();
00079 mPrimaryThreadCreatedFlag = false;
00080 }
00081
00082
00095 void JUDP::SetPacketPollingDelayMs(const unsigned int delayTimeMs)
00096 {
00097 if(mPrimaryThread.IsThreadActive() == false)
00098 {
00099 mDelayTimeMs = delayTimeMs;
00100 }
00101 }
00102
00103
00114 bool JUDP::LoadSettings(const std::string& filename)
00115 {
00116 TiXmlDocument xml;
00117
00118 if(xml.LoadFile(filename.c_str()) == false)
00119 {
00120 return false;
00121 }
00122 TiXmlHandle doc(&xml);
00123 TiXmlNode* node;
00124
00125 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("BroadcastIP").FirstChild().ToNode();
00126 if(node && node->Value())
00127 {
00128 mUseBroadcastingFlag = true;
00129 }
00130 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("UseBroadcasting").FirstChild().ToNode();
00131 if(node && node->Value())
00132 {
00133 mUseBroadcastingFlag = atoi(node->Value()) > 0 ? true : false;
00134 }
00135 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("MulticastIP").FirstChild().ToNode();
00136 if(node && node->Value())
00137 {
00138 mMulticastIP.SetAddress(node->Value());
00139 }
00140 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("TTL").FirstChild().ToNode();
00141 if(node && node->Value())
00142 {
00143 mTimeToLive = (unsigned char )atoi(node->Value());
00144 }
00145 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("NetAddress").FirstChild().ToNode();
00146 if(node && node->Value())
00147 {
00148 mHostIP.SetAddress(node->Value());
00149 }
00150 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("MaxPacketSizeBytes").FirstChild().ToNode();
00151 if(node && node->Value())
00152 {
00153 mMaxPayloadSize = (unsigned int)atoi(node->Value());
00154 }
00155 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("Logger").FirstChild().ToNode();
00156 if(node && node->Value() && atoi(node->Value()) > 0)
00157 {
00158 EnableLogging(true);
00159 }
00160 else
00161 {
00162 EnableLogging(false);
00163 }
00164 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("DisconnectTimeMs").FirstChild().ToNode();
00165 if(node && node->Value() && atoi(node->Value()) >= 0)
00166 {
00167 SetDisconnectTimeMs(atoi(node->Value()));
00168 }
00169 return true;
00170 }
00171
00172
00183 bool JUDP::Initialize(const Address& componentID)
00184 {
00185 if(componentID.IsValid() == false || componentID.IsBroadcast())
00186 {
00187 return false;
00188 }
00189
00190 Shutdown();
00191
00192
00193 mComponentID = componentID;
00194 CxUtils::IP4Address::List hostnames;
00195 CxUtils::IP4Address::List::iterator eth0;
00196 CxUtils::Socket::GetHostAddresses(hostnames);
00197 if(hostnames.size() == 0 && mDebugMessagesFlag)
00198 {
00199 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00200 std::cout << "[JUDP-" << mComponentID.ToString() << "] - No Network Interface Found.\n";
00201 }
00202 unsigned int count = 0;
00203 for(eth0 = hostnames.begin();
00204 eth0 != hostnames.end();
00205 eth0++, count++)
00206 {
00207
00208 if(mHostIP.mString == "0.0.0.0" ||
00209 mHostIP.mString == "127.0.0.1" ||
00210 mHostIP.mString.empty() ||
00211 mHostIP == *eth0)
00212 {
00213
00214 if(mInput.InitializeMulticastSocket(Port, mMulticastIP, true))
00215 {
00216 mMulticast.SetNetworkInterface(*eth0);
00217 if(mMulticast.InitializeMulticastSocket(mMulticastIP, Port, mTimeToLive, mUseBroadcastingFlag, 0))
00218 {
00219 SetComponentID(componentID);
00220
00221 mPrimaryThread.CreateThread(JUDP::ReceiveThread, this);
00222 CxUtils::SleepMs(250);
00223 mSecondaryThread.CreateThread(JUDP::ReceiveThread, this);
00224
00225 mPrimaryThread.SetThreadName(std::string("JUDP 1 ") + componentID.ToString());
00226 mSecondaryThread.SetThreadName(std::string("JUDP 2 ") + componentID.ToString());
00227 return true;
00228 }
00229 }
00230
00231
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 }
00251 }
00252
00253 Shutdown();
00254
00255 return false;
00256 }
00257
00258
00264 bool JUDP::IsInitialized() const
00265 {
00266 return mInput.IsValid();
00267 }
00268
00269
00275 void JUDP::Shutdown()
00276 {
00277 {
00278 Mutex::ScopedLock lock(&mClientsMutex);
00279 std::map<Address, CxUtils::UdpClient*>::iterator client;
00280 for(client = mClients.begin();
00281 client != mClients.end();
00282 client++)
00283 {
00284 client->second->Shutdown();
00285 delete client->second;
00286 }
00287 mClients.clear();
00288 }
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302 for(unsigned int i = 0; i < 500; i++)
00303 {
00304 if(mPrimaryThread.IsThreadActive() == false &&
00305 mSecondaryThread.IsThreadActive() == false)
00306 {
00307 break;
00308 }
00309 CxUtils::SleepMs(1);
00310 }
00311
00312
00313 mPrimaryThread.KillThread();
00314 mSecondaryThread.KillThread();
00315
00316 mInput.Shutdown();
00317 mMulticast.Shutdown();
00318
00319 mComponentID.Clear();
00320 mPrimaryThreadCreatedFlag = false;
00321 }
00322
00323
00329 Address::List JUDP::GetConnections() const
00330 {
00331 Mutex::ScopedLock lock(&mClientsMutex);
00332 Address::List results;
00333 std::map<Address, CxUtils::UdpClient *>::const_iterator client;
00334 for(client = mClients.begin();
00335 client != mClients.end();
00336 client++)
00337 {
00338 results.push_back(client->first);
00339 }
00340 return results;
00341 }
00342
00343
00351 bool JUDP::HaveConnection(const Address& id) const
00352 {
00353 Mutex::ScopedLock lock(&mClientsMutex);
00354 Address::List results;
00355 std::map<Address, CxUtils::UdpClient *>::const_iterator client;
00356 client = mClients.find(id);
00357 if(client != mClients.end())
00358 {
00359 return true;
00360 }
00361 return false;
00362 }
00363
00364
00381 bool JUDP::SendPacket(const Packet& packet,
00382 const Header& header,
00383 const int broadcastFlags) const
00384 {
00385 bool result = false;
00386
00387 if(broadcastFlags != Service::NoBroadcast)
00388 {
00389 result = mMulticast.Send(packet) > 0;
00390
00391 if(result == false && mDebugMessagesFlag)
00392 {
00393 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00394 std::cout << "[JUDP-" << mComponentID.ToString() << "] - Failed to Broadcast Message to " << header.mDestinationID.ToString() << "\n";
00395 }
00396 if(header.mDestinationID.IsBroadcast() == false)
00397 {
00398 return result;
00399 }
00400 }
00401
00402 if(header.mDestinationID.IsBroadcast())
00403 {
00404
00405 result = true;
00406 std::map<Address, CxUtils::UdpClient*>::const_iterator client;
00407 Mutex::ScopedLock lock(&mClientsMutex);
00408 for(client = mClients.begin(); client != mClients.end(); client++)
00409 {
00410 if(Address::DestinationMatch(header.mDestinationID, client->first))
00411 {
00412 result &= client->second->Send(packet) > 0;
00413 }
00414 }
00415
00416 if(result == false && mDebugMessagesFlag)
00417 {
00418 Mutex::ScopedLock plock(&mDebugMessagesMutex);
00419 std::cout << "[JUDP-" << mComponentID.ToString() << "] - Failed to Send Unicast Message to " << header.mDestinationID.ToString() << "\n";
00420 }
00421 }
00422 else
00423 {
00424
00425 std::map<Address, CxUtils::UdpClient*>::const_iterator client;
00426 Mutex::ScopedLock lock(&mClientsMutex);
00427
00428 client = mClients.find(header.mDestinationID);
00429 if(client != mClients.end())
00430 {
00431 result = client->second->Send(packet) > 0;
00432 }
00433 if(result == false && mDebugMessagesFlag)
00434 {
00435 Mutex::ScopedLock plock(&mDebugMessagesMutex);
00436 std::cout << "[JUDP-" << mComponentID.ToString() << "] - Failed to Send Unicast Message to " << header.mDestinationID.ToString() << "\n";
00437 }
00438 }
00439
00440 return result;
00441 }
00442
00443
00460 bool JUDP::SerializeMessage(const Message* message,
00461 Packet::List& stream,
00462 Header::List& streamHeaders,
00463 const UShort startingSequenceNumber,
00464 const int broadcastFlags) const
00465 {
00466 Packet packet;
00467 Header header;
00468
00469
00470 packet.Reserve(mMaxPayloadSize + OverheadSizeBytes);
00471
00472
00473 stream.clear();
00474 streamHeaders.clear();
00475
00476
00477 if(message->IsLargeDataSet(mMaxPayloadSize))
00478 {
00479 return message->WriteLargeDataSet(stream,
00480 streamHeaders,
00481 mMaxPayloadSize,
00482 &mTransportHeader,
00483 startingSequenceNumber) > 0;
00484 }
00485
00486 else if(message->Write(packet, header, &mTransportHeader, true, startingSequenceNumber) > 0)
00487 {
00488 stream.push_back(packet);
00489 streamHeaders.push_back(header);
00490 return true;
00491 }
00492 return false;
00493 }
00494
00495
00503 Address::List JUDP::GetManualConnections() const
00504 {
00505 Address::List copy;
00506 {
00507 Mutex::ScopedLock lock(&mClientsMutex);
00508 std::map<Address, bool>::const_iterator p;
00509 for(p = mPermanentConnections.begin();
00510 p != mPermanentConnections.end();
00511 p++)
00512 {
00513 copy.push_back(p->first);
00514 }
00515 }
00516 return copy;
00517 }
00518
00519
00531 void JUDP::SetMaxPacketSize(const unsigned int maxSizeBytes)
00532 {
00533 if(IsInitialized() == false && maxSizeBytes > (OverheadSizeBytes + USHORT_SIZE))
00534 {
00535 mMaxPayloadSize = maxSizeBytes - OverheadSizeBytes - USHORT_SIZE;
00536 }
00537 }
00538
00539
00550 void JUDP::SetDisconnectTimeMs(const unsigned int timeMs)
00551 {
00552 if(IsInitialized() == false)
00553 {
00554 mDisconnectTimeMs = timeMs;
00555 }
00556 }
00557
00558
00568 void JUDP::SetTTL(const unsigned char ttl)
00569 {
00570 if(IsInitialized() == false && ttl > 0)
00571 {
00572 mTimeToLive = ttl;
00573 }
00574 }
00575
00576
00586 void JUDP::SetMulticastIP(const CxUtils::IP4Address& multicastIP)
00587 {
00588 if(IsInitialized() == false)
00589 {
00590 mMulticastIP = multicastIP;
00591 }
00592 }
00593
00594
00603 void JUDP::EnableBroadcasting(const bool enabled)
00604 {
00605 mUseBroadcastingFlag = enabled;
00606 }
00607
00608
00617 void JUDP::SetInterfaceIP(const CxUtils::IP4Address& networkIP)
00618 {
00619 if(IsInitialized() == false)
00620 {
00621 mHostIP = networkIP;
00622 }
00623 }
00624
00625
00638 bool JUDP::AddConnection(const CxUtils::IP4Address& networkIP,
00639 const Address& jausID,
00640 const unsigned short port)
00641 {
00642 bool result = false;
00643
00644 if(jausID.IsValid() && !jausID.IsBroadcast())
00645 {
00646 Mutex::ScopedLock lock(&mClientsMutex);
00647 std::map<Address, CxUtils::UdpClient*>::iterator citer;
00648 citer = mClients.find(jausID);
00649 if(citer == mClients.end())
00650 {
00651 CxUtils::UdpClient* udp = mMulticast.CreateNewDestination(networkIP, port);
00652 if(udp)
00653 {
00654 mClients[jausID] = udp;
00655 mUpdateTimes[jausID] = Time::GetUtcTimeMs();
00656 mPermanentConnections[jausID] = true;
00657 udp = NULL;
00658 result = true;
00659 }
00660 else
00661 {
00662 if(mDebugMessagesFlag)
00663 {
00664 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00665 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - New Connection Made to " << jausID << " at IP:" << networkIP.mString << std::endl;
00666 }
00667 }
00668 }
00669 }
00670
00671 return result;
00672 }
00673
00674
00684 bool JUDP::CloseConnection(const Address& jausID)
00685 {
00686 bool result = false;
00687 std::map<Address, Time::Stamp>::iterator times;
00688 std::map<Address, CxUtils::UdpClient*>::iterator client;
00689
00690
00691
00692 Mutex::ScopedLock lock(&mClientsMutex);
00693 times = mUpdateTimes.find(jausID);
00694 client = mClients.find(jausID);
00695 if(client != mClients.end())
00696 {
00697 if(client != mClients.end())
00698 {
00699 delete client->second;
00700 mClients.erase(client);
00701 }
00702 if(times != mUpdateTimes.end())
00703 {
00704 mUpdateTimes.erase(times);
00705 }
00706 if(mPermanentConnections.find(jausID) != mPermanentConnections.end())
00707 {
00708 mPermanentConnections.erase(mPermanentConnections.find(jausID));
00709 }
00710 return true;
00711 }
00712 return false;
00713 }
00714
00715
00729 bool JUDP::ListenForSubsystems(Address::Set& discovered,
00730 const unsigned int waitTimeMs,
00731 const CxUtils::IP4Address& multicastGroup)
00732 {
00733 discovered.clear();
00734
00735 CxUtils::UdpSharedServer listen;
00736 if(listen.InitializeSocket(JUDP::Port, multicastGroup))
00737 {
00738 Packet packet;
00739 Header header;
00740 UShort messageCode;
00741 CxUtils::IP4Address ip;
00742 Time::Stamp start = Time::GetUtcTimeMs();
00743 while(Time::GetUtcTimeMs() - start < waitTimeMs)
00744 {
00745 if(listen.Recv(packet, ip))
00746 {
00747 packet.SetReadPos(1);
00748 if(header.Read(packet) &&
00749 packet.Read(messageCode))
00750 {
00751 discovered.insert(header.mSourceID);
00752 }
00753 }
00754 }
00755 return true;
00756 }
00757
00758 return false;
00759 }
00760
00761
00770 void JUDP::CheckServiceStatus(const unsigned int timeSinceLastCheckMs)
00771 {
00772 if(mDisconnectTimeMs > 0)
00773 {
00774 std::map<Address, Time::Stamp>::iterator times;
00775 std::map<Address, CxUtils::UdpClient*>::iterator client;
00776
00777
00778
00779 Mutex::ScopedLock lock(&mClientsMutex);
00780 Time::Stamp currentTimeMs = Time::GetUtcTimeMs();
00781 times = mUpdateTimes.begin();
00782 while(times != mUpdateTimes.end())
00783 {
00784 std::map<Address, bool>::iterator perm = mPermanentConnections.find(times->first);
00785 if(perm == mPermanentConnections.end() &&
00786 currentTimeMs - times->second >= mDisconnectTimeMs)
00787 {
00788 client = mClients.find(times->first);
00789 if(client != mClients.end())
00790 {
00791 delete client->second;
00792 mClients.erase(client);
00793 }
00794 mUpdateTimes.erase(times);
00795 times = mUpdateTimes.begin();
00796 }
00797 else
00798 {
00799 times++;
00800 }
00801 }
00802 }
00803
00804 }
00805
00806
00817 void JUDP::ProcessUDP(const Packet& packet,
00818 const CxUtils::IP4Address& ipAddress,
00819 const unsigned short sourcePort)
00820 {
00821
00822 if(packet.Length() > Header::MinSize + mTransportHeader.Length() && *packet.Ptr() == Version)
00823 {
00824
00825 Packet::Wrapper stripped((unsigned char *)(packet.Ptr() + 1), packet.Length() - 1);
00826 Header header;
00827 stripped->SetReadPos(0);
00828 std::string errorMessage;
00829 if(header.Read(*stripped.GetData()) && header.IsValid(&errorMessage))
00830 {
00831
00832
00833
00834 if(header.mSourceID == mComponentID)
00835 {
00836 if(sourcePort != mMulticast.GetSourcePort() && sourcePort != Port)
00837 {
00838 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00839 std::cout << "=============================================================\n";
00840 std::cout << "=============================================================\n";
00841 std::cout << "=============================================================\n";
00842 std::cout << "=============================================================\n";
00843 std::cout << "JAUS ID DUPLICATED BY " << ipAddress.mString << std::endl;
00844 std::cout << "=============================================================\n";
00845 std::cout << "=============================================================\n";
00846 std::cout << "=============================================================\n";
00847 std::cout << "=============================================================\n";
00848 }
00849 return;
00850 }
00851
00852
00853
00854 UpdateClientConnections(header.mSourceID,
00855 ipAddress,
00856 sourcePort);
00857
00858
00859 ProcessPacket(*stripped.GetData(), header);
00860
00861
00862
00863
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891 if(header.mSize < stripped->Size())
00892 {
00893
00894 unsigned int offset = header.mSize;
00895 while(offset < stripped->Size())
00896 {
00897 if(mDebugMessagesFlag)
00898 {
00899 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00900 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Received Multi-Message Packet\n";
00901 }
00902 Packet::Wrapper child((unsigned char *)(stripped->Ptr()), stripped->Size() + offset);
00903 if(header.Read(*child.GetData()) && header.IsValid(&errorMessage))
00904 {
00905 ProcessPacket(*child.GetData(), header);
00906 offset += header.mSize;
00907 }
00908 else
00909 {
00910 break;
00911 }
00912 }
00913 }
00914 }
00915 else if(mDebugMessagesFlag)
00916 {
00917 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00918 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Bad JAUS Header [" << errorMessage << "]\n";
00919 }
00920 }
00921 }
00922
00923
00935 int JUDP::UpdateClientConnections(const Address& id,
00936 const CxUtils::IP4Address& ipAddress,
00937 const unsigned short sourcePort)
00938 {
00939 int result = 0;
00940
00941 std::map<Address, Time::Stamp>::iterator times;
00942 std::map<Address, CxUtils::UdpClient*>::iterator client;
00943 mClientsMutex.Lock();
00944 client = mClients.find(id);
00945 if(client == mClients.end())
00946 {
00947
00948
00949
00950 CxUtils::UdpClient* udp = mMulticast.CreateNewDestination(ipAddress, sourcePort);
00951 if(udp)
00952 {
00953 mClients[id] = udp;
00954 mUpdateTimes[id] = Time::GetUtcTimeMs();
00955 udp = NULL;
00956 result = 1;
00957 }
00958 else
00959 {
00960 if(mDebugMessagesFlag)
00961 {
00962 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00963 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - New Connection Made to " << id.ToString() << " at IP:" << ipAddress.mString << std::endl;
00964 }
00965 }
00966 }
00967 else
00968 {
00969 times = mUpdateTimes.find(id);
00970
00971
00972
00973 if(
00974 (ipAddress != client->second->GetConnectionAddress() || sourcePort != client->second->GetSourcePort()))
00975 {
00976
00977 delete client->second;
00978 client->second = mMulticast.CreateNewDestination(ipAddress, sourcePort);
00979 }
00980
00981
00982 times->second = Time::GetUtcTimeMs();
00983 }
00984 mClientsMutex.Unlock();
00985 return result;
00986 }
00987
00988
00994 void JUDP::ReceiveThread(void* args)
00995 {
00996 JUDP* transport = (JUDP*)args;
00997 Packet udpMessage;
00998 CxUtils::IP4Address sourceAddress;
00999 unsigned short sourcePort = 0;
01000 long int timeoutMs = 100;
01001
01002 #ifdef WIN32
01003 int loopCounter = 0;
01004 #endif
01005
01006 udpMessage.Reserve(5000);
01007 Thread* thread = NULL;
01008 bool primary = false;
01009
01010 if(transport->mPrimaryThreadCreatedFlag == false)
01011 {
01012 primary = true;
01013 transport->mPrimaryThreadCreatedFlag = true;
01014 thread = &transport->mPrimaryThread;
01015 }
01016 else
01017 {
01018 thread = &transport->mSecondaryThread;
01019 }
01020
01021 while(transport &&
01022 thread->QuitThreadFlag() == false &&
01023 transport->mShutdownServiceFlag == false)
01024 {
01025 udpMessage.Clear(false);
01026 sourcePort = 0;
01027 if(primary)
01028 {
01029 if(transport->mInput.Recv(udpMessage,
01030 5000,
01031 timeoutMs,
01032 &sourceAddress,
01033 &sourcePort) > 0)
01034 {
01035 transport->ProcessUDP(udpMessage, sourceAddress, sourcePort);
01036 }
01037 }
01038 else
01039 {
01040 if(transport->mMulticast.GetSourcePort() > 0 &&
01041 transport->mMulticast.Recv(udpMessage,
01042 5000,
01043 timeoutMs,
01044 &sourceAddress,
01045 &sourcePort) > 0)
01046 {
01047 transport->ProcessUDP(udpMessage, sourceAddress, sourcePort);
01048 }
01049 }
01050
01051 if(transport->mShutdownServiceFlag)
01052 {
01053 break;
01054 }
01055
01056 if(transport->mDelayTimeMs == 0)
01057 {
01058 #ifdef WIN32
01059
01060 if( loopCounter++ == 250)
01061 {
01062 loopCounter = 0;
01063 CxUtils::SleepMs(1);
01064 }
01065 #else
01066 usleep(500);
01067 #endif
01068 }
01069 else
01070 {
01071 CxUtils::SleepMs(transport->mDelayTimeMs);
01072 }
01073 }
01074
01075
01076 }
01077
01078
01079