Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include "jaus/core/transport/jtcpclient.h"
00043 #include "jaus/core/discovery/queryidentification.h"
00044 #include "jaus/core/transport/transportmanager.h"
00045 #include <tinyxml/tinyxml.h>
00046 #include <iostream>
00047 #include <iomanip>
00048
00049 using namespace JAUS;
00050
00051
00057 JTCPClient::JTCPClient()
00058 {
00059 mpTransportManager = new TransportManager();
00060 mpSocket = new CxUtils::TcpClient();
00061 mTransportHeader.Write(Version);
00062 mMaxPayloadSize = 1500 - JTCP::OverheadSizeBytes - USHORT_SIZE;
00063 mDisconnectTimeMs = 3600000;
00064 }
00065
00066
00072 JTCPClient::~JTCPClient()
00073 {
00074 JTCPClient::Shutdown();
00075 if(mpTransportManager)
00076 {
00077 mpTransportManager->Shutdown();
00078 delete mpTransportManager;
00079 }
00080 }
00081
00082
00093 bool JTCPClient::LoadSettings(const std::string& filename)
00094 {
00095 TiXmlDocument xml;
00096
00097 if(xml.LoadFile(filename.c_str()) == false)
00098 {
00099 return false;
00100 }
00101 TiXmlHandle doc(&xml);
00102 TiXmlNode* node;
00103
00104 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("NetAddress").FirstChild().ToNode();
00105 if(node && node->Value())
00106 {
00107 mHostIP.SetAddress(node->Value());
00108 }
00109 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("Logger").FirstChild().ToNode();
00110 if(node && node->Value() && atoi(node->Value()) > 0)
00111 {
00112 EnableLogging(true);
00113 }
00114 else
00115 {
00116 EnableLogging(false);
00117 }
00118 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("MaxPacketSizeBytes").FirstChild().ToNode();
00119 if(node && node->Value())
00120 {
00121 mMaxPayloadSize = (unsigned int)atoi(node->Value());
00122 }
00123
00124 node = doc.FirstChild("JAUS").FirstChild("Transport").FirstChild("DisconnectTimeMs").FirstChild().ToNode();
00125 if(node && node->Value())
00126 {
00127 mDisconnectTimeMs = (Time::Stamp)atoi(node->Value());
00128 }
00129
00130 mpTransportManager->LoadSettings(filename);
00131
00132 return true;
00133 }
00134
00135
00146 bool JTCPClient::Initialize(const Address& componentID)
00147 {
00148 return Initialize(componentID, "127.0.0.1");
00149 }
00150
00151
00164 bool JTCPClient::Initialize(const Address& componentID,
00165 const IP4Address& host)
00166 {
00167 bool result = false;
00168
00169 JTCPClient::Shutdown();
00170
00171 if(mpTransportManager)
00172 {
00173 mpTransportManager->Initialize();
00174 }
00175
00176 mpSocket = new CxUtils::TcpClient();
00177
00178 if(componentID.IsValid() == false || componentID.IsBroadcast())
00179 {
00180 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00181 std::cout << "JTCP- Invalid component ID [" << componentID.ToString() << "]\n";
00182 return false;
00183 }
00184 mComponentID = componentID;
00185 IP4Address::List hostnames;
00186 IP4Address::List::iterator eth0;
00187 CxUtils::Socket::GetHostAddresses(hostnames);
00188 if(hostnames.size() == 0)
00189 {
00190 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00191 std::cout << "[JTCP-" << mComponentID.ToString() << "] - No Network Interface Found.\n";
00192 }
00193
00194
00195 for(eth0 = hostnames.begin();
00196 eth0 != hostnames.end();
00197 eth0++)
00198 {
00199
00200 if(mHostIP.mString == "0.0.0.0" ||
00201 mHostIP.mString == "127.0.0.1" ||
00202 mHostIP.mString.empty() ||
00203 mHostIP == *eth0)
00204 {
00205 mpSocket->SetNetworkInterface(*eth0);
00206 if(((CxUtils::TcpClient *)mpSocket)->InitializeSocket(host, Port, 0, 1000))
00207 {
00208
00209 SetComponentID(componentID);
00210 std::string threadName = std::string("JTCP ") + componentID.ToString();
00211 mRecvThread.SetThreadName(threadName);
00212 mRecvThread.CreateThread(ReceiveThread, (JTCP*)this);
00213 CxUtils::SleepMs(50);
00214
00215
00216
00217 JAUS::QueryIdentification queryIdentification(Address(Address::GlobalBroadcast, 255, 255),
00218 componentID);
00219 for(unsigned int i = 0; i < 3; i++)
00220 {
00221 Send(&queryIdentification);
00222 CxUtils::SleepMs(25);
00223 if(mpSocket->IsValid() == false || mReceivedValidDataFlag == true)
00224 {
00225 break;
00226 }
00227 }
00228
00229 if(mpSocket->IsValid())
00230 {
00231 result = true;
00232 CxUtils::SleepMs(100);
00233 }
00234 else
00235 {
00236 std::cout << "[JTCP-" << componentID.ToString() << "] - Component ID already in use.\n";
00237 }
00238 }
00239
00240 break;
00241 }
00242 }
00243
00244 if(result == false)
00245 {
00246 Shutdown();
00247 }
00248
00249 return result;
00250 }
00251
00252
00258 bool JTCPClient::IsInitialized() const
00259 {
00260 return mpSocket ? mpSocket->IsValid() : false;
00261 }
00262
00263
00269 void JTCPClient::Shutdown()
00270 {
00271 if(mpSocket)
00272 {
00273 mpSocket->Shutdown();
00274 }
00275 for(unsigned int i = 0; i < 500; i++)
00276 {
00277 if(mRecvThread.IsThreadActive() == false)
00278 {
00279 break;
00280 }
00281 CxUtils::SleepMs(1);
00282 }
00283
00284
00285 mRecvThread.StopThread();
00286 mReceivedValidDataFlag = false;
00287 JTCP::Shutdown();
00288 }
00289
00290
00302 void JTCPClient::SetMaxPacketSize(const unsigned int maxSizeBytes)
00303 {
00304 if(IsInitialized() == false && maxSizeBytes > (JTCP::OverheadSizeBytes + USHORT_SIZE))
00305 {
00306 mMaxPayloadSize = maxSizeBytes - OverheadSizeBytes - USHORT_SIZE;
00307 }
00308 }
00309
00310
00316 Address::List JTCPClient::GetConnections() const
00317 {
00318 Mutex::ScopedLock lock(&mDataMutex);
00319 Address::List results;
00320 std::map<Address, Time::Stamp>::const_iterator client;
00321 for(client = mUpdateTimes.begin();
00322 client != mUpdateTimes.end();
00323 client++)
00324 {
00325 results.push_back(client->first);
00326 }
00327 return results;
00328 }
00329
00330
00338 bool JTCPClient::HaveConnection(const Address& id) const
00339 {
00340 return HaveConnection(id, (unsigned int)mDisconnectTimeMs);
00341 }
00342
00343
00354 bool JTCPClient::HaveConnection(const Address& id,
00355 const unsigned int timeSinceUpdateMs) const
00356 {
00357 Mutex::ScopedLock lock(&mDataMutex);
00358 std::map<Address, Time::Stamp>::const_iterator client;
00359 client = mUpdateTimes.find(id);
00360 if(client != mUpdateTimes.end())
00361 {
00362 return true;
00363 }
00364 return false;
00365 }
00366
00367
00384 bool JTCPClient::SendPacket(const Packet& packet,
00385 const Header& header,
00386 const int broadcastFlags) const
00387 {
00388 bool result = false;
00389
00390 if(mpSocket && mpSocket->Send(packet))
00391 {
00392 result = true;
00393 }
00394
00395 return result;
00396 }
00397
00398
00415 bool JTCPClient::SerializeMessage(const Message* message,
00416 Packet::List& stream,
00417 Header::List& streamHeaders,
00418 const UShort startingSequenceNumber,
00419 const int broadcastFlags) const
00420 {
00421 Packet packet;
00422 Header header;
00423
00424
00425 packet.Reserve(JAUS::Header::MaxPacketSize + OverheadSizeBytes);
00426
00427
00428 stream.clear();
00429 streamHeaders.clear();
00430
00431
00432 if(message->IsLargeDataSet(mMaxPayloadSize))
00433 {
00434 return message->WriteLargeDataSet(stream,
00435 streamHeaders,
00436 mMaxPayloadSize,
00437 &mTransportHeader,
00438 startingSequenceNumber) > 0;
00439 }
00440
00441 else if(message->Write(packet, header, &mTransportHeader, true, startingSequenceNumber, (Byte)broadcastFlags) > 0)
00442 {
00443 stream.push_back(packet);
00444 streamHeaders.push_back(header);
00445 return true;
00446 }
00447 return false;
00448 }
00449
00450
00459 void JTCPClient::SetInterfaceIP(const IP4Address& networkIP)
00460 {
00461 if(IsInitialized() == false)
00462 {
00463 mHostIP = networkIP;
00464 }
00465 }
00466
00467
00473 void JTCPClient::EnableDebugMessages(const bool on)
00474 {
00475 if(mpTransportManager)
00476 {
00477 mpTransportManager->EnableDebugMessages(on);
00478 }
00479 Transport::EnableDebugMessages(on);
00480 }
00481
00482
00497 bool JTCPClient::AddConnection(const IP4Address& networkIP,
00498 const Address& jausID)
00499 {
00500 return mpTransportManager->AddConnection(networkIP, jausID);
00501 }
00502
00503
00512 void JTCPClient::CheckServiceStatus(const unsigned int timeSinceLastCheckMs)
00513 {
00514 if(mpTransportManager)
00515 {
00516 mpTransportManager->CheckServiceStatus(timeSinceLastCheckMs);
00517 }
00518
00519 if(mRecvThread.IsThreadActive() == false)
00520 {
00521
00522 Initialize(mComponentID, "127.0.0.1");
00523 }
00524
00525 Mutex::ScopedLock lock(&mDataMutex);
00526 std::map<Address, Time::Stamp>::iterator utime;
00527 utime = mUpdateTimes.begin();
00528 while(utime != mUpdateTimes.end() && mDisconnectTimeMs > 0)
00529 {
00530 CxUtils::Time::Stamp diff = Time::GetUtcTimeMs() - utime->second;
00531 if(diff >= mDisconnectTimeMs)
00532 {
00533 mUpdateTimes.erase(utime);
00534 utime = mUpdateTimes.begin();
00535 continue;
00536 }
00537 utime++;
00538 }
00539 }
00540
00541
00555 void JTCPClient::ProcessPacket(Packet& packet,
00556 Header& header,
00557 const IP4Address& ipAddress,
00558 const unsigned short sourcePort)
00559 {
00560 {
00561 Mutex::ScopedLock lock(&mDataMutex);
00562 mUpdateTimes[header.mSourceID] = Time::GetUtcTimeMs();
00563 }
00564
00565 unsigned int overhead = BYTE_SIZE;
00566 Packet::Wrapper payloadPacket(packet.Ptr() + overhead, packet.Length() - overhead);
00567 Transport::ProcessPacket(*payloadPacket.GetData(), header);
00568 }
00569
00570
00571