diff --git a/Jamroot_rub_AL9_mn2standalone b/Jamroot_rub_AL9_mn2standalone new file mode 100644 index 0000000000000000000000000000000000000000..07c58f1913d6f56595d9c13f7f30cc21957431fa --- /dev/null +++ b/Jamroot_rub_AL9_mn2standalone @@ -0,0 +1,76 @@ +import os ; +import testing ; + +path-constant TOP : . ; +local MINUIT2_STANDALONE = [ os.environ MINUIT2_STANDALONE ] ; + +local ROOTSYS = [ os.environ ROOTSYS ] ; +local rlibs = [ SHELL "$(ROOTSYS)/bin/root-config --libs" ] ; +ROOTLIBS = [ MATCH "(.*)[\n]" : $(rlibs) ] ; + +BOOSTLIBS = + -lboost_chrono + -lboost_date_time + -lboost_filesystem + -lboost_program_options + -lboost_regex + -lboost_serialization + -lboost_system + -lboost_timer + -lboost_unit_test_framework + -lrt + ; + +lib boost_test : : <name>boost_unit_test_framework ; + +project : + requirements <include>./ + <include>$(TOP) + <include>$(BOOSTINCLUDE) + <include>$(MINUIT2_STANDALONE)/include/Minuit2 + <include>$(ROOTSYS)/include + <cxxflags>--std=c++17 + <cxxflags>-ftemplate-depth=256 + <cxxflags>-DBOOST_BIND_GLOBAL_PLACEHOLDERS + <link>shared + <linkflags>-L$(TOP)/lib + <linkflags>$(ROOTLIBS) + <linkflags>-L$(BOOSTLIBPATH) + <linkflags>$(BOOSTLIBS) + <linkflags>-lgsl + <linkflags>-lgslcblas + <cxxflags>-fPIC + <cxxflags>-Wall + ; + +actions rootlibs +{ + $(ROOTSYS)/bin/root-config --libs +} + +lib Minuit2 : : <file>$(MINUIT2_STANDALONE)/lib/libMinuit2.so : : ; + +build-project ErrLogger ; +build-project qft++ ; +build-project qft++Extension ; +build-project ConfigParser ; +build-project Particle ; +build-project Utils ; +build-project Event ; +build-project FitParams ; +build-project PwaDynamics ; +build-project PwaUtils ; +build-project MinFunctions ; +build-project AppUtils ; +build-project pbarpUtils ; +build-project epemUtils ; +build-project resUtils ; +build-project ggUtils ; +# build-project gammapUtils ; +build-project pipiScatteringUtils ; +build-project gslUtils ; +build-project qaErrorExtract ; +build-project PspGen ; +build-project Examples ; +build-project KMatrixExtract ; +build-project PwaApps ; diff --git a/PwaUtils/NetworkClient.cc b/PwaUtils/NetworkClient.cc index 0e189f5df8e4172e565ca5840fb84c3ba4a1c49d..bfcadecf8178516828b7d6cd05d04e3172ee6bbf 100644 --- a/PwaUtils/NetworkClient.cc +++ b/PwaUtils/NetworkClient.cc @@ -69,6 +69,8 @@ bool NetworkClient::Login(){ InfoMsg << "Connecting to server " << _serverAddress << ":" << _port << endmsg; _theStream.connect(_serverAddress, _port); + _theStream.rdbuf()->set_option(boost::asio::socket_base::keep_alive(true)); // send keep alive msgs, prob not needed + _theStream.rdbuf()->set_option(boost::asio::ip::tcp::no_delay(true)); // disable nagels if(!_theStream){ Alert << "Error: " << _theStream.error().message() << endmsg; @@ -90,8 +92,9 @@ bool NetworkClient::Login(){ InfoMsg << "Received data event range " << _eventLimits[0] << " - " << _eventLimits[1] << endmsg; InfoMsg << "Received mc event range " << _eventLimits[2] << " - " << _eventLimits[3] << endmsg; - std::thread timerthread(&NetworkClient::SendHeartbeat, this); - timerthread.detach(); + //// disable heartbeat + //std::thread timerthread(&NetworkClient::SendHeartbeat, this); + //timerthread.detach(); return true; } @@ -100,7 +103,8 @@ bool NetworkClient::Login(){ bool NetworkClient::SendLH(double llh_data, double lh_mc){ - _theStream.connect(_serverAddress, _port); + //// we want to keep the connection alive, so no need to reconnect all the time + //_theStream.connect(_serverAddress, _port); if(!_theStream){ //try it serveral times @@ -108,7 +112,7 @@ bool NetworkClient::SendLH(double llh_data, double lh_mc){ while(!_theStream){ WarningMsg << "Could not send LH " << counter << " time!" << endmsg; WarningMsg << "current error message " << _theStream.error().message() << endmsg; - std::this_thread::sleep_for( std::chrono::seconds(5)); + std::this_thread::sleep_for( std::chrono::seconds(5)); _theStream.clear(); _theStream.connect(_serverAddress, _port); @@ -134,42 +138,42 @@ bool NetworkClient::SendLH(double llh_data, double lh_mc){ } - -bool NetworkClient::SendHeartbeat(){ - - while(true){ - - _theHeartbeatStream.connect(_serverAddress, _port); - - int counter=0; - while(!_theHeartbeatStream){ - WarningMsg << "Could not send heartbeat " << counter << " time!" << endmsg; - WarningMsg << "current error message " << _theHeartbeatStream.error().message() << endmsg; - std::this_thread::sleep_for( std::chrono::seconds(1)); - _theHeartbeatStream.clear(); - _theHeartbeatStream.connect(_serverAddress, _port); - counter++; - if (counter>20){ - Alert << "Could not send heartbeat last time" << endmsg; - exit(1); - } - WarningMsg << "Try to send heartbeat again!!!" << endmsg; - } - - // if(!_theStream){ - // Alert << "Could not send heartbeat." << endmsg; - // return false; - // } - - _theHeartbeatStream << NetworkClient::CLIENTMESSAGE_HEARTBEAT << "\n"; - _theHeartbeatStream << _clientID << "\n"; - short answer; - _theHeartbeatStream >> answer; - - std::this_thread::sleep_for( std::chrono::seconds(HEARTBEAT_INTERVAL) ); - } - -} +//// heartbeat disabled (so no need for this) +//bool NetworkClient::SendHeartbeat(){ +// +// while(true){ +// +// _theHeartbeatStream.connect(_serverAddress, _port); +// +// int counter=0; +// while(!_theHeartbeatStream){ +// WarningMsg << "Could not send heartbeat " << counter << " time!" << endmsg; +// WarningMsg << "current error message " << _theHeartbeatStream.error().message() << endmsg; +// std::this_thread::sleep_for( std::chrono::seconds(1)); +// _theHeartbeatStream.clear(); +// _theHeartbeatStream.connect(_serverAddress, _port); +// counter++; +// if (counter>20){ +// Alert << "Could not send heartbeat last time" << endmsg; +// exit(1); +// } +// WarningMsg << "Try to send heartbeat again!!!" << endmsg; +// } +// +// // if(!_theStream){ +// // Alert << "Could not send heartbeat." << endmsg; +// // return false; +// // } +// +// _theHeartbeatStream << NetworkClient::CLIENTMESSAGE_HEARTBEAT << "\n"; +// _theHeartbeatStream << _clientID << "\n"; +// short answer; +// _theHeartbeatStream >> answer; +// +// std::this_thread::sleep_for( std::chrono::seconds(HEARTBEAT_INTERVAL) ); +// } +// +//} diff --git a/PwaUtils/NetworkClient.hh b/PwaUtils/NetworkClient.hh index bb685d1bf9352fbc695846fac12da6b52449815c..de180e8b68dcf8f17d4fb9bf2add8cd5774f774e 100644 --- a/PwaUtils/NetworkClient.hh +++ b/PwaUtils/NetworkClient.hh @@ -42,12 +42,13 @@ class NetworkClient static short CLIENTMESSAGE_LOGIN; static short CLIENTMESSAGE_LH; static short CLIENTMESSAGE_HEARTBEAT; - static short HEARTBEAT_INTERVAL; + static short HEARTBEAT_INTERVAL; //keep this variable for timeouts NetworkClient(std::string serverAddress,std::string port); bool Login(); bool SendLH(double llh_data, double lh_mc); - bool SendHeartbeat(); + //// heartbeat disabled + //bool SendHeartbeat(); bool WaitForParams(); ChannelID channelID(); std::vector<double>& GetParams(){return _theParams;} @@ -61,6 +62,7 @@ class NetworkClient std::string _port; std::string _serverAddress; tcp::iostream _theStream; - tcp::iostream _theHeartbeatStream; + //// heartbeat disabled + //tcp::iostream _theHeartbeatStream; std::vector<double> _theParams; }; diff --git a/PwaUtils/NetworkServer.cc b/PwaUtils/NetworkServer.cc index d0e2f816ea258245a7a59234ecfb735b4f5fda91..bb7f4f5fff4385909a25e218fc946eacb0993ec2 100644 --- a/PwaUtils/NetworkServer.cc +++ b/PwaUtils/NetworkServer.cc @@ -48,7 +48,7 @@ NetworkServer::NetworkServer(int port, unsigned short noOfClients, std::map<Chan , _clientTimeout(100*NetworkClient::HEARTBEAT_INTERVAL) , _globalTimeout(100*NetworkClient::HEARTBEAT_INTERVAL) , _noOfClients(noOfClients) - , _noOfChannels(numEventMap.size()) + , _noOfChannels(numEventMap.size()) , _closed(false) , _clientParamsInitialized(false) , _numBroadcasted(0) @@ -60,6 +60,8 @@ NetworkServer::NetworkServer(int port, unsigned short noOfClients, std::map<Chan theAcceptor = std::shared_ptr<tcp::acceptor>(new tcp::acceptor(*theIOService, tcp::endpoint(tcp::v4(), _port))); theDeadlineTimer = std::shared_ptr<boost::asio::deadline_timer>(new boost::asio::deadline_timer(*theIOService)); + theAcceptor->set_option(boost::asio::ip::tcp::no_delay(true)); //diable nagels + for(int i=0; i<_noOfClients; i++){ theStreams.push_back( std::shared_ptr<tcp::iostream>(new tcp::iostream) ); } @@ -87,20 +89,22 @@ bool NetworkServer::WaitForFirstClientLogin(){ short connectionPurpose; *theStreams.at(i) >> connectionPurpose; - if(connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ - short clientID; - *theStreams.at(i) >> clientID; - *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; - theStreams.at(i)->flush(); - theStreams.at(i)->close(); - i--; - continue; - } - else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ + //// heartbeats disabled on client side + // if(connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ + // short clientID; + // *theStreams.at(i) >> clientID; + // *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; + // theStreams.at(i)->flush(); + // theStreams.at(i)->close(); + // i--; + // continue; + // } + // else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ + if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ Alert << "ERROR: Client did not login. Message: " << connectionPurpose << endmsg; - SendClosingMessage(theStreams.at(i)); - i--; - continue; + SendClosingMessage(theStreams.at(i)); + i--; + continue; } std::string nodeName; @@ -167,23 +171,24 @@ bool NetworkServer::WaitForLH(std::map<ChannelID, LHData>& theLHDataMap){ i--; continue; } - else if (connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ - short clientID; - *theStreams.at(i) >> clientID; - - if(!UpdateHeartbeats(clientID)){ - Timeout(boost::asio::error::timed_out); - BroadcastClosingMessage(); - } - else{ - *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; - theStreams.at(i)->flush(); - theStreams.at(i)->close(); - } - - i--; - continue; - } + //// heartbeats diabled on client side + //else if (connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ + // short clientID; + // *theStreams.at(i) >> clientID; + + // if(!UpdateHeartbeats(clientID)){ + // Timeout(boost::asio::error::timed_out); + // BroadcastClosingMessage(); + // } + // else{ + // *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; + // theStreams.at(i)->flush(); + // theStreams.at(i)->close(); + // } + + // i--; + // continue; + //} else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LH){ Alert << "Protocol error in WaitForLH(): i=" << i << " CP " << connectionPurpose << endmsg; _closed = true; @@ -203,7 +208,7 @@ bool NetworkServer::WaitForLH(std::map<ChannelID, LHData>& theLHDataMap){ lastLhTimes[i] = std::pair<short, boost::posix_time::ptime>(clientID, boost::posix_time::microsec_clock::local_time()); if(_closed) - SendClosingMessage(theStreams.at(i)); + SendClosingMessage(theStreams.at(i)); } EvalClientTiming(); @@ -231,7 +236,7 @@ void NetworkServer::EvalClientTiming(){ double diffInSeconds=((double)(maxdiff.total_microseconds() - diff.total_microseconds()))/1E6; InfoMsg << "Client id " << clientID << " channel id " << channelID << " " - << " response time +" + << " response time +" << std::setprecision(10) << diffInSeconds << " s" << endmsg; _delayTimesClients.at(clientID)+=diffInSeconds; _delayTimesChannels.at(channelID)+=diffInSeconds; @@ -240,24 +245,24 @@ void NetworkServer::EvalClientTiming(){ } - -bool NetworkServer::UpdateHeartbeats(short clientID){ - - boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); - lastHeartbeats[clientID] = now; - - for(auto it = lastHeartbeats.begin(); it!= lastHeartbeats.end(); ++it){ - boost::posix_time::time_duration diff = now - (*it).second; - - if((unsigned)diff.total_seconds() >= _clientTimeout){ - Alert << "No signal from clientID " << (*it).first << " for " - << diff.total_seconds() << " seconds." << endmsg; - return false; - } - } - - return true; -} +//// heartbeats disabled +//bool NetworkServer::UpdateHeartbeats(short clientID){ +// +// boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); +// lastHeartbeats[clientID] = now; +// +// for(auto it = lastHeartbeats.begin(); it!= lastHeartbeats.end(); ++it){ +// boost::posix_time::time_duration diff = now - (*it).second; +// +// if((unsigned)diff.total_seconds() >= _clientTimeout){ +// Alert << "No signal from clientID " << (*it).first << " for " +// << diff.total_seconds() << " seconds." << endmsg; +// return false; +// } +// } +// +// return true; +//} @@ -272,7 +277,8 @@ void NetworkServer::SendParams(std::shared_ptr<tcp::iostream> destinationStream, } destinationStream->flush(); - destinationStream->close(); + //// we want to keep the connection alive, so dont close here + //destinationStream->close(); } @@ -386,7 +392,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do } i++; } - + numClVec.at(minid)++; sumCl++; } @@ -396,7 +402,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do if(*it == 0){ // Minimum number is 1 *it = 1; - + // Find channel with highest number of clients and decrease by one short max=0; short maxid=-1; @@ -421,7 +427,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do for(unsigned int i=0; i<numClVec.size();i++){ InfoMsg << "Number of clients for channel " << i << " : " << numClVec.at(i) << endmsg; } - + // Fill event number vector int i=0; @@ -499,12 +505,12 @@ void NetworkServer::dumpTimeDelays() const{ theStream << "Channel Id\tdelay time [s]\tdelay time/noClients [s]" << std::endl; for (unsigned int i=0; i<_delayTimesChannels.size(); ++i){ - theStream << i << "\t" << std::setprecision(10) << _delayTimesChannels.at(i) << "\t" << std::setprecision(10) << _delayTimesChannels.at(i)/_noOfClientsPerChannel.at(i) << std::endl; + theStream << i << "\t" << std::setprecision(10) << _delayTimesChannels.at(i) << "\t" << std::setprecision(10) << _delayTimesChannels.at(i)/_noOfClientsPerChannel.at(i) << std::endl; } theStream << "\n\nClientId\tdelay time [s] " << std::endl; for (unsigned int i=0; i<_delayTimesClients.size(); ++i){ - theStream << i << "\t" << std::setprecision(10) << _delayTimesClients.at(i) << std::endl; + theStream << i << "\t" << std::setprecision(10) << _delayTimesClients.at(i) << std::endl; } int noOfClientsWoScattering = 0; @@ -515,7 +521,7 @@ void NetworkServer::dumpTimeDelays() const{ for (unsigned int i=0; i<_noOfClientsPerChannel.size(); ++i){ theStream << _noOfClientsPerChannel.at(i) << " "; if (_noOfClientsPerChannel.at(i) > 1){ - noOfClientsWoScattering+=_noOfClientsPerChannel.at(i); + noOfClientsWoScattering+=_noOfClientsPerChannel.at(i); totalDelayTimeWoScattering+=_delayTimesChannels.at(i); } } @@ -526,10 +532,9 @@ void NetworkServer::dumpTimeDelays() const{ for (unsigned int i=0; i<_noOfClientsPerChannel.size(); ++i){ double noOfProposedClients(_noOfClientsPerChannel.at(i)); if (noOfProposedClients > 1.5){ - noOfProposedClients = _delayTimesChannels.at(i)/totalDelayTimeWoScattering * noOfClientsWoScattering; + noOfProposedClients = _delayTimesChannels.at(i)/totalDelayTimeWoScattering * noOfClientsWoScattering; } theStream << noOfProposedClients << " "; } theStream << std::endl; } - diff --git a/PwaUtils/NetworkServer.hh b/PwaUtils/NetworkServer.hh index fd890b13ad420546436036b5042a17c39ef3467e..e7a58c1a07d6644b51d7bcb1880093b323f3be88 100644 --- a/PwaUtils/NetworkServer.hh +++ b/PwaUtils/NetworkServer.hh @@ -73,7 +73,8 @@ private: std::shared_ptr<boost::asio::deadline_timer> theDeadlineTimer; std::shared_ptr<tcp::acceptor> theAcceptor; std::vector<std::shared_ptr<tcp::iostream>> theStreams; - std::map<short, boost::posix_time::ptime > lastHeartbeats; + //// heartbeats disables on client side + //std::map<short, boost::posix_time::ptime > lastHeartbeats; std::map<short, std::pair<short, boost::posix_time::ptime > > lastLhTimes; std::map<short, ChannelID> _clientChannelMap; @@ -82,13 +83,14 @@ private: std::vector<double> _cachedParams; std::vector<double> _delayTimesClients; //time in seconds std::vector<double> _delayTimesChannels; //time in seconds - std::vector<int> _noOfClientsPerChannel; + std::vector<int> _noOfClientsPerChannel; std::string _currentTimeDelayFileName; void Timeout(const boost::system::error_code& err); void AcceptHandler(const boost::system::error_code& err); void EvalClientTiming(); - bool UpdateHeartbeats(short clientID); + //// heartbeats disables on client side + //bool UpdateHeartbeats(short clientID); bool ReadNumClientsFromConfig(std::vector<short>& numClVec); void dumpTimeDelays() const; }; diff --git a/SetEnv_rub_mn2standalone.bash b/SetEnv_rub_mn2standalone.bash new file mode 100755 index 0000000000000000000000000000000000000000..890a929df1b551cef72e9304c4f2102dd2615124 --- /dev/null +++ b/SetEnv_rub_mn2standalone.bash @@ -0,0 +1,57 @@ +############################ +#### user defined variables +############################ +# path to root installation (will only be used if root is not already sourced -> ROOTSYS not set) +ROOT_DIR="/opt/root/6-32.04-AL9.4-gcc12.2.0" +# path to Minuit2 package +MINUIT2_STANDALONE="/data/iltschi/pawian_utils/Minuit2/Minuit2-6.33.01_AlmaLinux9.4_gcc12.2.0" +# path to boost build +BOOST_BUILD_PATH="/usr/share/boost-build" +# pawian internal (path to KMatrix store) +KMAT_DIR="/data/duldul/bertram/KMatStore/" +# pawian internal (path to Event store) +EVT_DIR="/data/duldul/bertram/EvtStore/" +# set Jamfile for bjam/b2 (a symlink Jamroot -> $JAM_FILE will be created) +JAM_FILE="Jamroot_rub_AL9_mn2standalone" + +#### check for existing pawian environment +if [[ -z "$TOP_DIR" ]]; then + TOP_DIR=$(realpath "${BASH_SOURCE[0]}") + TOP_DIR="${TOP_DIR%/*}" +else + echo "pawian environment already exists, aborting setup new env" + return +fi + +#### setup root (if needed) +if [[ -z "${ROOTSYS}" ]]; then + source ${ROOT_DIR}/bin/thisroot.sh +else + echo "root ${ROOTSYS} already sourced" +fi + +#### prepend minuit2 and pawian libs to LD_LIBRARY_PATH +LD_LIBRARY_PATH="${MINUIT2_STANDALONE}/lib:${TOP_DIR}/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}" + +#### append PATH +PATH="${PATH:+${PATH}:}${TOP_DIR}/bin" + +#### export variables +export TOP_DIR +export KMAT_DIR +export EVT_DIR +export BOOST_BUILD_PATH +export MINUIT2_STANDALONE +export LD_LIBRARY_PATH +export PATH + +# create symlink to Jamroot file (remove old link before) +if [[ -L "${TOP_DIR}/Jamroot" ]]; then + jam_link_target=$(readlink "${TOP_DIR}/Jamroot") + if [[ "${jam_link_target}" != "${JAM_FILE}" ]]; then + rm ${TOP_DIR}/Jamroot + ln -s ${JAM_FILE} ${TOP_DIR}/Jamroot + fi +else + ln -s "${JAM_FILE}" ${TOP_DIR}/Jamroot +fi