Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049 #include "cxutils/networking/udpsharedserver.h"
00050 #include "cxutils/timer.h"
00051 #include "cxutils/time.h"
00052 #include <iostream>
00053
00054 using namespace std;
00055 using namespace CxUtils;
00056
00057
00063 UdpSharedServer::Info::Info()
00064 {
00065 mNetworkInterface = -1;
00066 }
00067
00068
00074 UdpSharedServer::Info::Info(const UdpSharedServer::Info& info)
00075 {
00076 *this = info;
00077 }
00078
00079
00085 UdpSharedServer::Info::~Info()
00086 {
00087 }
00088
00089
00095 UdpSharedServer::Info& UdpSharedServer::Info::operator=(const UdpSharedServer::Info& info)
00096 {
00097 mNetworkInterface = info.mNetworkInterface;
00098 mPort = info.mPort;
00099 mMulticastGroup = info.mMulticastGroup;
00100 return *this;
00101 }
00102
00103
00109 UdpSharedServer::Callback::Data::Data()
00110 {
00111 mpCallback = NULL;
00112 mpCallbackArgs = NULL;
00113 }
00114
00115
00125 UdpSharedServer::Callback::Data::Data(void (*callback)(const Packet& packet,
00126 const IP4Address& ipAddress,
00127 void* args),
00128 void* cbArgs)
00129 {
00130 mpCallback = callback;
00131 mpCallbackArgs = cbArgs;
00132 }
00133
00134
00140 UdpSharedServer::Callback::Data::Data(const UdpSharedServer::Callback::Data& data)
00141 {
00142 *this = data;
00143 }
00144
00145
00151 UdpSharedServer::Callback::Data::~Data()
00152 {
00153 }
00154
00155
00164 void UdpSharedServer::Callback::Data::Run(const Packet& packet, const IP4Address& ipAddress)
00165 {
00166 if(mpCallback)
00167 {
00168 mpCallback(packet, ipAddress, mpCallbackArgs);
00169 }
00170 }
00171
00172
00178 UdpSharedServer::Callback::Data& UdpSharedServer::Callback::Data::operator=(const UdpSharedServer::Callback::Data& data)
00179 {
00180 mpCallback = data.mpCallback;
00181 mpCallbackArgs = data.mpCallbackArgs;
00182 return *this;
00183 }
00184
00185
00191 UdpSharedServer::UdpSharedServer() : mPrimaryFlag(false),
00192 mConnectedFlag(false),
00193 mUpdateDelayMs(0),
00194 mpSocket(NULL),
00195 mpMessageServer(NULL),
00196 mpMessageClient(NULL),
00197 mQuitServerFlag(false)
00198 {
00199 mPacketQueue.Reserve(15);
00200 mMessageAddressQueue.Reserve(15);
00201 }
00202
00203
00209 UdpSharedServer::~UdpSharedServer()
00210 {
00211 Shutdown();
00212 }
00213
00214
00235 int UdpSharedServer::InitializeSocket(const unsigned short port,
00236 const IP4Address& multicastGroup,
00237 const int netInterface)
00238 {
00239 int result = 0;
00240
00241 Shutdown();
00242 mConnectedFlag = false;
00243 mQuitServerFlag = false;
00244
00245 mSocketInfo.mPort = port;
00246 mSocketInfo.mNetworkInterface = netInterface;
00247 mSocketInfo.mMulticastGroup = multicastGroup;
00248
00249
00250 if(mUdpStatusThread.CreateThread(UdpSharedServer::UdpStatusThread, this))
00251 {
00252 mUdpStatusThread.SetThreadName("UdpSharedServer");
00253 result = 1;
00254 }
00255
00256
00257 if(result == 0)
00258 {
00259 Shutdown();
00260 }
00261
00262 return result;
00263 }
00264
00265
00271 void UdpSharedServer::Shutdown()
00272 {
00273 mQuitServerFlag = true;
00274 mUdpStatusThread.StopThread();
00275 mUdpSocketThread.StopThread();
00276
00277 if(mpMessageServer)
00278 {
00279 mpMessageServer->Shutdown();
00280 delete mpMessageServer;
00281 mpMessageServer = NULL;
00282 }
00283
00284 if(mpMessageClient)
00285 {
00286 mpMessageClient->Shutdown();
00287 delete mpMessageClient;
00288 mpMessageClient = NULL;
00289 }
00290
00291 mPrimaryFlag = false;
00292 mSocketInfo.mMulticastGroup.SetAddress("0.0.0.0");
00293 mSocketInfo.mNetworkInterface = -1;
00294 mSocketInfo.mPort = 0;
00295
00296 mQuitServerFlag = false;
00297
00298 if(mpSocket)
00299 {
00300 mpSocket->Shutdown();
00301 delete mpSocket;
00302 mpSocket = NULL;
00303 }
00304 mConnectedFlag = false;
00305 mPacketQueue.Clear();
00306 mMessageAddressQueue.Clear();
00307 }
00308
00309
00315 int UdpSharedServer::GetNetworkInterface() const
00316 {
00317 return mSocketInfo.mNetworkInterface;
00318 }
00319
00320
00331 bool UdpSharedServer::RegisterCallback(Callback* callback)
00332 {
00333 bool result = false;
00334 std::set<Callback*>::iterator cb;
00335 mCallbacksMutex.Lock();
00336 cb = mCallbacks.find(callback);
00337 if(cb == mCallbacks.end())
00338 {
00339 result = true;
00340 mCallbacks.insert(callback);
00341 }
00342 mCallbacksMutex.Unlock();
00343 return result;
00344 }
00345
00346
00357 bool UdpSharedServer::RegisterCallback(const Callback::Data& callback)
00358 {
00359 bool result = false;
00360 std::vector<Callback::Data>::iterator cb;
00361 mCallbacksMutex.Lock();
00362 for(cb = mFunctionCallbacks.begin();
00363 cb != mFunctionCallbacks.end();
00364 cb++)
00365 {
00366 if(cb->mpCallback == callback.mpCallback &&
00367 cb->mpCallbackArgs == callback.mpCallbackArgs)
00368 {
00369 break;
00370 }
00371 }
00372 if(cb == mFunctionCallbacks.end())
00373 {
00374 result = true;
00375 mFunctionCallbacks.push_back(callback);
00376 }
00377 mCallbacksMutex.Unlock();
00378 return result;
00379 }
00380
00381
00389 void UdpSharedServer::RemoveCallback(Callback* callback)
00390 {
00391 std::set<Callback*>::iterator cb;
00392 mCallbacksMutex.Lock();
00393 cb = mCallbacks.find(callback);
00394 if(cb != mCallbacks.end())
00395 {
00396 mCallbacks.erase(callback);
00397 }
00398 mCallbacksMutex.Unlock();
00399 }
00400
00401
00409 void UdpSharedServer::RemoveCallback(const Callback::Data& callback)
00410 {
00411 std::vector<Callback::Data>::iterator cb;
00412 mCallbacksMutex.Lock();
00413 for(cb = mFunctionCallbacks.begin();
00414 cb != mFunctionCallbacks.end();
00415 cb++)
00416 {
00417 if(cb->mpCallback == callback.mpCallback &&
00418 cb->mpCallbackArgs == callback.mpCallbackArgs)
00419 {
00420 mFunctionCallbacks.erase(cb);
00421 break;
00422 }
00423 }
00424 mCallbacksMutex.Unlock();
00425 }
00426
00427
00433 bool UdpSharedServer::IsValid() const
00434 {
00435 if(mUdpStatusThread.IsThreadActive())
00436 return true;
00437
00438 return false;
00439 }
00440
00441
00448 bool UdpSharedServer::IsPrimary() const
00449 {
00450 return mPrimaryFlag;
00451 }
00452
00453
00466 bool UdpSharedServer::Recv(Packet& packet, IP4Address& ipAddress)
00467 {
00468 bool result = false;
00469
00470 mQueueMutex.Lock();
00471 if(mPacketQueue.Size() > 0)
00472 {
00473 packet = *mPacketQueue.begin();
00474 ipAddress = *mMessageAddressQueue.begin();
00475 mPacketQueue.PopFront(NULL);
00476 mMessageAddressQueue.PopFront(NULL);
00477 result = true;
00478 }
00479 mQueueMutex.Unlock();
00480
00481 return result;
00482 }
00483
00484
00492 void UdpSharedServer::SetUpdateDelayMs(const unsigned int delayTimeMs)
00493 {
00494 Mutex::ScopedLock lock(&mDelayMutex);
00495 mUpdateDelayMs = delayTimeMs;
00496 }
00497
00498
00507 void UdpSharedServer::UdpStatusThread(void* udpSharedServer)
00508 {
00509 UdpSharedServer* server = (UdpSharedServer*)udpSharedServer;
00510 char messageServerName[512];
00511
00512 sprintf(messageServerName, "%d", server->mSocketInfo.mPort);
00513
00514 while(server &&
00515 (server->mUdpStatusThread.QuitThreadFlag() == false ||
00516 server->mQuitServerFlag == false))
00517 {
00518
00519 if(server->mPrimaryFlag == false && server->mpSocket == NULL && MessageServer::IsServerPresent(messageServerName) == false)
00520 {
00521
00522 server->mpSocket = new UdpServer();
00523 server->mpSocket->SetNetworkInterface(server->mSocketInfo.mNetworkInterface);
00524
00525 if(server->mSocketInfo.mMulticastGroup.mString >= "224.0.0.0" &&
00526 server->mSocketInfo.mMulticastGroup.mString <= "239.255.255.255")
00527 {
00528 server->mpSocket->InitializeMulticastSocket(server->mSocketInfo.mPort,
00529 server->mSocketInfo.mMulticastGroup);
00530 }
00531 else
00532 {
00533 server->mpSocket->InitializeSocket(server->mSocketInfo.mPort);
00534 }
00535
00536 if(server->mpSocket->IsValid())
00537 {
00538
00539 server->mpMessageServer = new MessageServer();
00540
00541 if(server->mpMessageServer->Initialize(messageServerName, MessageServer::DefaultMaxClients))
00542 {
00543 server->mPrimaryFlag = true;
00544
00545 if(server->mpMessageClient)
00546 {
00547 delete server->mpMessageClient;
00548 server->mpMessageClient = NULL;
00549
00550 }
00551
00552 server->mUdpSocketThread.CreateThread(UdpSharedServer::UdpSocketThread, server);
00553
00554 server->mUdpSocketThread.SetThreadName("UdpSocketServer");
00555 }
00556 }
00557
00558 if(server->mPrimaryFlag == false)
00559 {
00560
00561 if(server->mpSocket)
00562 {
00563 delete server->mpSocket;
00564 }
00565 if(server->mpMessageServer)
00566 {
00567 delete server->mpMessageServer;
00568 }
00569 server->mpSocket = NULL;
00570 server->mpMessageServer = NULL;
00571 }
00572 }
00573
00574
00575 if(server->mPrimaryFlag == false && server->mpMessageClient == NULL)
00576 {
00577 server->mpMessageClient = new MessageClient();
00578 server->mpMessageClient->SetUpdateDelayMs(server->mUpdateDelayMs);
00579 if(server->mpMessageClient->Initialize(messageServerName, MessageClient::AnyID) == 0)
00580 {
00581 delete server->mpMessageClient;
00582 server->mpMessageClient = NULL;
00583 }
00584 else
00585 {
00586
00587 server->mpMessageClient->RegisterCallback(MessageClient::Callback::Function(UdpSharedServer::ProcessUdpMessage, server));
00588 }
00589 }
00590
00591 SleepMs(250);
00592 }
00593 }
00594
00595
00602 void UdpSharedServer::UdpSocketThread(void* udpSharedServer)
00603 {
00604 UdpSharedServer* server = (UdpSharedServer*)udpSharedServer;
00605 Packet udpMessage;
00606 IP4Address sourceAddress;
00607 unsigned short sourcePort = 0;
00608 #ifdef WIN32
00609 unsigned int loopCounter = 0;
00610 #endif
00611
00612 while(server &&
00613 (server->mUdpSocketThread.QuitThreadFlag() == false ||
00614 server->mQuitServerFlag == false))
00615 {
00616
00617 udpMessage.Clear(false);
00618 sourcePort = 0;
00619 if(server->mpSocket->Recv(udpMessage, 5000, 100, &sourceAddress, &sourcePort) > 0)
00620 {
00621
00622 udpMessage.Write(sourceAddress.mData, 4, udpMessage.Length());
00623 udpMessage.Write(sourcePort, udpMessage.Length());
00624
00625 server->mpMessageServer->SendToAllClients(udpMessage);
00626
00627
00628 UdpSharedServer::ProcessUdpMessage(udpMessage, server);
00629 }
00630 server->mDelayMutex.Lock();
00631 if(server->mUpdateDelayMs == 0)
00632 {
00633 #ifdef WIN32
00634
00635 if( loopCounter++ == 250)
00636 {
00637 loopCounter = 0;
00638 SleepMs(1);
00639 }
00640 #else
00641 usleep(500);
00642 #endif
00643 }
00644 else
00645 {
00646 CxUtils::SleepMs(server->mUpdateDelayMs);
00647 }
00648 server->mDelayMutex.Unlock();
00649 }
00650 server->mConnectedFlag = false;
00651 }
00652
00653
00667 void UdpSharedServer::ProcessUdpMessage(Packet& udpMessage, void* udpSharedServer)
00668 {
00669 UdpSharedServer* server = (UdpSharedServer*)udpSharedServer;
00670 IP4Address sourceAddress;
00671 unsigned short sourcePort = 0;
00672
00673 const unsigned char * ptr = udpMessage.Ptr() + udpMessage.Length() - 6;
00674 sourceAddress.SetAddress(*ptr, *(ptr + 1), *(ptr + 2), *(ptr + 3));
00675 udpMessage.Read(sourcePort, udpMessage.Length() - 2);
00676 udpMessage.Write((unsigned char)0, (unsigned int)udpMessage.Length() - 6);
00677 udpMessage.SetLength(udpMessage.Length() - 6);
00678
00679 server->mConnectedFlag = true;
00680
00681
00682 std::set<Callback*>::iterator cb;
00683 server->mCallbacksMutex.Lock();
00684 for(cb = server->mCallbacks.begin();
00685 cb != server->mCallbacks.end();
00686 cb++)
00687 {
00688 (*cb)->ProcessUDP(udpMessage, sourceAddress);
00689 (*cb)->ProcessUDP(udpMessage, sourceAddress, sourcePort);
00690 }
00691
00692 std::vector<Callback::Data>::iterator cbf;
00693 for(cbf = server->mFunctionCallbacks.begin();
00694 cbf != server->mFunctionCallbacks.end();
00695 cbf++)
00696 {
00697 cbf->Run(udpMessage, sourceAddress);
00698 }
00699
00700 server->mCallbacksMutex.Unlock();
00701
00702
00703 server->mQueueMutex.Lock();
00704 server->mPacketQueue.PushBack(udpMessage);
00705 server->mMessageAddressQueue.PushBack(sourceAddress);
00706 server->mQueueMutex.Unlock();
00707
00708 }
00709
00710