Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include "cxutils/ipc/messageserver.h"
00042 #include <iostream>
00043 #include <string.h>
00044 #include <sstream>
00045 #include <cstdio>
00046
00047 using namespace std;
00048 using namespace CxUtils;
00049
// Threshold handed to IsActive() for every client connection by the client
// discovery thread; connections for which that call returns false are closed
// and removed. Presumably expressed in milliseconds, per the name - TODO confirm.
00050 unsigned int gInactiveThreshMs = 250;
00051
00052
00053
00059 MessageServer::Registry::Registry()
00060 {
00061 mServerFlag = false;
00062 }
00063
00064
00070 MessageServer::Registry::~Registry()
00071 {
00072 CloseRegistry();
00073 }
00074
00075
00088 int MessageServer::Registry::OpenRegistry(const ID registryID)
00089 {
00090 std::ostringstream input;
00091 input << registryID;
00092 return OpenRegistry(input.str());
00093 }
00094
00095
00108 int MessageServer::Registry::OpenRegistry(const std::string& registryID)
00109 {
00110 int result = 0;
00111 CloseRegistry();
00112
00113 if(mRegistry.OpenMappedMemory(registryID))
00114 {
00115 mRegistry.Lock();
00116
00117 if(memcmp(mRegistry->Ptr(), "REGISTRY", 8) == 0)
00118 {
00119 result = 1;
00120 }
00121 mRegistry.Unlock();
00122 }
00123 if(result == 0)
00124 {
00125 mRegistry.CloseMappedMemory();
00126 }
00127 return result;
00128 }
00129
00130
00138 int MessageServer::Registry::CloseRegistry()
00139 {
00140 List registry;
00141 if(mRegistry.IsOpen())
00142 {
00143 std::string name;
00144 name = mRegistry.GetMappedName();
00145 mRegistry.Lock();
00146
00147
00148 unsigned int count = 0;
00149
00150 mRegistry->SetReadPos(CountPos);
00151 mRegistry->Read(count);
00152 if(count == 0 && mServerFlag == true)
00153 {
00154
00155
00156 mRegistry->SetWritePos(0);
00157 mRegistry->Write( std::string("REGISTRY") );
00158 mRegistry->Write( ( (unsigned int)0 ) );
00159 mRegistry->Write( ( (unsigned int)0 ) );
00160 }
00161
00162 mRegistry.Unlock();
00163 mRegistry.CloseMappedMemory();
00164 }
00165
00166 return 1;
00167 }
00168
00169
00179 int MessageServer::Registry::Register(const ID id)
00180 {
00181 int result = 0;
00182 if(mRegistry.IsOpen())
00183 {
00184 unsigned int count = 0;
00185 ID temp = 0;
00186 mRegistry.Lock();
00187
00188
00189 mRegistry->SetReadPos(CountPos);
00190 mRegistry->Read(count);
00191 for(unsigned int i = 0; i < count; i++)
00192 {
00193 mRegistry->Read(temp);
00194 if(temp == id)
00195 {
00196 result = 1;
00197 break;
00198 }
00199 }
00200 if(result == 0)
00201 {
00202 mRegistry->SetWritePos(HeaderSize + count*sizeof(ID));
00203 if(mRegistry->Write(id))
00204 {
00205 count++;
00206 mRegistry->SetWritePos(CountPos);
00207 mRegistry->Write(count);
00208 result = 1;
00209 }
00210 }
00211 mRegistry.Unlock();
00212 }
00213
00214 return result;
00215 }
00216
00217
00227 int MessageServer::Registry::Unregister(const ID id)
00228 {
00229 int result = 0;
00230 if(mRegistry.IsOpen())
00231 {
00232 unsigned int count = 0;
00233 ID temp = 0;
00234 mRegistry.Lock();
00235
00236
00237 mRegistry->SetReadPos(CountPos);
00238 mRegistry->Read(count);
00239 for(unsigned int i = 0; i < count; i++)
00240 {
00241 mRegistry->Read(temp);
00242 if(temp == id)
00243 {
00244 result = 1;
00245 count--;
00246 memmove(mRegistry->Ptr() + HeaderSize + i*sizeof(ID),
00247 mRegistry->Ptr() + HeaderSize + (i+1)*sizeof(ID),
00248 count*sizeof(ID));
00249 break;
00250 }
00251 }
00252 if(result == 1)
00253 {
00254 mRegistry->SetWritePos(CountPos);
00255 mRegistry->Write(count);
00256 }
00257 mRegistry.Unlock();
00258 }
00259 return result;
00260 }
00261
00262
00272 int MessageServer::Registry::GetRegistry(MessageServer::List& registry)
00273 {
00274 int result = 0;
00275 registry.clear();
00276
00277 if(mRegistry.IsOpen())
00278 {
00279 result = 1;
00280 unsigned int count = 0;
00281 ID temp = 0;
00282 mRegistry.Lock();
00283
00284
00285 mRegistry->SetReadPos(CountPos);
00286 mRegistry->Read(count);
00287 for(unsigned int i = 0; i < count; i++)
00288 {
00289 mRegistry->Read(temp);
00290 registry.insert(temp);
00291 }
00292
00293 mRegistry.Unlock();
00294 }
00295 return result;
00296 }
00297
00298
00308 bool MessageServer::Registry::IsRegistered(const ID id) const
00309 {
00310 bool result = false;
00311 if(mRegistry.IsOpen())
00312 {
00313 unsigned int count = 0;
00314 ID temp = 0;
00315 mRegistry.Lock();
00316
00317
00318 mRegistry->SetReadPos(CountPos);
00319 mRegistry->Read(count);
00320 for(unsigned int i = 0; i < count; i++)
00321 {
00322 mRegistry->Read(temp);
00323 if(temp == id)
00324 {
00325 result = true;
00326 break;
00327 }
00328 }
00329 mRegistry.Unlock();
00330 }
00331 return result;
00332 }
00333
00334
00348 bool MessageServer::Registry::IsActive(const unsigned int thresholdMs) const
00349 {
00350 bool result = false;
00351
00352 if(mRegistry.IsOpen())
00353 {
00354 mRegistry.Lock();
00355
00356
00357 mRegistry->SetReadPos(TimePos);
00358 Time::Stamp updateTimeMs = 0;
00359 if(mRegistry->Read(updateTimeMs))
00360 {
00361 if((unsigned int)(GetTimeMs() - updateTimeMs) <= thresholdMs)
00362 {
00363 result = true;
00364 }
00365 }
00366 mRegistry.Unlock();
00367 }
00368
00369 return result;
00370 }
00371
00372
00385 int MessageServer::Registry::CreateRegistry(const ID registryID,
00386 const unsigned int maxClients)
00387 {
00388 std::ostringstream input;
00389 input << registryID;
00390 return CreateRegistry(input.str(), maxClients);
00391 }
00392
00393
00406 int MessageServer::Registry::CreateRegistry(const std::string& registryID,
00407 const unsigned int maxClients)
00408 {
00409 int result = 0;
00410 unsigned int size = maxClients*sizeof(ID) + HeaderSize;
00411
00412 CloseRegistry();
00413
00414 if(mRegistry.OpenMappedMemory(registryID))
00415 {
00416 mRegistry.Lock();
00417
00418 if(memcmp(mRegistry->Ptr(), "REGISTRY", 8) == 0)
00419 {
00420
00421
00422 mRegistry->SetReadPos(TimePos);
00423 Time::Stamp updateTimeMs = 0;
00424 if(mRegistry->Read(updateTimeMs))
00425 {
00426 if(GetTimeMs() - updateTimeMs > 500)
00427 {
00428 result = 1;
00429 mRegistry->SetWritePos(TimePos);
00430
00431 mRegistry->Write( GetTimeMs() );
00432 }
00433 }
00434 }
00435 else
00436 {
00437 mRegistry->SetWritePos(0);
00438 mRegistry->Write( std::string("REGISTRY") );
00439 mRegistry->Write( GetTimeMs() );
00440 result = 1;
00441 }
00442 mRegistry.Unlock();
00443 }
00444 else if(mRegistry.CreateMappedMemory(registryID, size))
00445 {
00446 mRegistry.Lock();
00447 mRegistry->SetWritePos(0);
00448 mRegistry->Write( std::string("REGISTRY") );
00449 mRegistry->Write( GetTimeMs() );
00450 mRegistry->Write( ( (unsigned int)0 ) );
00451 mRegistry.Unlock();
00452 result = 1;
00453 }
00454
00455 if(result == 0)
00456 {
00457 mRegistry.CloseMappedMemory();
00458 }
00459 else
00460 {
00461 mServerFlag = true;
00462 }
00463 return result;
00464 }
00465
00466
00476 bool MessageServer::Registry::Touch(const Time::Stamp timestampMs)
00477 {
00478 if(mRegistry.IsOpen())
00479 {
00480 mRegistry.Lock();
00481 mRegistry->SetWritePos(TimePos);
00482
00483 mRegistry->Write( timestampMs );
00484 mRegistry.Unlock();
00485 return true;
00486 }
00487 return false;
00488 }
00489
00490
00496 MessageServer::MessageServer()
00497 {
00498 }
00499
00500
00506 MessageServer::~MessageServer()
00507 {
00508 }
00509
00510
00525 int MessageServer::Initialize(const ID serverID,
00526 const unsigned int maxClients)
00527 {
00528 std::ostringstream input;
00529 input << serverID;
00530 return Initialize(input.str(), maxClients);
00531 }
00532
00533
00548 int MessageServer::Initialize(const std::string& serverID,
00549 const unsigned int maxClients)
00550 {
00551 int result = 0;
00552
00553
00554 Shutdown();
00555
00556 if(mClientRegistry.CreateRegistry(serverID, maxClients))
00557 {
00558 if(mClientDiscoveryThread.CreateThread(MessageServer::ClientDiscoveryThread, this))
00559 {
00560 result = 1;
00561 }
00562 else
00563 {
00564 cout << "MessageServer::Initialize - Failed to create thread.\n";
00565 }
00566 }
00567 else
00568 {
00569 #ifdef __CX_DEBUG
00570 cout << "MessageServer::Initialize - Failed to create registry for clients.\n";
00571 cout << "Another server may be running, or a previouis instance of an application\nrunning a server didn't close properly (delete memory or reboot system to force release).\n";
00572 #endif
00573 }
00574
00575 if(result == 0)
00576 {
00577 Shutdown();
00578 }
00579
00580 return result;
00581 }
00582
00583
00590 void MessageServer::Shutdown()
00591 {
00592 mClientDiscoveryThread.StopThread();
00593
00594 if(mClientRegistry.IsOpen())
00595 {
00596
00597
00598 mClientRegistry.Touch(0);
00599 mClientRegistry.CloseRegistry();
00600 }
00601
00602 Clients::iterator client;
00603 for(client = mClients.begin();
00604 client != mClients.end();
00605 client++)
00606 {
00607 client->second->CloseMessageBox();
00608 delete client->second;
00609 }
00610 mClients.clear();
00611 }
00612
00613
00627 bool MessageServer::CreateConnection(const ID id)
00628 {
00629 bool result = false;
00630 MappedMessageBox* client = new MappedMessageBox();
00631
00632 if(client->OpenMessageBox(id))
00633 {
00634 mClientRegistry.Register(id);
00635 Clients::iterator c;
00636 mClientsMutex.Lock();
00637 c = mClients.find(id);
00638 if(c != mClients.end())
00639 {
00640 c->second->CloseMessageBox();
00641 delete c->second;
00642 mClients.erase(c);
00643 }
00644 mClients[id] = client;
00645 client = NULL;
00646 mClientsMutex.Unlock();
00647 result = true;
00648 }
00649
00650 if(result == false && client)
00651 {
00652 delete client;
00653 client = NULL;
00654 }
00655
00656 return result;
00657 }
00658
00669 bool MessageServer::SendToClient(const ID id, const Packet& message)
00670 {
00671 bool result = false;
00672 Clients::iterator client;
00673 mClientsMutex.Lock();
00674 client = mClients.find(id);
00675 if(client != mClients.end())
00676 {
00677 result = client->second->WriteMessage(message);
00678 }
00679 mClientsMutex.Unlock();
00680 return result;
00681 }
00682
00683
00694 bool MessageServer::SendToClient(const ID id, const std::string& message)
00695 {
00696 bool result = false;
00697 Clients::iterator client;
00698 mClientsMutex.Lock();
00699 client = mClients.find(id);
00700 if(client != mClients.end())
00701 {
00702 result = client->second->WriteMessage(message);
00703 }
00704 mClientsMutex.Unlock();
00705 return result;
00706 }
00707
00708
00718 bool MessageServer::SendToAllClients(const Packet& message)
00719 {
00720 bool result = false;
00721 Clients::iterator client;
00722 mClientsMutex.Lock();
00723 for(client = mClients.begin();
00724 client != mClients.end();
00725 client++)
00726 {
00727 result = client->second->WriteMessage(message);
00728 }
00729 mClientsMutex.Unlock();
00730 return result;
00731 }
00732
00733
00743 bool MessageServer::SendToAllClients(const std::string& message)
00744 {
00745 bool result = false;
00746 Clients::iterator client;
00747 mClientsMutex.Lock();
00748 for(client = mClients.begin();
00749 client != mClients.end();
00750 client++)
00751 {
00752 result = client->second->WriteMessage(message);
00753 }
00754 mClientsMutex.Unlock();
00755 return result;
00756 }
00757
00758
00768 bool MessageServer::GetClientList(List& clients) const
00769 {
00770 bool result = false;
00771 Clients::const_iterator client;
00772 clients.clear();
00773 mClientsMutex.Lock();
00774 for(client = mClients.begin();
00775 client != mClients.end();
00776 client++)
00777 {
00778 result = true;
00779 clients.insert(client->first);
00780 }
00781 mClientsMutex.Unlock();
00782 return result;
00783 }
00784
00785
00791 unsigned int MessageServer::GetNumClients() const
00792 {
00793 unsigned int result = 0;
00794 mClientsMutex.Lock();
00795 result = (unsigned int)mClients.size();
00796 mClientsMutex.Unlock();
00797 return result;
00798 }
00799
00800
00806 bool MessageServer::IsInitialized() const
00807 {
00808 if(mClientRegistry.IsOpen() && mClientDiscoveryThread.IsThreadActive())
00809 {
00810 return true;
00811 }
00812 return false;
00813 }
00814
00815
00825 bool MessageServer::IsServerPresent(const std::string& serverID)
00826 {
00827 bool result = false;
00828 MappedMemory registry;
00829 if(registry.OpenMappedMemory(serverID))
00830 {
00831 registry.Lock();
00832
00833 if(memcmp(registry->Ptr(), "REGISTRY", 8) == 0)
00834 {
00835
00836 registry->SetReadPos(Registry::TimePos);
00837 Time::Stamp updateTimeMs = 0;
00838 if(registry->Read(updateTimeMs))
00839 {
00840 if(GetTimeMs() - updateTimeMs <= 500)
00841 {
00842 result = true;
00843 }
00844 }
00845 }
00846 registry.Unlock();
00847 }
00848
00849 return result;
00850 }
00851
00858 void MessageServer::ClientDiscoveryThread(void* args)
00859 {
00860 MessageServer* server = (MessageServer *)args;
00861 MessageServer::List registry;
00862 MessageServer::List myRegistry;
00863 MessageServer::List::iterator client;
00864 Clients::iterator connection;
00865
00866 while(server->mClientDiscoveryThread.QuitThreadFlag() == false)
00867 {
00868
00869 if(server->mClientRegistry.IsOpen() && server->mClientRegistry.GetRegistry(registry))
00870 {
00871
00872 for(client = registry.begin();
00873 client != registry.end();
00874 client++)
00875 {
00876
00877 if(myRegistry.find( *client ) == myRegistry.end())
00878 {
00879 server->mClientsMutex.Lock();
00880 connection = server->mClients.find(*client);
00881 if(connection != server->mClients.end())
00882 {
00883
00884 myRegistry.insert(*client);
00885 }
00886 else
00887 {
00888
00889 MappedMessageBox* newConnection = new MappedMessageBox();
00890 char name[256];
00891 sprintf(name, "%llu", *client);
00892 if(newConnection->OpenMessageBox(name))
00893 {
00894 myRegistry.insert(*client);
00895 server->mClients[*client] = newConnection;
00896 }
00897 else
00898 {
00899 delete newConnection;
00900
00901
00902 server->mClientRegistry.Unregister(*client);
00903 }
00904 }
00905 server->mClientsMutex.Unlock();
00906 }
00907 }
00908
00909 client = myRegistry.begin();
00910 while(client != myRegistry.end())
00911 {
00912
00913
00914 if(registry.find(*client) == registry.end())
00915 {
00916 server->mClientsMutex.Lock();
00917 connection = server->mClients.find(*client);
00918 if(connection != server->mClients.end())
00919 {
00920 connection->second->CloseMessageBox();
00921 delete connection->second;
00922 server->mClients.erase(connection);
00923 }
00924 server->mClientsMutex.Unlock();
00925
00926 myRegistry.erase(client);
00927 client = myRegistry.begin();
00928 }
00929 else
00930 {
00931 client++;
00932 }
00933 }
00934 }
00935
00936
00937 server->mClientsMutex.Lock();
00938 client = myRegistry.begin();
00939 while(client != myRegistry.end())
00940 {
00941 connection = server->mClients.find( *client );
00942 if(connection == server->mClients.end())
00943 {
00944 myRegistry.erase(client);
00945 client = myRegistry.begin();
00946 }
00947 else
00948 {
00949 client++;
00950 }
00951 }
00952 connection = server->mClients.begin();
00953 while(connection != server->mClients.end())
00954 {
00955 if(connection->second->IsActive(true, gInactiveThreshMs) == false)
00956 {
00957 connection->second->CloseMessageBox();
00958 delete connection->second;
00959 server->mClients.erase(connection);
00960 connection = server->mClients.begin();
00961 }
00962 else
00963 {
00964 connection++;
00965 }
00966 }
00967 server->mClientsMutex.Unlock();
00968
00969
00970
00971
00972 server->mClientRegistry.Touch(GetTimeMs());
00973
00974 SleepMs(100);
00975 }
00976 }
00977
00978