Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042 #include "cxutils/ipc/messageclient.h"
00043 #include "cxutils/time.h"
00044 #include <iostream>
00045 #include <sstream>
00046
00047 using namespace std;
00048 using namespace CxUtils;
00049
00050 unsigned int gMaxQueueSize = 500;
00051
00052
00053
00059 MessageClient::Callback::Function::Function()
00060 {
00061 mpCallback = NULL;
00062 mpCallbackArgs = NULL;
00063 }
00064
00065
00075 MessageClient::Callback::Function::Function(void (*callback)(Packet& packet, void* args),
00076 void* cbArgs)
00077 {
00078 mpCallback = callback;
00079 mpCallbackArgs = cbArgs;
00080 }
00081
00082
00088 MessageClient::Callback::Function::Function(const MessageClient::Callback::Function& data)
00089 {
00090 *this = data;
00091 }
00092
00093
00099 MessageClient::Callback::Function::~Function()
00100 {
00101 }
00102
00103
00111 void MessageClient::Callback::Function::Run(Packet& packet)
00112 {
00113 if(mpCallback)
00114 {
00115 mpCallback(packet, mpCallbackArgs);
00116 }
00117 }
00118
00119
00125 MessageClient::Callback::Function& MessageClient::Callback::Function::operator=(const MessageClient::Callback::Function& data)
00126 {
00127 mpCallback = data.mpCallback;
00128 mpCallbackArgs = data.mpCallbackArgs;
00129 return *this;
00130 }
00131
00132
00138 MessageClient::MessageClient()
00139 {
00140 mConnectToServerFlag = false;
00141 mDesiredBoxSize = 0;
00142 mUpdateDelayMs = 0;
00143 }
00144
00145
00151 MessageClient::~MessageClient()
00152 {
00153 Shutdown();
00154 }
00155
00156
00176 int MessageClient::Initialize(const ID serverID,
00177 const ID clientID,
00178 const unsigned int messageBoxSize,
00179 const bool mustConnect)
00180 {
00181 std::ostringstream input;
00182 input << serverID;
00183 return Initialize(input.str(), clientID, messageBoxSize, mustConnect);
00184 }
00185
00186
00206 int MessageClient::Initialize(const std::string& serverID,
00207 const ID clientID,
00208 const unsigned int messageBoxSize,
00209 const bool mustConnect)
00210 {
00211 int result = 0;
00212 Shutdown();
00213
00214
00215 if(mRegistry.OpenRegistry(serverID) || mustConnect == false)
00216 {
00217 mServerID = serverID;
00218 mConnectToServerFlag = true;
00219 mDesiredBoxSize = messageBoxSize;
00220
00221 ID id = clientID;
00222 if(id == AnyID && mRegistry.IsOpen())
00223 {
00224
00225 for(id = 1; id < 10000 && result == 0; id++)
00226 {
00227 if(mRegistry.IsRegistered(id) == false)
00228 {
00229 if(mMessageBox.CreateMessageBox(id, messageBoxSize) || mustConnect == false)
00230 {
00231 mID = id;
00232 if(mReceiveThread.CreateThread(MessageClient::ReceiveThread, this))
00233 {
00234
00235 mReceiveThread.SetThreadName("MessageClientThread");
00236 result = 1;
00237 break;
00238 }
00239 else
00240 {
00241 #ifdef __CX_DEBUG
00242 cout << "MessageClient::Initialize - Unable to create thread.\n";
00243 #endif
00244 }
00245 }
00246 else
00247 {
00248 #ifdef __CX_DEBUG
00249 cout << "MessageClient::Initialize - Unable to create a Message Box [ID = " << id << "].\n";
00250 #endif
00251 }
00252 }
00253 SleepMs(1);
00254 }
00255 }
00256 else if(mMessageBox.CreateMessageBox(id, messageBoxSize) || mustConnect == false)
00257 {
00258 mID = id;
00259 if(mReceiveThread.CreateThread(MessageClient::ReceiveThread, this))
00260 {
00261
00262 mReceiveThread.SetThreadName("MessageClientThread");
00263 result = 1;
00264 }
00265 else
00266 {
00267 #ifdef __CX_DEBUG
00268 cout << "MessageClient::Initialize - Unable to create thread.\n";
00269 #endif
00270 }
00271 }
00272 else
00273 {
00274 #ifdef __CX_DEBUG
00275 cout << "MessageClient::Initialize - Unable to create a Message Box [ID = " << id << "].\n";
00276 #endif
00277 }
00278 }
00279 else
00280 {
00281
00282 }
00283
00284
00285
00286 if(result == 0)
00287 {
00288 Shutdown();
00289 }
00290 return result;
00291 }
00292
00293
00309 int MessageClient::Initialize(const ID clientID,
00310 const unsigned int messageBoxSize)
00311 {
00312 int result = 0;
00313 Shutdown();
00314
00315 if(mMessageBox.CreateMessageBox(clientID, messageBoxSize))
00316 {
00317 mID = clientID;
00318 if(mReceiveThread.CreateThread(MessageClient::ReceiveThread, this))
00319 {
00320
00321 result = 1;
00322 }
00323 else
00324 {
00325 #ifdef __CX_DEBUG
00326 cout << "MessageClient::Initialize - Unable to create thread.\n";
00327 #endif
00328 }
00329 }
00330 else
00331 {
00332 #ifdef __CX_DEBUG
00333 cout << "MessageClient::Initialize - Unable to create a Message Box [ID = " << id << "].\n";
00334 #endif
00335 }
00336
00337 if(result == 0)
00338 {
00339 Shutdown();
00340 }
00341 return result;
00342 }
00343
00344
00352 void MessageClient::Shutdown()
00353 {
00354 mReceiveThread.StopThread();
00355 mRegistry.Unregister(mID);
00356 mRegistry.CloseRegistry();
00357 mMessageBox.CloseMessageBox();
00358 mReceivedDataMutex.Lock();
00359 mReceivedMessages.clear();
00360 mReceivedDataMutex.Unlock();
00361 mConnectToServerFlag = false;
00362 mID = 0;
00363 mDesiredBoxSize = 0;
00364 mServerID.clear();
00365 }
00366
00367
00368
00374 bool MessageClient::IsInitialized() const
00375 {
00376 return mReceiveThread.IsThreadActive();
00377 }
00378
00379
00389 bool MessageClient::ReadMessage(Packet& message) const
00390 {
00391 message.Clear();
00392 Packet::Queue* buffer = ( (Packet::Queue *)(&mReceivedMessages) );
00393 mReceivedDataMutex.Lock();
00394 if(buffer->size() > 0)
00395 {
00396 message = buffer->front();
00397 buffer->pop_front();
00398 }
00399 mReceivedDataMutex.Unlock();
00400 return message.Length() > 0 ? true : false;
00401 }
00402
00403
00413 bool MessageClient::ReadMessage(std::string& message) const
00414 {
00415 message.clear();
00416 Packet::Queue* buffer = ( (Packet::Queue *)(&mReceivedMessages) );
00417 mReceivedDataMutex.Lock();
00418 if(buffer->size() > 0)
00419 {
00420 message = (char *)buffer->front().Ptr();
00421 buffer->pop_front();
00422 }
00423 mReceivedDataMutex.Unlock();
00424 return message.size() > 0 ? true : false;
00425 }
00426
00427
00438 bool MessageClient::RegisterCallback(Callback* callback)
00439 {
00440 bool result = false;
00441 std::set<Callback*>::iterator cb;
00442 mCallbacksMutex.Lock();
00443 cb = mCallbacks.find(callback);
00444 if(cb == mCallbacks.end())
00445 {
00446 result = true;
00447 mCallbacks.insert(callback);
00448 }
00449 mCallbacksMutex.Unlock();
00450 return result;
00451 }
00452
00453
00464 bool MessageClient::RegisterCallback(const Callback::Function& callback)
00465 {
00466 bool result = false;
00467 std::vector<Callback::Function>::iterator cb;
00468 mCallbacksMutex.Lock();
00469 for(cb = mFunctionCallbacks.begin();
00470 cb != mFunctionCallbacks.end();
00471 cb++)
00472 {
00473 if(cb->mpCallback == callback.mpCallback &&
00474 cb->mpCallbackArgs == callback.mpCallbackArgs)
00475 {
00476 break;
00477 }
00478 }
00479 if(cb == mFunctionCallbacks.end())
00480 {
00481 result = true;
00482 mFunctionCallbacks.push_back(callback);
00483 }
00484 mCallbacksMutex.Unlock();
00485 return result;
00486 }
00487
00488
00496 void MessageClient::RemoveCallback(Callback* callback)
00497 {
00498 std::set<Callback*>::iterator cb;
00499 mCallbacksMutex.Lock();
00500 cb = mCallbacks.find(callback);
00501 if(cb != mCallbacks.end())
00502 {
00503 mCallbacks.erase(callback);
00504 }
00505 mCallbacksMutex.Unlock();
00506 }
00507
00508
00516 void MessageClient::RemoveCallback(const Callback::Function& callback)
00517 {
00518 std::vector<Callback::Function>::iterator cb;
00519 mCallbacksMutex.Lock();
00520 for(cb = mFunctionCallbacks.begin();
00521 cb != mFunctionCallbacks.end();
00522 cb++)
00523 {
00524 if(cb->mpCallback == callback.mpCallback &&
00525 cb->mpCallbackArgs == callback.mpCallbackArgs)
00526 {
00527 mFunctionCallbacks.erase(cb);
00528 break;
00529 }
00530 }
00531 mCallbacksMutex.Unlock();
00532 }
00533
00534
00543 void MessageClient::SetUpdateDelayMs(const unsigned int delayTimeMs)
00544 {
00545 Mutex::ScopedLock lock(&mDelayMutex);
00546 mUpdateDelayMs = delayTimeMs;
00547 }
00548
00549
00559 void MessageClient::ReceiveThread(void* args)
00560 {
00561 MessageClient* client = (MessageClient *)args;
00562 #ifdef WIN32
00563 unsigned int loopCounter = 0;
00564 #endif
00565
00566 Time::Stamp registryCheckTimeMs = 0;
00567
00568 while(client->mReceiveThread.QuitThreadFlag() == false)
00569 {
00570
00571 if(GetTimeMs() - registryCheckTimeMs > 250)
00572 {
00573 if(client->mRegistry.IsOpen())
00574 {
00575 client->mRegistry.Register(client->mID);
00576 }
00577 else if(client->mRegistry.OpenRegistry(client->mServerID))
00578 {
00579 ID id = client->mID;
00580 if(client->mID == AnyID)
00581 {
00582 id = 1;
00583 while(client->mRegistry.IsRegistered(id))
00584 {
00585 id++;
00586
00587 if(id - client->mID >= 10000)
00588 {
00589 break;
00590 }
00591
00592 }
00593 }
00594 if(client->mMessageBox.CreateMessageBox(id, client->mDesiredBoxSize))
00595 {
00596 client->mRegistry.Register(id);
00597 client->mID = id;
00598 }
00599 }
00600 registryCheckTimeMs = GetTimeMs();
00601 }
00602
00603
00604 if(client->mMessageBox.ReadMessage(client->mTempPacket))
00605 {
00606
00607 client->mReceivedDataMutex.Lock();
00608 client->mReceivedMessages.push_back(client->mTempPacket);
00609 if(client->mReceivedMessages.size() > gMaxQueueSize)
00610 {
00611 client->mReceivedMessages.erase(client->mReceivedMessages.begin());
00612 }
00613 client->mReceivedDataMutex.Unlock();
00614
00615
00616 client->mCallbacksMutex.Lock();
00617
00618 std::set<Callback*>::iterator cb;
00619 for(cb = client->mCallbacks.begin();
00620 cb != client->mCallbacks.end();
00621 cb++)
00622 {
00623 (*cb)->ProcessMessage(client->mTempPacket);
00624 }
00625 std::vector<Callback::Function>::iterator cbf;
00626 for(cbf = client->mFunctionCallbacks.begin();
00627 cbf != client->mFunctionCallbacks.end();
00628 cbf++)
00629 {
00630 cbf->Run(client->mTempPacket);
00631 }
00632
00633 client->mCallbacksMutex.Unlock();
00634 }
00635 client->mDelayMutex.Lock();
00636 if(client->mUpdateDelayMs == 0)
00637 {
00638 #ifdef WIN32
00639 if(loopCounter++ == 250)
00640 {
00641 loopCounter = 0;
00642 SleepMs(1);
00643 }
00644 #else
00645 usleep(500);
00646 #endif
00647 }
00648 else
00649 {
00650 SleepMs(client->mUpdateDelayMs);
00651 }
00652 client->mDelayMutex.Unlock();
00653 }
00654
00655
00656 client->mRegistry.Unregister(client->mID);
00657 }
00658
00659