From 3b41156128e58d49318a826707a0dd19d743b5c7 Mon Sep 17 00:00:00 2001 From: oafedulidis <oafedulidis@ep1.rub.de> Date: Mon, 23 Sep 2024 14:42:14 +0200 Subject: [PATCH] Added Jamroot_rub_AL9_mn2standalone to support the new Minuit2 extracted from ROOT as a standalone package. Since there are no other external packages, the environment variable is renamed from extern to MINUIT2_STANDALONE (the subdirectories in Minuit2 have changed compared to the old version; the Jamfile now needs to include <Minuit2Path>/include/Minuit2 instead of just <Minuit2Path>/include); Added a new bash script SetEnv_rub_mn2standalone.bash to set up pawian with the new Minuit2 version. It is not necessary to run the script inside the pawian directory, and it also checks whether another pawian environment is already loaded (checks if TOP_DIR is set). If ROOT is already sourced, that step is skipped (otherwise the ROOT installation defined in the script is loaded), and LD_LIBRARY_PATH and PATH are prepended/appended safely; MODIFIED NETWORK CLIENT AND SERVER! Modified the network client and server to keep their connections open instead of permanently closing and reconnecting; this should prevent thousands of TCP connections from blocking ports in TIME_WAIT status. 
With the connections kept open there was an issue with the heartbeat, which was sent from a second client stream to the server, so the heartbeat is completely removed for now (all changes are marked with comments in the code) --- Jamroot_rub_AL9_mn2standalone | 76 +++++++++++++++++++++ PwaUtils/NetworkClient.cc | 84 ++++++++++++----------- PwaUtils/NetworkClient.hh | 8 ++- PwaUtils/NetworkServer.cc | 125 ++++++++++++++++++---------------- PwaUtils/NetworkServer.hh | 8 ++- SetEnv_rub_mn2standalone.bash | 57 ++++++++++++++++ 6 files changed, 252 insertions(+), 106 deletions(-) create mode 100644 Jamroot_rub_AL9_mn2standalone create mode 100755 SetEnv_rub_mn2standalone.bash diff --git a/Jamroot_rub_AL9_mn2standalone b/Jamroot_rub_AL9_mn2standalone new file mode 100644 index 00000000..07c58f19 --- /dev/null +++ b/Jamroot_rub_AL9_mn2standalone @@ -0,0 +1,76 @@ +import os ; +import testing ; + +path-constant TOP : . ; +local MINUIT2_STANDALONE = [ os.environ MINUIT2_STANDALONE ] ; + +local ROOTSYS = [ os.environ ROOTSYS ] ; +local rlibs = [ SHELL "$(ROOTSYS)/bin/root-config --libs" ] ; +ROOTLIBS = [ MATCH "(.*)[\n]" : $(rlibs) ] ; + +BOOSTLIBS = + -lboost_chrono + -lboost_date_time + -lboost_filesystem + -lboost_program_options + -lboost_regex + -lboost_serialization + -lboost_system + -lboost_timer + -lboost_unit_test_framework + -lrt + ; + +lib boost_test : : <name>boost_unit_test_framework ; + +project : + requirements <include>./ + <include>$(TOP) + <include>$(BOOSTINCLUDE) + <include>$(MINUIT2_STANDALONE)/include/Minuit2 + <include>$(ROOTSYS)/include + <cxxflags>--std=c++17 + <cxxflags>-ftemplate-depth=256 + <cxxflags>-DBOOST_BIND_GLOBAL_PLACEHOLDERS + <link>shared + <linkflags>-L$(TOP)/lib + <linkflags>$(ROOTLIBS) + <linkflags>-L$(BOOSTLIBPATH) + <linkflags>$(BOOSTLIBS) + <linkflags>-lgsl + <linkflags>-lgslcblas + <cxxflags>-fPIC + <cxxflags>-Wall + ; + +actions rootlibs +{ + $(ROOTSYS)/bin/root-config --libs +} + +lib Minuit2 : : 
<file>$(MINUIT2_STANDALONE)/lib/libMinuit2.so : : ; + +build-project ErrLogger ; +build-project qft++ ; +build-project qft++Extension ; +build-project ConfigParser ; +build-project Particle ; +build-project Utils ; +build-project Event ; +build-project FitParams ; +build-project PwaDynamics ; +build-project PwaUtils ; +build-project MinFunctions ; +build-project AppUtils ; +build-project pbarpUtils ; +build-project epemUtils ; +build-project resUtils ; +build-project ggUtils ; +# build-project gammapUtils ; +build-project pipiScatteringUtils ; +build-project gslUtils ; +build-project qaErrorExtract ; +build-project PspGen ; +build-project Examples ; +build-project KMatrixExtract ; +build-project PwaApps ; diff --git a/PwaUtils/NetworkClient.cc b/PwaUtils/NetworkClient.cc index 0e189f5d..bfcadecf 100644 --- a/PwaUtils/NetworkClient.cc +++ b/PwaUtils/NetworkClient.cc @@ -69,6 +69,8 @@ bool NetworkClient::Login(){ InfoMsg << "Connecting to server " << _serverAddress << ":" << _port << endmsg; _theStream.connect(_serverAddress, _port); + if(_theStream) _theStream.rdbuf()->set_option(boost::asio::socket_base::keep_alive(true)); // send keep-alive probes (probably not needed); only once connected, because set_option throws on a closed socket and would skip the error report below + if(_theStream) _theStream.rdbuf()->set_option(boost::asio::ip::tcp::no_delay(true)); // disable Nagle's algorithm (guarded for the same reason) if(!_theStream){ Alert << "Error: " << _theStream.error().message() << endmsg; @@ -90,8 +92,9 @@ bool NetworkClient::Login(){ InfoMsg << "Received data event range " << _eventLimits[0] << " - " << _eventLimits[1] << endmsg; InfoMsg << "Received mc event range " << _eventLimits[2] << " - " << _eventLimits[3] << endmsg; - std::thread timerthread(&NetworkClient::SendHeartbeat, this); - timerthread.detach(); + //// disable heartbeat + //std::thread timerthread(&NetworkClient::SendHeartbeat, this); + //timerthread.detach(); return true; } @@ -100,7 +103,8 @@ bool NetworkClient::Login(){ bool NetworkClient::SendLH(double llh_data, double lh_mc){ - _theStream.connect(_serverAddress, _port); + //// we want to keep the connection alive, so 
no need to reconnect all the time + //_theStream.connect(_serverAddress, _port); if(!_theStream){ //try it serveral times @@ -108,7 +112,7 @@ bool NetworkClient::SendLH(double llh_data, double lh_mc){ while(!_theStream){ WarningMsg << "Could not send LH " << counter << " time!" << endmsg; WarningMsg << "current error message " << _theStream.error().message() << endmsg; - std::this_thread::sleep_for( std::chrono::seconds(5)); + std::this_thread::sleep_for( std::chrono::seconds(5)); _theStream.clear(); _theStream.connect(_serverAddress, _port); @@ -134,42 +138,42 @@ bool NetworkClient::SendLH(double llh_data, double lh_mc){ } - -bool NetworkClient::SendHeartbeat(){ - - while(true){ - - _theHeartbeatStream.connect(_serverAddress, _port); - - int counter=0; - while(!_theHeartbeatStream){ - WarningMsg << "Could not send heartbeat " << counter << " time!" << endmsg; - WarningMsg << "current error message " << _theHeartbeatStream.error().message() << endmsg; - std::this_thread::sleep_for( std::chrono::seconds(1)); - _theHeartbeatStream.clear(); - _theHeartbeatStream.connect(_serverAddress, _port); - counter++; - if (counter>20){ - Alert << "Could not send heartbeat last time" << endmsg; - exit(1); - } - WarningMsg << "Try to send heartbeat again!!!" << endmsg; - } - - // if(!_theStream){ - // Alert << "Could not send heartbeat." << endmsg; - // return false; - // } - - _theHeartbeatStream << NetworkClient::CLIENTMESSAGE_HEARTBEAT << "\n"; - _theHeartbeatStream << _clientID << "\n"; - short answer; - _theHeartbeatStream >> answer; - - std::this_thread::sleep_for( std::chrono::seconds(HEARTBEAT_INTERVAL) ); - } - -} +//// heartbeat disabled (so no need for this) +//bool NetworkClient::SendHeartbeat(){ +// +// while(true){ +// +// _theHeartbeatStream.connect(_serverAddress, _port); +// +// int counter=0; +// while(!_theHeartbeatStream){ +// WarningMsg << "Could not send heartbeat " << counter << " time!" 
<< endmsg; +// WarningMsg << "current error message " << _theHeartbeatStream.error().message() << endmsg; +// std::this_thread::sleep_for( std::chrono::seconds(1)); +// _theHeartbeatStream.clear(); +// _theHeartbeatStream.connect(_serverAddress, _port); +// counter++; +// if (counter>20){ +// Alert << "Could not send heartbeat last time" << endmsg; +// exit(1); +// } +// WarningMsg << "Try to send heartbeat again!!!" << endmsg; +// } +// +// // if(!_theStream){ +// // Alert << "Could not send heartbeat." << endmsg; +// // return false; +// // } +// +// _theHeartbeatStream << NetworkClient::CLIENTMESSAGE_HEARTBEAT << "\n"; +// _theHeartbeatStream << _clientID << "\n"; +// short answer; +// _theHeartbeatStream >> answer; +// +// std::this_thread::sleep_for( std::chrono::seconds(HEARTBEAT_INTERVAL) ); +// } +// +//} diff --git a/PwaUtils/NetworkClient.hh b/PwaUtils/NetworkClient.hh index bb685d1b..de180e8b 100644 --- a/PwaUtils/NetworkClient.hh +++ b/PwaUtils/NetworkClient.hh @@ -42,12 +42,13 @@ class NetworkClient static short CLIENTMESSAGE_LOGIN; static short CLIENTMESSAGE_LH; static short CLIENTMESSAGE_HEARTBEAT; - static short HEARTBEAT_INTERVAL; + static short HEARTBEAT_INTERVAL; //keep this variable for timeouts NetworkClient(std::string serverAddress,std::string port); bool Login(); bool SendLH(double llh_data, double lh_mc); - bool SendHeartbeat(); + //// heartbeat disabled + //bool SendHeartbeat(); bool WaitForParams(); ChannelID channelID(); std::vector<double>& GetParams(){return _theParams;} @@ -61,6 +62,7 @@ class NetworkClient std::string _port; std::string _serverAddress; tcp::iostream _theStream; - tcp::iostream _theHeartbeatStream; + //// heartbeat disabled + //tcp::iostream _theHeartbeatStream; std::vector<double> _theParams; }; diff --git a/PwaUtils/NetworkServer.cc b/PwaUtils/NetworkServer.cc index d0e2f816..bb7f4f5f 100644 --- a/PwaUtils/NetworkServer.cc +++ b/PwaUtils/NetworkServer.cc @@ -48,7 +48,7 @@ NetworkServer::NetworkServer(int port, 
unsigned short noOfClients, std::map<Chan , _clientTimeout(100*NetworkClient::HEARTBEAT_INTERVAL) , _globalTimeout(100*NetworkClient::HEARTBEAT_INTERVAL) , _noOfClients(noOfClients) - , _noOfChannels(numEventMap.size()) + , _noOfChannels(numEventMap.size()) , _closed(false) , _clientParamsInitialized(false) , _numBroadcasted(0) @@ -60,6 +60,8 @@ NetworkServer::NetworkServer(int port, unsigned short noOfClients, std::map<Chan theAcceptor = std::shared_ptr<tcp::acceptor>(new tcp::acceptor(*theIOService, tcp::endpoint(tcp::v4(), _port))); theDeadlineTimer = std::shared_ptr<boost::asio::deadline_timer>(new boost::asio::deadline_timer(*theIOService)); + theAcceptor->set_option(boost::asio::ip::tcp::no_delay(true)); //disable Nagle's algorithm + for(int i=0; i<_noOfClients; i++){ theStreams.push_back( std::shared_ptr<tcp::iostream>(new tcp::iostream) ); } @@ -87,20 +89,22 @@ bool NetworkServer::WaitForFirstClientLogin(){ short connectionPurpose; *theStreams.at(i) >> connectionPurpose; - if(connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ - short clientID; - *theStreams.at(i) >> clientID; - *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; - theStreams.at(i)->flush(); - theStreams.at(i)->close(); - i--; - continue; - } - else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ + //// heartbeats disabled on client side + // if(connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ + // short clientID; + // *theStreams.at(i) >> clientID; + // *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; + // theStreams.at(i)->flush(); + // theStreams.at(i)->close(); + // i--; + // continue; + // } + // else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ + if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LOGIN){ Alert << "ERROR: Client did not login. 
Message: " << connectionPurpose << endmsg; - SendClosingMessage(theStreams.at(i)); - i--; - continue; + SendClosingMessage(theStreams.at(i)); + i--; + continue; } std::string nodeName; @@ -167,23 +171,24 @@ bool NetworkServer::WaitForLH(std::map<ChannelID, LHData>& theLHDataMap){ i--; continue; } - else if (connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ - short clientID; - *theStreams.at(i) >> clientID; - - if(!UpdateHeartbeats(clientID)){ - Timeout(boost::asio::error::timed_out); - BroadcastClosingMessage(); - } - else{ - *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; - theStreams.at(i)->flush(); - theStreams.at(i)->close(); - } - - i--; - continue; - } + //// heartbeats disabled on client side + //else if (connectionPurpose == NetworkClient::CLIENTMESSAGE_HEARTBEAT){ + // short clientID; + // *theStreams.at(i) >> clientID; + + // if(!UpdateHeartbeats(clientID)){ + // Timeout(boost::asio::error::timed_out); + // BroadcastClosingMessage(); + // } + // else{ + // *theStreams.at(i) << NetworkServer::SERVERMESSAGE_OK << "\n"; + // theStreams.at(i)->flush(); + // theStreams.at(i)->close(); + // } + + // i--; + // continue; + //} else if(connectionPurpose != NetworkClient::CLIENTMESSAGE_LH){ Alert << "Protocol error in WaitForLH(): i=" << i << " CP " << connectionPurpose << endmsg; _closed = true; @@ -203,7 +208,7 @@ bool NetworkServer::WaitForLH(std::map<ChannelID, LHData>& theLHDataMap){ lastLhTimes[i] = std::pair<short, boost::posix_time::ptime>(clientID, boost::posix_time::microsec_clock::local_time()); if(_closed) - SendClosingMessage(theStreams.at(i)); + SendClosingMessage(theStreams.at(i)); } EvalClientTiming(); @@ -231,7 +236,7 @@ void NetworkServer::EvalClientTiming(){ double diffInSeconds=((double)(maxdiff.total_microseconds() - diff.total_microseconds()))/1E6; InfoMsg << "Client id " << clientID << " channel id " << channelID << " " - << " response time +" + << " response time +" << std::setprecision(10) << diffInSeconds << " s" 
<< endmsg; _delayTimesClients.at(clientID)+=diffInSeconds; _delayTimesChannels.at(channelID)+=diffInSeconds; @@ -240,24 +245,24 @@ void NetworkServer::EvalClientTiming(){ } - -bool NetworkServer::UpdateHeartbeats(short clientID){ - - boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); - lastHeartbeats[clientID] = now; - - for(auto it = lastHeartbeats.begin(); it!= lastHeartbeats.end(); ++it){ - boost::posix_time::time_duration diff = now - (*it).second; - - if((unsigned)diff.total_seconds() >= _clientTimeout){ - Alert << "No signal from clientID " << (*it).first << " for " - << diff.total_seconds() << " seconds." << endmsg; - return false; - } - } - - return true; -} +//// heartbeats disabled +//bool NetworkServer::UpdateHeartbeats(short clientID){ +// +// boost::posix_time::ptime now(boost::posix_time::second_clock::local_time()); +// lastHeartbeats[clientID] = now; +// +// for(auto it = lastHeartbeats.begin(); it!= lastHeartbeats.end(); ++it){ +// boost::posix_time::time_duration diff = now - (*it).second; +// +// if((unsigned)diff.total_seconds() >= _clientTimeout){ +// Alert << "No signal from clientID " << (*it).first << " for " +// << diff.total_seconds() << " seconds." 
<< endmsg; +// return false; +// } +// } +// +// return true; +//} @@ -272,7 +277,8 @@ void NetworkServer::SendParams(std::shared_ptr<tcp::iostream> destinationStream, } destinationStream->flush(); - destinationStream->close(); + //// we want to keep the connection alive, so dont close here + //destinationStream->close(); } @@ -386,7 +392,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do } i++; } - + numClVec.at(minid)++; sumCl++; } @@ -396,7 +402,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do if(*it == 0){ // Minimum number is 1 *it = 1; - + // Find channel with highest number of clients and decrease by one short max=0; short maxid=-1; @@ -421,7 +427,7 @@ void NetworkServer::CalcEventDistribution(std::map<ChannelID, std::tuple<long,do for(unsigned int i=0; i<numClVec.size();i++){ InfoMsg << "Number of clients for channel " << i << " : " << numClVec.at(i) << endmsg; } - + // Fill event number vector int i=0; @@ -499,12 +505,12 @@ void NetworkServer::dumpTimeDelays() const{ theStream << "Channel Id\tdelay time [s]\tdelay time/noClients [s]" << std::endl; for (unsigned int i=0; i<_delayTimesChannels.size(); ++i){ - theStream << i << "\t" << std::setprecision(10) << _delayTimesChannels.at(i) << "\t" << std::setprecision(10) << _delayTimesChannels.at(i)/_noOfClientsPerChannel.at(i) << std::endl; + theStream << i << "\t" << std::setprecision(10) << _delayTimesChannels.at(i) << "\t" << std::setprecision(10) << _delayTimesChannels.at(i)/_noOfClientsPerChannel.at(i) << std::endl; } theStream << "\n\nClientId\tdelay time [s] " << std::endl; for (unsigned int i=0; i<_delayTimesClients.size(); ++i){ - theStream << i << "\t" << std::setprecision(10) << _delayTimesClients.at(i) << std::endl; + theStream << i << "\t" << std::setprecision(10) << _delayTimesClients.at(i) << std::endl; } int noOfClientsWoScattering = 0; @@ -515,7 +521,7 @@ void NetworkServer::dumpTimeDelays() const{ for (unsigned int i=0; 
i<_noOfClientsPerChannel.size(); ++i){ theStream << _noOfClientsPerChannel.at(i) << " "; if (_noOfClientsPerChannel.at(i) > 1){ - noOfClientsWoScattering+=_noOfClientsPerChannel.at(i); + noOfClientsWoScattering+=_noOfClientsPerChannel.at(i); totalDelayTimeWoScattering+=_delayTimesChannels.at(i); } } @@ -526,10 +532,9 @@ void NetworkServer::dumpTimeDelays() const{ for (unsigned int i=0; i<_noOfClientsPerChannel.size(); ++i){ double noOfProposedClients(_noOfClientsPerChannel.at(i)); if (noOfProposedClients > 1.5){ - noOfProposedClients = _delayTimesChannels.at(i)/totalDelayTimeWoScattering * noOfClientsWoScattering; + noOfProposedClients = _delayTimesChannels.at(i)/totalDelayTimeWoScattering * noOfClientsWoScattering; } theStream << noOfProposedClients << " "; } theStream << std::endl; } - diff --git a/PwaUtils/NetworkServer.hh b/PwaUtils/NetworkServer.hh index fd890b13..e7a58c1a 100644 --- a/PwaUtils/NetworkServer.hh +++ b/PwaUtils/NetworkServer.hh @@ -73,7 +73,8 @@ private: std::shared_ptr<boost::asio::deadline_timer> theDeadlineTimer; std::shared_ptr<tcp::acceptor> theAcceptor; std::vector<std::shared_ptr<tcp::iostream>> theStreams; - std::map<short, boost::posix_time::ptime > lastHeartbeats; + //// heartbeats disabled on client side + //std::map<short, boost::posix_time::ptime > lastHeartbeats; std::map<short, std::pair<short, boost::posix_time::ptime > > lastLhTimes; std::map<short, ChannelID> _clientChannelMap; @@ -82,13 +83,14 @@ private: std::vector<double> _cachedParams; std::vector<double> _delayTimesClients; //time in seconds std::vector<double> _delayTimesChannels; //time in seconds - std::vector<int> _noOfClientsPerChannel; + std::vector<int> _noOfClientsPerChannel; std::string _currentTimeDelayFileName; void Timeout(const boost::system::error_code& err); void AcceptHandler(const boost::system::error_code& err); void EvalClientTiming(); - bool UpdateHeartbeats(short clientID); + //// heartbeats disabled on client side + //bool UpdateHeartbeats(short 
clientID); bool ReadNumClientsFromConfig(std::vector<short>& numClVec); void dumpTimeDelays() const; }; diff --git a/SetEnv_rub_mn2standalone.bash b/SetEnv_rub_mn2standalone.bash new file mode 100755 index 00000000..890a929d --- /dev/null +++ b/SetEnv_rub_mn2standalone.bash @@ -0,0 +1,57 @@ +#### check for existing pawian environment (done first, so an aborted run cannot overwrite variables of the already loaded setup) +if [[ -z "$TOP_DIR" ]]; then + TOP_DIR=$(realpath "${BASH_SOURCE[0]}") + TOP_DIR="${TOP_DIR%/*}" +else + echo "pawian environment already exists, aborting setup new env" + return 0 2>/dev/null || exit 0 +fi + +############################ +#### user defined variables +############################ +# path to root installation (will only be used if root is not already sourced -> ROOTSYS not set) +ROOT_DIR="/opt/root/6-32.04-AL9.4-gcc12.2.0" +# path to Minuit2 package +MINUIT2_STANDALONE="/data/iltschi/pawian_utils/Minuit2/Minuit2-6.33.01_AlmaLinux9.4_gcc12.2.0" +# path to boost build +BOOST_BUILD_PATH="/usr/share/boost-build" +# pawian internal (path to KMatrix store) +KMAT_DIR="/data/duldul/bertram/KMatStore/" +# pawian internal (path to Event store) +EVT_DIR="/data/duldul/bertram/EvtStore/" +# set Jamfile for bjam/b2 (a symlink Jamroot -> $JAM_FILE will be created) +JAM_FILE="Jamroot_rub_AL9_mn2standalone" + +#### setup root (if needed) +if [[ -z "${ROOTSYS}" ]]; then + source "${ROOT_DIR}/bin/thisroot.sh" +else + echo "root ${ROOTSYS} already sourced" +fi + +#### prepend minuit2 and pawian libs to LD_LIBRARY_PATH +LD_LIBRARY_PATH="${MINUIT2_STANDALONE}/lib:${TOP_DIR}/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}" + +#### append PATH +PATH="${PATH:+${PATH}:}${TOP_DIR}/bin" + +#### export variables +export TOP_DIR +export KMAT_DIR +export EVT_DIR +export BOOST_BUILD_PATH +export MINUIT2_STANDALONE +export LD_LIBRARY_PATH +export PATH + +# create symlink to Jamroot file (an existing link is only replaced when it points to a different Jamfile; paths are quoted so directories with spaces work) +if [[ -L "${TOP_DIR}/Jamroot" ]]; then + jam_link_target=$(readlink "${TOP_DIR}/Jamroot") + if [[ "${jam_link_target}" != "${JAM_FILE}" ]]; then + rm "${TOP_DIR}/Jamroot" + 
ln -s "${JAM_FILE}" "${TOP_DIR}/Jamroot" + fi +else + ln -s "${JAM_FILE}" "${TOP_DIR}/Jamroot" +fi -- GitLab