00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #include "jaus/core/events/events.h"
00041 #include "jaus/core/events/cancelevent.h"
00042 #include "jaus/core/events/confirmeventrequest.h"
00043 #include "jaus/core/events/createevent.h"
00044 #include "jaus/core/events/event.h"
00045 #include "jaus/core/events/queryevents.h"
00046 #include "jaus/core/events/rejecteventrequest.h"
00047 #include "jaus/core/events/reportevents.h"
00048 #include "jaus/core/events/updateevent.h"
00049 #include "jaus/core/transport/transport.h"
00050 #include "jaus/core/component.h"
00051 #include <cxutils/math/cxmath.h>
00052 #include <cstring>
00053 #include <iomanip>
00054 #include <iostream>
00055
00056
00057 using namespace JAUS;
00058
00059 const std::string Events::Name = "urn:jaus:jss:core:Events";
00060
00066 Events::Subscription::Subscription()
00067 {
00068 mID = mSequenceNumber = 0;
00069 mPeriodicRate = 0.0;
00070 mpQueryMessage = NULL;
00071 mType = Events::Periodic;
00072 mUpdateTimeMs = 0;
00073 mpEventService = NULL;
00074 mTriggerTimeSeconds = 0;
00075 }
00076
00077
00083 Events::Subscription::Subscription(const Events::Subscription& data)
00084 {
00085 mID = mSequenceNumber = 0;
00086 mPeriodicRate = 0.0;
00087 mpQueryMessage = NULL;
00088 mType = Events::Periodic;
00089 mUpdateTimeMs = 0;
00090 mpEventService = NULL;
00091 mTriggerTimeSeconds = 0;
00092 *this = data;
00093 }
00094
00095
00101 Events::Subscription::~Subscription()
00102 {
00103 if(mpQueryMessage)
00104 {
00105 delete mpQueryMessage;
00106 }
00107 }
00108
00109
00115 Events::Subscription& Events::Subscription::operator =(const Events::Subscription& data)
00116 {
00117 if(this != &data)
00118 {
00119 mID = data.mID;
00120 mSequenceNumber = data.mSequenceNumber;
00121 mPeriodicRate = data.mPeriodicRate;
00122 mType = data.mType;
00123 mUpdateTimeMs = data.mUpdateTimeMs;
00124 mProducer = data.mProducer;
00125 mClients = data.mClients;
00126 if(mpQueryMessage)
00127 {
00128 delete mpQueryMessage;
00129 mpQueryMessage = NULL;
00130 }
00131 if(data.mpQueryMessage)
00132 {
00133 mpQueryMessage = data.mpQueryMessage->Clone();
00134 }
00135 mpEventService = data.mpEventService;
00136 mTriggerTimeSeconds = data.mTriggerTimeSeconds;
00137 }
00138 return *this;
00139 }
00140
00141
00150 Events::Child::Child(const ID& serviceIdentifier,
00151 const ID& parentServiceIdentifier) : Service(serviceIdentifier,
00152 parentServiceIdentifier)
00153 {
00154 }
00155
00156
00162 Events::Child::~Child()
00163 {
00164 }
00165
00166
00180 void Events::Child::SignalEvent(const UShort reportMessageCode)
00181 {
00182 Component* component = GetComponent();
00183 if(component)
00184 {
00185 component->EventsService()->SignalEvent(reportMessageCode);
00186 }
00187 }
00188
00189
00201 void Events::Child::SignalEvent(const Subscription& info)
00202 {
00203 Component* component = GetComponent();
00204 if(component)
00205 {
00206 component->EventsService()->SignalEvent(info);
00207 }
00208 }
00209
00210
00216 Events* Events::Child::EventsService()
00217 {
00218 return this->GetComponent()->EventsService();
00219 }
00220
00221
00227 const Events* Events::Child::EventsService() const
00228 {
00229 return this->GetComponent()->EventsService();
00230 }
00231
00232
00233
00245 void Events::Child::SendEvent(const Events::Subscription& info,
00246 const Message* payload) const
00247 {
00248 JAUS::Event eventMessage;
00249 eventMessage.SetSourceID(GetComponentID());
00250 eventMessage.SetSequenceNumber(info.mSequenceNumber);
00251 eventMessage.SetEventID(info.mID);
00252 eventMessage.SetReportMessage(payload);
00253 Address::Set::const_iterator dest;
00254 for(dest = info.mClients.begin();
00255 dest != info.mClients.end();
00256 dest++)
00257 {
00258 eventMessage.SetDestinationID(*dest);
00259 Send(&eventMessage);
00260 }
00261 }
00262
00263
00269 Events::Events() : Service(Service::ID(Events::Name, 1.0), Service::ID(Transport::Name, 1.0))
00270 {
00271 mCheckEventsTimeMs = Time::GetUtcTimeMs();
00272 mCheckEventDelayMs = 5000;
00273 mPeriodicTimer.RegisterTimerEvent(Events::PeriodicEvent, this);
00274 mShutdownFlag = false;
00275 }
00276
00277
00283 Events::~Events()
00284 {
00285 }
00286
00287
00293 void Events::Shutdown()
00294 {
00295 mShutdownFlag = true;
00296
00297 if(mDebugMessagesFlag)
00298 {
00299 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00300 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Stopping Events Timer\n";
00301 }
00302 mPeriodicTimer.Stop();
00303 if(mDebugMessagesFlag)
00304 {
00305 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00306 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Events Timer Stopped\n";
00307 }
00308
00309 Subscription::List copy;
00310 Subscription::List removed;
00311
00312
00313
00314
00315 mEventsMutex.Lock();
00316 mEvents.clear();
00317
00318 Subscription::List::iterator sub;
00319
00320 if(mDebugMessagesFlag)
00321 {
00322 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00323 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Canceling Event Subscriptions.\n";
00324 }
00325
00326
00327 sub = mSubscriptions.begin();
00328 while(sub != mSubscriptions.end())
00329 {
00330 CancelSubscription(*sub, Service::DefaultWaitMs);
00331 sub = mSubscriptions.erase(sub);
00332 }
00333
00334 mSubscriptions.clear();
00335 mEventsMutex.Unlock();
00336
00337 if(mDebugMessagesFlag)
00338 {
00339 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00340 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Finished Canceling Event Subscriptions.\n";
00341 }
00342
00343 mShutdownFlag = false;
00344 }
00345
00346
00361 bool Events::RequestEveryChangeEvent(const Address& provider,
00362 const Message* query,
00363 const unsigned int waitTimeMs)
00364 {
00365 bool result = false;
00366
00367 CreateEvent request(provider, GetComponentID());
00368
00369 request.SetType(Events::EveryChange);
00370 request.SetQueryMessage(query);
00371
00372
00373
00374 Byte requestID = 0;
00375 std::set<Byte>::iterator riter;
00376 mRequestMutex.Lock();
00377 riter = mRequestSet.find(requestID);
00378 while(riter != mRequestSet.end() )
00379 {
00380 requestID++;
00381 riter = mRequestSet.find(requestID);
00382 }
00383 mRequestMutex.Unlock();
00384
00385 request.SetRequestID(requestID);
00386
00387
00388 Message::List responses;
00389 ConfirmEventRequest confirm;
00390 RejectEventRequest reject;
00391 responses.push_back(&confirm);
00392 responses.push_back(&reject);
00393
00394 if(Send(&request, responses, waitTimeMs))
00395 {
00396 if(confirm.GetSourceID().IsValid())
00397 {
00398 Subscription sub;
00399 Subscription::List::iterator siter;
00400
00401 sub.mID = confirm.GetEventID();
00402 sub.mSequenceNumber = 0;
00403 sub.mpQueryMessage = query->Clone();
00404 sub.mType = EveryChange;
00405 sub.mPeriodicRate = confirm.GetConfirmedPeriodicRate();
00406 sub.mUpdateTimeMs = Time::GetUtcTimeMs();
00407 sub.mProducer = provider;
00408 sub.mClients.insert(GetComponentID());
00409
00410 Mutex::ScopedLock lock(&mEventsMutex);
00411
00412
00413 bool exists = false;
00414 for(siter = mSubscriptions.begin();
00415 siter != mSubscriptions.end();
00416 siter++)
00417 {
00418 if(siter->mID == sub.mID &&
00419 siter->mProducer == provider &&
00420 siter->mpQueryMessage->GetMessageCode() == query->GetMessageCode())
00421 {
00422 *siter = sub;
00423 exists = true;
00424 break;
00425 }
00426 }
00427 if(!exists)
00428 {
00429 mSubscriptions.push_back(sub);
00430 }
00431
00432 result = true;
00433 }
00434 else if(mDebugMessagesFlag)
00435 {
00436 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00437 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Could not create subscription to " << provider.ToString() << " for Query: " << query->GetMessageName() << "\n";
00438 }
00439 }
00440
00441
00442 mRequestMutex.Lock();
00443 riter = mRequestSet.find(requestID);
00444 if(riter != mRequestSet.end() )
00445 {
00446 mRequestSet.erase(riter);
00447 }
00448 mRequestMutex.Unlock();
00449
00450 return result;
00451 }
00452
00453
00473 bool Events::RequestPeriodicEvent(const Address &provider,
00474 const Message *query,
00475 const double desiredPeriodicRate,
00476 const double minimumPeriodicRate,
00477 const unsigned int waitTimeMs)
00478 {
00479 bool result = false;
00480
00481 CreateEvent request(provider, GetComponentID());
00482
00483 request.SetRequestedPeriodicRate(desiredPeriodicRate);
00484 request.SetType(Events::Periodic);
00485 request.SetQueryMessage(query);
00486
00487
00488
00489 Byte requestID = 0;
00490 std::set<Byte>::iterator riter;
00491 mRequestMutex.Lock();
00492 riter = mRequestSet.find(requestID);
00493 while(riter != mRequestSet.end() )
00494 {
00495 requestID++;
00496 riter = mRequestSet.find(requestID);
00497 }
00498 mRequestMutex.Unlock();
00499
00500 request.SetRequestID(requestID);
00501
00502
00503 Message::List responses;
00504 ConfirmEventRequest confirm;
00505 RejectEventRequest reject;
00506 responses.push_back(&confirm);
00507 responses.push_back(&reject);
00508
00509 if(Send(&request, responses, waitTimeMs))
00510 {
00511 if(confirm.GetSourceID().IsValid())
00512 {
00513 if(confirm.GetConfirmedPeriodicRate() < minimumPeriodicRate)
00514 {
00515
00516
00517 CancelEvent cancel(confirm.GetSourceID(), GetComponentID());
00518 cancel.SetEventID(confirm.GetEventID());
00519 cancel.SetRequestID(requestID);
00520 ConfirmEventRequest confirmResponse;
00521
00522 Send(&cancel, &confirmResponse, waitTimeMs);
00523 }
00524 else
00525 {
00526 Subscription sub;
00527 Subscription::List::iterator siter;
00528
00529 sub.mID = confirm.GetEventID();
00530 sub.mSequenceNumber = 0;
00531 sub.mpQueryMessage = query->Clone();
00532 sub.mType = Periodic;
00533 sub.mPeriodicRate = confirm.GetConfirmedPeriodicRate();
00534 sub.mUpdateTimeMs = Time::GetUtcTimeMs();
00535 sub.mProducer = provider;
00536 sub.mClients.insert(GetComponentID());
00537
00538 Mutex::ScopedLock lock(&mEventsMutex);
00539
00540
00541 bool exists = false;
00542 for(siter = mSubscriptions.begin();
00543 siter != mSubscriptions.end();
00544 siter++)
00545 {
00546 if(siter->mProducer == provider &&
00547 siter->mpQueryMessage->GetMessageCode() == query->GetMessageCode() &&
00548 siter->mpQueryMessage->GetPresenceVector() == query->GetPresenceVector())
00549 {
00550 *siter = sub;
00551 exists = true;
00552 break;
00553 }
00554 }
00555 if(!exists)
00556 {
00557 mSubscriptions.push_back(sub);
00558 }
00559
00560 result = true;
00561 }
00562 }
00563 else if(mDebugMessagesFlag)
00564 {
00565 Mutex::ScopedLock lock(&mDebugMessagesMutex);
00566 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Could not create subscription to " << provider.ToString() << " for Query: " << query->GetMessageName() << "\n";
00567 }
00568 }
00569
00570
00571 mRequestMutex.Lock();
00572 riter = mRequestSet.find(requestID);
00573 if(riter != mRequestSet.end() )
00574 {
00575 mRequestSet.erase(riter);
00576 }
00577 mRequestMutex.Unlock();
00578
00579 return result;
00580 }
00581
00582
00596 void Events::SignalEvent(const UShort reportMessageCode,
00597 const bool changeOnly)
00598 {
00599 mEventsMutex.Lock();
00600 Subscription::Map toGenerate = mEvents;
00601 mEventsMutex.Unlock();
00602
00603 Subscription::Map::iterator sub;
00604 for(sub = toGenerate.begin();
00605 sub != toGenerate.end();
00606 sub++)
00607 {
00608 if(reportMessageCode == (UShort)(sub->first & 0xFFFF) &&
00609 (sub->second.mType == EveryChange || changeOnly == false) &&
00610 sub->second.mpEventService != NULL)
00611 {
00612 ((Child *)(sub->second.mpEventService))->GenerateEvent(sub->second);
00613
00614 mEventsMutex.Lock();
00615 Subscription::Map::iterator e = mEvents.find(sub->first);
00616 if(e != mEvents.end())
00617 {
00618 e->second.mSequenceNumber++;
00619 e->second.mTriggerTimeSeconds = CxUtils::Timer::GetTimeSeconds();
00620 e->second.mUpdateTimeMs = Time::GetUtcTimeMs();
00621 }
00622 mEventsMutex.Unlock();
00623 }
00624 }
00625 }
00626
00627
00641 void Events::SignalEvent(const Subscription& info,
00642 const bool changeOnly)
00643 {
00644 mEventsMutex.Lock();
00645 Subscription::Map toGenerate = mEvents;
00646 mEventsMutex.Unlock();
00647
00648 UInt key = info.mID;
00649 key <<= USHORT_BITS;
00650 key |= info.mpQueryMessage->GetMessageCodeOfResponse();
00651
00652 Subscription::Map::iterator sub;
00653 for(sub = toGenerate.begin();
00654 sub != toGenerate.end();
00655 sub++)
00656 {
00657 if(sub->first == key)
00658 {
00659 ((Child *)(sub->second.mpEventService))->GenerateEvent(sub->second);
00660
00661 mEventsMutex.Lock();
00662 Subscription::Map::iterator e = mEvents.find(sub->first);
00663 if(e != mEvents.end())
00664 {
00665 e->second.mSequenceNumber++;
00666 e->second.mTriggerTimeSeconds = CxUtils::Timer::GetTimeSeconds();
00667 e->second.mUpdateTimeMs = Time::GetUtcTimeMs();
00668 }
00669 mEventsMutex.Unlock();
00670 break;
00671 }
00672 }
00673 }
00674
00675
00681 void Events::PrintStatus() const
00682 {
00683
00684 std::cout << "[" << GetServiceID().ToString() << "] - " << GetComponentID().ToString() << "\n";
00685 mEventsMutex.Lock();
00686 unsigned int supporting = (unsigned int)mEvents.size();
00687 unsigned int subscribing = (unsigned int)mSubscriptions.size();
00688 mEventsMutex.Unlock();
00689 std::cout << "Supporting " << supporting << " and Requesting " << subscribing << " Events.\n";
00690 }
00691
00692
00709 bool Events::CancelSubscription(const Address& id,
00710 const UShort reportMessageCode,
00711 const unsigned int waitTimeMs)
00712 {
00713 bool result = true;
00714
00715 Subscription::List copy;
00716 Subscription::List removed;
00717
00718
00719
00720
00721 mEventsMutex.Lock();
00722 copy = mSubscriptions;
00723 mEventsMutex.Unlock();
00724
00725 Subscription::List::iterator sub;
00726
00727 for(sub = copy.begin();
00728 sub != copy.end();
00729 sub++)
00730 {
00731 if((sub->mProducer == id || id == Address())&&
00732 (reportMessageCode == 0 || sub->mpQueryMessage->GetMessageCodeOfResponse() == reportMessageCode))
00733 {
00734 if(CancelSubscription(*sub, waitTimeMs))
00735 {
00736 removed.push_back(*sub);
00737 }
00738 else
00739 {
00740 result = false;
00741 }
00742 }
00743 }
00744
00745
00746 mEventsMutex.Lock();
00747 for(sub = removed.begin();
00748 sub != removed.end();
00749 sub++)
00750 {
00751 Subscription::List::iterator rem;
00752 for(rem = mSubscriptions.begin();
00753 rem != mSubscriptions.end();
00754 rem++)
00755 {
00756 if(sub->mProducer == rem->mProducer &&
00757 sub->mID == rem->mID &&
00758 sub->mpQueryMessage->GetMessageCode() == rem->mpQueryMessage->GetMessageCode())
00759 {
00760 mSubscriptions.erase(rem);
00761 break;
00762 }
00763 }
00764 }
00765 mEventsMutex.Unlock();
00766
00767 return result;
00768 }
00769
00770
00788 bool Events::CancelSubscription(const Address& id,
00789 const UShort reportMessageCode,
00790 const Byte eventID,
00791 const unsigned int waitTimeMs)
00792 {
00793 bool result = true;
00794
00795 Subscription::List copy;
00796 Subscription::List removed;
00797
00798
00799
00800
00801 mEventsMutex.Lock();
00802 copy = mSubscriptions;
00803 mEventsMutex.Unlock();
00804
00805 Subscription::List::iterator sub;
00806
00807 for(sub = copy.begin();
00808 sub != copy.end();
00809 sub++)
00810 {
00811 if(sub->mProducer == id &&
00812 sub->mID == eventID &&
00813 (reportMessageCode == 0 || sub->mpQueryMessage->GetMessageCodeOfResponse() == reportMessageCode))
00814 {
00815 if(CancelSubscription(*sub, waitTimeMs))
00816 {
00817 removed.push_back(*sub);
00818 }
00819 else
00820 {
00821 result = false;
00822 }
00823 }
00824 }
00825
00826
00827 mEventsMutex.Lock();
00828 for(sub = removed.begin();
00829 sub != removed.end();
00830 sub++)
00831 {
00832 Subscription::List::iterator rem;
00833 for(rem = mSubscriptions.begin();
00834 rem != mSubscriptions.end();
00835 rem++)
00836 {
00837 if(sub->mProducer == rem->mProducer &&
00838 sub->mID == rem->mID &&
00839 sub->mpQueryMessage->GetMessageCode() == rem->mpQueryMessage->GetMessageCode())
00840 {
00841 mSubscriptions.erase(rem);
00842 break;
00843 }
00844 }
00845 }
00846 mEventsMutex.Unlock();
00847
00848 return result;
00849 }
00850
00851
00869 bool Events::HaveSubscription(const UShort reportMessageCode,
00870 const Address &id,
00871 const bool verifyWithQuery,
00872 const unsigned int waitTimeMs) const
00873 {
00874 bool result = false;
00875 if(verifyWithQuery && id.IsValid())
00876 {
00877 QueryEvents query(id, GetComponentID());
00878 query.SetQueryType(QueryEvents::MessageID);
00879 query.SetQueryFilter(reportMessageCode);
00880
00881 ReportEvents report;
00882
00883 if(Send(&query, &report, waitTimeMs))
00884 {
00885 if(report.GetRecords()->size() > 0)
00886 {
00887 result = true;
00888 }
00889 }
00890 }
00891 else
00892 {
00893 Mutex::ScopedLock lock(&mEventsMutex);
00894 Subscription::List::const_iterator sub;
00895 for(sub = mSubscriptions.begin();
00896 sub != mSubscriptions.end() && false == result;
00897 sub++)
00898 {
00899 if((id == Address() || sub->mProducer == id) &&
00900 sub->mpQueryMessage->GetMessageCodeOfResponse() == reportMessageCode)
00901 {
00902 result = true;
00903 break;
00904 }
00905 }
00906 }
00907
00908 return result;
00909 }
00910
00911
00921 bool Events::HaveSubscribers(const UShort reportMessageCode) const
00922 {
00923 bool result = false;
00924
00925 Mutex::ScopedLock lock(&mEventsMutex);
00926 Subscription::Map::const_iterator sub;
00927 for(sub = mEvents.begin();
00928 sub != mEvents.end() && false == result;
00929 sub++)
00930 {
00931 if(sub->second.mpQueryMessage->GetMessageCodeOfResponse() == reportMessageCode)
00932 {
00933 result = true;
00934 break;
00935 }
00936 }
00937
00938 return result;
00939 }
00940
00941
00954 void Events::Receive(const Message* message)
00955 {
00956 if(mShutdownServiceFlag)
00957 {
00958 return;
00959 }
00960 switch(message->GetMessageCode())
00961 {
00962 case CANCEL_EVENT:
00963 {
00964 const CancelEvent* input = dynamic_cast<const CancelEvent*>(message);
00965 double maxFrequencyHz = 0.5;
00966 bool canceled = false;
00967 {
00968 Mutex::ScopedLock lock(&mEventsMutex);
00969 Subscription::Map::iterator sub;
00970
00971 for(sub = mEvents.begin();
00972 sub != mEvents.end() && false == canceled;
00973 sub++)
00974 {
00975 if(input->GetEventID() == sub->second.mID)
00976 {
00977 Address::Set::iterator client = sub->second.mClients.find(input->GetSourceID());
00978 if(client != sub->second.mClients.end())
00979 {
00980 sub->second.mClients.erase(client);
00981 canceled = true;
00982 }
00983
00984 if(sub->second.mClients.size() == 0)
00985 {
00986 mEvents.erase(sub);
00987 }
00988 if(canceled)
00989 {
00990 break;
00991 }
00992 }
00993 }
00994
00995
00996
00997
00998 for(sub = mEvents.begin();
00999 sub != mEvents.end() && true == canceled;
01000 sub++)
01001 {
01002 if(sub->second.mPeriodicRate > maxFrequencyHz)
01003 {
01004 maxFrequencyHz = sub->second.mPeriodicRate;
01005 }
01006 }
01007 }
01008
01009
01010 if(canceled)
01011 {
01012 if(mDebugMessagesFlag)
01013 {
01014 Mutex::ScopedLock lock(&mDebugMessagesMutex);
01015 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Event for " << input->GetSourceID().ToString() << " was Canceled.\n";
01016 }
01017 ConfirmEventRequest response(input->GetSourceID(), GetComponentID());
01018 response.SetRequestID(input->GetRequestID());
01019 response.SetEventID(input->GetEventID());
01020 Send(&response);
01021
01022
01023
01024 if(mPeriodicTimer.IsActive() && mPeriodicTimer.GetFrequency() > maxFrequencyHz)
01025 {
01026 mPeriodicTimer.ChangeFrequency(maxFrequencyHz);
01027 }
01028 }
01029 else
01030 {
01031 RejectEventRequest response(input->GetSourceID(), GetComponentID());
01032 response.SetRequestID(input->GetRequestID());
01033 response.SetResponseCode(RejectEventRequest::InvalidEventID);
01034 Send(&response);
01035 }
01036 }
01037 break;
01038 case UPDATE_EVENT:
01039 case CREATE_EVENT:
01040 {
01041 const CreateEvent* create = dynamic_cast<const CreateEvent*>(message);
01042 const UpdateEvent* update = dynamic_cast<const UpdateEvent*>(message);
01043 Service::Map children = GetChildServices();
01044 Service::Map::iterator citer;
01045 Events::Child* child = NULL;
01046 Message* queryMessage = NULL;
01047 double confirmedPeriodicRate = 0.0;
01048 std::string errorMessage;
01049 UShort signalEventCode = 0;
01050
01051 bool sentResponse = false;
01052
01053 for(citer = children.begin();
01054 citer != children.end();
01055 citer++)
01056 {
01057 child = dynamic_cast<Events::Child*>(citer->second);
01058 if(child && (queryMessage = child->CreateMessage(create->GetQueryMessageCode())) != NULL)
01059 {
01060
01061 if(queryMessage->ReadMessageBody(*create->GetQueryMessage()) < 0)
01062 {
01063
01064 return;
01065 }
01066 if((create && child->IsEventSupported(create->GetType(),
01067 create->GetRequestedPeriodicRate(),
01068 queryMessage,
01069 confirmedPeriodicRate,
01070 errorMessage)) ||
01071 (update && child->IsEventSupported(update->GetType(),
01072 update->GetRequestedPeriodicRate(),
01073 queryMessage,
01074 confirmedPeriodicRate,
01075 errorMessage)) )
01076 {
01077 Mutex::ScopedLock lock(&mEventsMutex);
01078 Subscription::Map::iterator siter;
01079 Subscription subscription;
01080
01081 bool exists = false;
01082 for(siter = mEvents.begin();
01083 siter != mEvents.end();
01084 siter ++)
01085 {
01086
01087
01088 if(siter->second.mpQueryMessage->GetMessageCode() == queryMessage->GetMessageCode())
01089 {
01090 subscription.mID = siter->second.mID + 1;
01091 }
01092
01093
01094 Packet queryBody1, queryBody2;
01095 siter->second.mpQueryMessage->WriteMessageBody(queryBody1);
01096 queryMessage->WriteMessageBody(queryBody2);
01097 if(siter->second.mpQueryMessage->GetMessageCode() == queryMessage->GetMessageCode() &&
01098 siter->second.mpQueryMessage->GetPresenceVector() == queryMessage->GetPresenceVector() &&
01099 siter->second.mType == create->GetType() &&
01100 queryBody1.Length() == queryBody2.Length() &&
01101 memcmp(queryBody1.Ptr(), queryBody2.Ptr(), queryBody1.Length()) == 0)
01102 {
01103 exists = true;
01104 Type type = create ? create->GetType() : update->GetType();
01105 if(type == Periodic)
01106 {
01107 if(confirmedPeriodicRate > siter->second.mPeriodicRate)
01108 {
01109 siter->second.mPeriodicRate = confirmedPeriodicRate;
01110 }
01111 }
01112 siter->second.mClients.insert(message->GetSourceID());
01113 subscription = siter->second;
01114 break;
01115 }
01116 }
01117
01118
01119 if(update && (exists == false || subscription.mType != update->GetType()))
01120 {
01121 RejectEventRequest reject(update->GetSourceID(), GetComponentID());
01122 if(subscription.mType != update->GetType())
01123 {
01124 reject.SetResponseCode(RejectEventRequest::InvalidSetup);
01125 errorMessage = "Change of Event Type (Periodic/Every Change) Not Supported";
01126 }
01127 else
01128 {
01129 reject.SetResponseCode(RejectEventRequest::InvalidEventID);
01130 }
01131 reject.SetRequestID(update->GetRequestID());
01132 if(errorMessage.size())
01133 {
01134 reject.SetErrorMessage(errorMessage);
01135 }
01136 Send(&reject);
01137 return;
01138 }
01139
01140 else if(create && exists == false)
01141 {
01142 subscription.mPeriodicRate = confirmedPeriodicRate;
01143 subscription.mpQueryMessage = queryMessage; queryMessage = NULL;
01144 subscription.mType = create->GetType();
01145 subscription.mUpdateTimeMs = Time::GetUtcTimeMs();
01146 subscription.mProducer = GetComponentID();
01147 subscription.mClients.insert(create->GetSourceID());
01148 subscription.mpEventService = child;
01149 UInt key = subscription.mID;
01150 key <<= USHORT_BITS;
01151 key |= subscription.mpQueryMessage->GetMessageCodeOfResponse();
01152 mEvents[key] = subscription;
01153 }
01154
01155 ConfirmEventRequest confirm(message->GetSourceID(), GetComponentID());
01156 confirm.SetRequestID((create ? create->GetRequestID() : update->GetRequestID()));
01157 confirm.SetConfirmedPeriodicRate(confirmedPeriodicRate);
01158 confirm.SetEventID(subscription.mID);
01159 if(true == (sentResponse = Send(&confirm)))
01160 {
01161 signalEventCode = subscription.mpQueryMessage->GetMessageCodeOfResponse();
01162
01163 }
01164
01165
01166 if(mPeriodicTimer.IsActive() == false)
01167 {
01168 std::stringstream tname;
01169 tname << "JEVENTS - " << GetComponentID().ToString();
01170 mPeriodicTimer.SetName(tname.str());
01171 if(subscription.mType == Events::Periodic)
01172 {
01173 mPeriodicTimer.Start(subscription.mPeriodicRate);
01174 }
01175 else
01176 {
01177 mPeriodicTimer.Start(0.5);
01178 }
01179 }
01180 else if(subscription.mPeriodicRate > mPeriodicTimer.GetFrequency())
01181 {
01182 mPeriodicTimer.ChangeFrequency(subscription.mPeriodicRate);
01183 }
01184 }
01185
01186
01187
01188
01189 if(queryMessage)
01190 {
01191 delete queryMessage;
01192 queryMessage = NULL;
01193 break;
01194 }
01195
01196 }
01197 if(signalEventCode != 0)
01198 {
01199 SignalEvent(signalEventCode, false);
01200 }
01201 }
01202 if(sentResponse == false)
01203 {
01204 RejectEventRequest reject(message->GetSourceID(), GetComponentID());
01205 reject.SetResponseCode(RejectEventRequest::MessageNotSupported);
01206 reject.SetRequestID((create ? create->GetRequestID() : update->GetRequestID()));
01207 if(errorMessage.size())
01208 {
01209 reject.SetErrorMessage(errorMessage);
01210 }
01211 Send(&reject);
01212 }
01213 }
01214 break;
01215 case EVENT:
01216 {
01217 const Event* input = dynamic_cast<const Event*>(message);
01218 Transport* transport = dynamic_cast<Transport *>(GetParentService());
01219 Message* reportMessage = NULL;
01220 if(transport)
01221 {
01222
01223 reportMessage = transport->CreateMessage(input->GetReportMessageCode());
01224 if(reportMessage)
01225 {
01226 reportMessage->CopyHeaderData(input);
01227 if(reportMessage->ReadMessageBody(*input->GetReportMessage()) >= 0)
01228 {
01229
01230 GetParentService()->Receive(reportMessage);
01231 }
01232 }
01233 else if(mDebugMessagesFlag)
01234 {
01235 Mutex::ScopedLock lock(&mDebugMessagesMutex);
01236 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Received unsupported Report [" << std::setbase(16) << input->GetReportMessageCode() << std::setbase(10) << "] within Event Message\n";
01237 }
01238
01239
01240 }
01241
01242 Mutex::ScopedLock lock(&mEventsMutex);
01243 Subscription::List::iterator subscription;
01244 for(subscription = mSubscriptions.begin();
01245 subscription != mSubscriptions.end();
01246 subscription++)
01247 {
01248 if(subscription->mID == input->GetEventID() &&
01249 subscription->mProducer == input->GetSourceID())
01250 {
01251 subscription->mSequenceNumber = input->GetSequenceNumber();
01252 subscription->mUpdateTimeMs = Time::GetUtcTimeMs();
01253
01254 Callback::Set::iterator cb;
01255 mEventCallbackMutex.Lock();
01256 for(cb = mEventCallbacks.begin();
01257 cb != mEventCallbacks.end() && reportMessage;
01258 cb++)
01259 {
01260 Events::Callback* myCB = dynamic_cast<Events::Callback*>(*cb);
01261 if(myCB)
01262 {
01263 myCB->ProcessEvent(subscription->mType,
01264 subscription->mID,
01265 subscription->mSequenceNumber,
01266 reportMessage);
01267 }
01268 }
01269 mEventCallbackMutex.Unlock();
01270 break;
01271 }
01272 }
01273
01274 if(reportMessage)
01275 {
01276 delete reportMessage;
01277 }
01278 }
01279 break;
01280 case QUERY_EVENTS:
01281 {
01282 const QueryEvents* input = dynamic_cast<const QueryEvents*>(message);
01283 ReportEvents report(input->GetSourceID(), GetComponentID());
01284 Mutex::ScopedLock lock(&mEventsMutex);
01285 Events::Subscription::Map::iterator sub;
01286 ReportEvents::Record record;
01287
01288 for(sub = mEvents.begin();
01289 sub != mEvents.end();
01290 sub++)
01291 {
01292
01293
01294 if(sub->second.mClients.find(message->GetSourceID()) != sub->second.mClients.end())
01295 {
01296 record.mEventID = sub->second.mID;
01297 record.mEventType = sub->second.mType;
01298 record.mQueryCode = sub->second.mpQueryMessage->GetMessageCode();
01299 record.mQueryMessage.Clear();
01300 sub->second.mpQueryMessage->WriteMessageBody(record.mQueryMessage);
01301 if(input->GetQueryType() == QueryEvents::AllEvents)
01302 {
01303 report.GetRecords()->push_back(record);
01304 }
01305 else if(input->GetQueryType() == QueryEvents::EventID &&
01306 record.mEventID == (Byte)input->GetQueryFilter())
01307 {
01308 report.GetRecords()->push_back(record);
01309 }
01310 else if(input->GetQueryType() == QueryEvents::EventType &&
01311 input->GetQueryFilter() == (UShort)record.mEventType)
01312 {
01313 report.GetRecords()->push_back(record);
01314 }
01315 else if(input->GetQueryType() == QueryEvents::MessageID &&
01316 (record.mQueryCode == input->GetQueryFilter() ||
01317 sub->second.mpQueryMessage->GetMessageCodeOfResponse() == input->GetQueryFilter()))
01318 {
01319 report.GetRecords()->push_back(record);
01320 }
01321 }
01322 }
01323 Send(&report);
01324 }
01325 break;
01326 case CONFIRM_EVENT_REQUEST:
01327 case REJECT_EVENT_REQUEST:
01328 break;
01329 case REPORT_EVENTS:
01330 {
01331 const ReportEvents* input = dynamic_cast<const ReportEvents*>(message);
01332 Subscription::List subscriptions;
01333 Subscription::List::iterator sub;
01334
01335 mEventsMutex.Lock();
01336 subscriptions = mSubscriptions;
01337 mEventsMutex.Unlock();
01338
01339
01340
01341 for(sub = subscriptions.begin();
01342 sub != subscriptions.end();
01343 sub++)
01344 {
01345 if(sub->mProducer == input->GetSourceID())
01346 {
01347 ReportEvents::Record::List::const_iterator record;
01348 bool exists = false;
01349 for(record = input->GetRecords()->begin();
01350 record != input->GetRecords()->end();
01351 record++)
01352 {
01353 if(record->mEventType == sub->mType &&
01354 record->mQueryCode == sub->mpQueryMessage->GetMessageCode())
01355 {
01356 exists = true;
01357 break;
01358 }
01359 }
01360 if(exists == false)
01361 {
01362
01363 if(sub->mType == Periodic)
01364 {
01365 RequestPeriodicEvent(sub->mProducer,
01366 sub->mpQueryMessage,
01367 sub->mPeriodicRate,
01368 sub->mPeriodicRate - 0.1);
01369 }
01370 else
01371 {
01372 RequestEveryChangeEvent(sub->mProducer,
01373 sub->mpQueryMessage);
01374 }
01375 }
01376 }
01377 }
01378 }
01379 break;
01380 default:
01381 break;
01382 };
01383 }
01384
01385
01397 Message* Events::CreateMessage(const UShort messageCode) const
01398 {
01399 Message* message = NULL;
01400
01401 switch(messageCode)
01402 {
01403 case CANCEL_EVENT:
01404 message = new CancelEvent();
01405 break;
01406 case CONFIRM_EVENT_REQUEST:
01407 message = new ConfirmEventRequest();
01408 break;
01409 case CREATE_EVENT:
01410 message = new CreateEvent();
01411 break;
01412 case EVENT:
01413 message = new Event();
01414 break;
01415 case QUERY_EVENTS:
01416 message = new QueryEvents();
01417 break;
01418 case REJECT_EVENT_REQUEST:
01419 message = new RejectEventRequest();
01420 break;
01421 case REPORT_EVENTS:
01422 message = new ReportEvents();
01423 break;
01424 case UPDATE_EVENT:
01425 message = new UpdateEvent();
01426 break;
01427 default:
01428 message = NULL;
01429 break;
01430 };
01431
01432 return message;
01433 }
01434
01435
01445 void Events::CheckServiceStatus(const unsigned int timeSinceLastCheckMs)
01446 {
01447
01448
01449 if(mShutdownFlag == false && Time::GetUtcTimeMs() - mCheckEventsTimeMs >= mCheckEventDelayMs)
01450 {
01451
01452 Subscription::List subscriptions;
01453 Subscription::List::iterator sub;
01454 mEventsMutex.Lock();
01455 subscriptions = mSubscriptions;
01456 mEventsMutex.Unlock();
01457
01458 QueryEvents query;
01459 query.SetSourceID(GetComponentID());
01460
01461
01462 for(sub = subscriptions.begin();
01463 sub != subscriptions.end() && mShutdownFlag == false;
01464 sub++)
01465 {
01466 query.SetDestinationID(sub->mProducer);
01467 query.SetQueryType(QueryEvents::EventID);
01468 query.SetQueryFilter(sub->mID);
01469 Send(&query);
01470 }
01471
01472 mCheckEventsTimeMs = Time::GetUtcTimeMs();
01473 }
01474 }
01475
01476
01496 Events::Subscription::List Events::GetSubscriptions(const Address& source,
01497 const UShort reportType) const
01498 {
01499 Mutex::ScopedLock lock(&mEventsMutex);
01500 Subscription::List result;
01501 Subscription::List::const_iterator s;
01502 for(s = mSubscriptions.begin();
01503 s != mSubscriptions.end();
01504 s++)
01505 {
01506 if(source.IsValid() == false ||
01507 s->mProducer == source)
01508 {
01509 if(reportType == 0 ||
01510 reportType == (int)s->mpQueryMessage->GetMessageCodeOfResponse())
01511 {
01512 result.push_back(*s);
01513 }
01514 }
01515 }
01516 return result;
01517 }
01518
01519
01537 Events::Subscription::List Events::GetSubscriptions(const UShort reportType) const
01538 {
01539 Mutex::ScopedLock lock(&mEventsMutex);
01540 Subscription::List result;
01541 Subscription::List::const_iterator s;
01542 for(s = mSubscriptions.begin();
01543 s != mSubscriptions.end();
01544 s++)
01545 {
01546 if(reportType == 0 ||
01547 reportType == (int)s->mpQueryMessage->GetMessageCodeOfResponse())
01548 {
01549 result.push_back(*s);
01550 }
01551 }
01552 return result;
01553 }
01554
01555
01574 Events::Subscription::List Events::GetProducedEvents(const UShort reportType) const
01575 {
01576 Mutex::ScopedLock lock(&mEventsMutex);
01577 Subscription::List result;
01578 Subscription::Map::const_iterator s;
01579 for(s = mEvents.begin();
01580 s != mEvents.end();
01581 s++)
01582 {
01583 if(((UShort)(s->first & reportType)) == reportType)
01584 {
01585 result.push_back(s->second);
01586 }
01587 }
01588 return result;
01589 }
01590
01591
01599 void Events::RegisterCallback(Events::Callback* callback)
01600 {
01601 Mutex::ScopedLock lock(&mEventCallbackMutex);
01602 mEventCallbacks.insert(callback);
01603 }
01604
01605
01616 bool Events::CancelSubscription(Subscription& sub, const unsigned int waitTimeMs)
01617 {
01618 bool result = false;
01619
01620
01621
01622 Byte requestID = 0;
01623 std::set<Byte>::iterator riter;
01624 mRequestMutex.Lock();
01625 riter = mRequestSet.find(requestID);
01626 while(riter != mRequestSet.end() )
01627 {
01628 requestID++;
01629 riter = mRequestSet.find(requestID);
01630 }
01631 mRequestMutex.Unlock();
01632
01633 CancelEvent request(sub.mProducer, GetComponentID());
01634 request.SetEventID(sub.mID);
01635 request.SetRequestID(requestID);
01636
01637
01638 Message::List responses;
01639 ConfirmEventRequest confirm;
01640 RejectEventRequest reject;
01641 responses.push_back(&confirm);
01642 responses.push_back(&reject);
01643
01644 if(Send(&request, responses, waitTimeMs))
01645 {
01646 if(confirm.GetSourceID().IsValid())
01647 {
01648 result = true;
01649 }
01650 else if(mDebugMessagesFlag)
01651 {
01652 Mutex::ScopedLock lock(&mDebugMessagesMutex);
01653 std::cout << "[" << GetServiceID().ToString() << "-" << mComponentID.ToString() << "] - Could not cancel subscription to " << sub.mProducer.ToString() << "\n";
01654 }
01655 }
01656
01657
01658 mRequestMutex.Lock();
01659 riter = mRequestSet.find(requestID);
01660 if(riter != mRequestSet.end() )
01661 {
01662 mRequestSet.erase(riter);
01663 }
01664 mRequestMutex.Unlock();
01665
01666 return result;
01667 }
01668
01669
01676 void Events::PeriodicEvent(void *args)
01677 {
01678 Events* service = (Events *)args;
01679 Events::Subscription::Map::iterator local;
01680
01681 if(service->mShutdownFlag)
01682 {
01683 return;
01684 }
01685
01686 Subscription::Map eventsCopy;
01687
01688
01689
01690 service->mEventsMutex.Lock();
01691 eventsCopy = service->mEvents;
01692 service->mEventsMutex.Unlock();
01693
01694 for(local = eventsCopy.begin();
01695 local != eventsCopy.end() && service->mShutdownFlag == false && service->mPeriodicTimer.IsShuttingDown() == false;
01696 local++)
01697 {
01698 Events::Child* child = dynamic_cast<Events::Child*>(local->second.mpEventService);
01699 if(local->second.mType == Events::Periodic && child)
01700 {
01701 double delay = 1.0/(local->second.mPeriodicRate + CxUtils::CX_EPSILON);
01702 if(CxUtils::Timer::GetTimeSeconds() - local->second.mTriggerTimeSeconds >= delay)
01703 {
01704
01705 child->GenerateEvent(local->second);
01706
01707 service->mEventsMutex.Lock();
01708 Events::Subscription::Map::iterator actual = service->mEvents.find(local->first);
01709 if(actual != service->mEvents.end() &&
01710 actual->second.mSequenceNumber == local->second.mSequenceNumber &&
01711 actual->second.mType == local->second.mType)
01712 {
01713 actual->second.mUpdateTimeMs = Time::GetUtcTimeMs();
01714 actual->second.mTriggerTimeSeconds = CxUtils::Timer::GetTimeSeconds();
01715 actual->second.mSequenceNumber++;
01716 }
01717 service->mEventsMutex.Unlock();
01718 }
01719 }
01720 }
01721
01722
01723
01724
01725
01726
01727
01728
01729
01730
01731
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744
01745
01746 }
01747
01748
01749
01750
01751