Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
testTMPIFile.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_io
3/// This macro shows how to use TMPIFile to simulate event
4/// reconstruction and merge the results in parallel.
5/// The JetEvent class is in $ROOTSYS/tutorials/tree/JetEvent.h,cxx
6///
7/// To run this macro do the following:
8/// ~~~{.bash}
9/// mpirun -np 4 root -b -l -q testTMPIFile.C
10/// ~~~
11///
12/// \macro_code
13///
14/// \author Taylor Childers, Yunsong Wang
15
16#include "TMPIFile.h"
17
18#ifdef TMPI_SECOND_RUN
19
20#include <chrono>
21#include <sstream>
22
23/* ---------------------------------------------------------------------------
24
25The idea of TMPIFile is to run N MPI ranks where some ranks are
26producing data (called workers), while other ranks are collecting data and
27writing it to disk (called collectors). The number of collectors can be
28configured and this should be optimized for each workflow and data size.
29
30This example uses a typical event processing loop, where every N events the
31TMPIFile::Sync() function is called. This call triggers the local TTree data
32to be sent via MPI to the collector rank where it is merged with all the
33other worker rank data and written to a TFile.
34
35An MPI sub-communicator is created for each collector, and the remaining
36ranks are distributed equally among the collectors as workers.
37
38--------------------------------------------------------------------------- */
39
40void test_tmpi()
41{
42
43 Int_t N_collectors = 2; // specify how many collectors to run
44 Int_t sync_rate = 2; // workers sync every sync_rate events
45 Int_t events_per_rank = 6; // total events each rank will produce then exit
46 Int_t sleep_mean = 5; // simulate compute time for event processing
47 Int_t sleep_sigma = 2; // variation in compute time
48
49 // using JetEvent generator to create a data structure
50 // these parameters control this generator
51 Int_t jetm = 25;
52 Int_t trackm = 60;
53 Int_t hitam = 200;
54 Int_t hitbm = 100;
55
56 std::string treename = "test_tmpi";
57 std::string branchname = "event";
58
59 // set output filename
60 std::stringstream smpifname;
61 smpifname << "/tmp/merged_output_" << getpid() << ".root";
62
63 // Create new TMPIFile, passing the filename, setting read/write permissions
64 // and setting the number of collectors.
65 // If MPI_INIT has not been called already, the constructor of TMPIFile
66 // will call this.
67 TMPIFile *newfile = new TMPIFile(smpifname.str().c_str(), "RECREATE", N_collectors);
68 // set random number seed that is based on MPI rank
69 // this avoids producing the same events in each MPI rank
70 gRandom->SetSeed(gRandom->GetSeed() + newfile->GetMPIGlobalRank());
71
72 // only print log messages in MPI Rank 0
73 if (newfile->GetMPIGlobalRank() == 0) {
74 Info("test_tmpi", " running with parallel ranks: %d", newfile->GetMPIGlobalSize());
75 Info("test_tmpi", " running with collecting ranks: %d", N_collectors);
76 Info("test_tmpi", " running with working ranks: %d", (newfile->GetMPIGlobalSize() - N_collectors));
77 Info("test_tmpi", " running with sync rate: %d", sync_rate);
78 Info("test_tmpi", " running with events per rank: %d", events_per_rank);
79 Info("test_tmpi", " running with sleep mean: %d", sleep_mean);
80 Info("test_tmpi", " running with sleep sigma: %d", sleep_sigma);
81 Info("test_tmpi", " running with seed: %d", gRandom->GetSeed());
82 }
83
84 // print filename for each collector Rank
85 if (newfile->IsCollector()) {
86 Info("Collector", "[%d]\troot output filename = %s", newfile->GetMPIGlobalRank(), smpifname.str().c_str());
87 }
88
89 // This if statement splits the run-time functionality of
90 // workers and collectors.
91 if (newfile->IsCollector()) {
92 // Run by collector ranks
93 // This will run until all workers have exited
94 newfile->RunCollector();
95 } else {
96 // Run by worker ranks
97 // these ranks generate data to be written to TMPIFile
98
99 // create a TTree to store event data
100 TTree *tree = new TTree(treename.c_str(), "Event example with Jets");
101 // set the AutoFlush rate to be the same as the sync_rate
102 // this synchronizes the TTree branch compression
103 tree->SetAutoFlush(sync_rate);
104
105 // Create our fake event data generator
106 JetEvent *event = new JetEvent;
107
108 // add our branch to the TTree
109 tree->Branch(branchname.c_str(), "JetEvent", &event, 8000, 2);
110
111 // monitor timing
112 auto sync_start = std::chrono::high_resolution_clock::now();
113
114 // generate the specified number of events
115 for (int i = 0; i < events_per_rank; i++) {
116
117 auto start = std::chrono::high_resolution_clock::now();
118 // Generate one event
119 event->Build(jetm, trackm, hitam, hitbm);
120
121 auto evt_built = std::chrono::high_resolution_clock::now();
122 double build_time = std::chrono::duration_cast<std::chrono::duration<double>>(evt_built - start).count();
123
124 Info("Rank", "[%d] [%d]\tevt = %d;\tbuild_time = %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(), i,
125 build_time);
126
127 // if our build time was significant, subtract that from the sleep time
128 auto adjusted_sleep = (int)(sleep_mean - build_time);
129 auto sleep = abs(gRandom->Gaus(adjusted_sleep, sleep_sigma));
130
131 // simulate the time taken by more complicated event generation
132 std::this_thread::sleep_for(std::chrono::seconds(int(sleep)));
133
134 // Fill the tree
135 tree->Fill();
136
137 // every sync_rate events, call the TMPIFile::Sync() function
138 // to trigger MPI collection of local data
139 if ((i + 1) % sync_rate == 0) {
140 // call TMPIFile::Sync()
141 newfile->Sync();
142
143 auto end = std::chrono::high_resolution_clock::now();
144 double sync_time = std::chrono::duration_cast<std::chrono::duration<double>>(end - sync_start).count();
145 Info("Rank", "[%d] [%d]\tevent collection time: %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(),
146 sync_time);
147 sync_start = std::chrono::high_resolution_clock::now();
148 }
149 }
150
151 // synchronize any left over events
152 if (events_per_rank % sync_rate != 0) {
153 newfile->Sync();
154 }
155 }
156
157 // call Close on the file for clean exit.
158 Info("Rank", "[%d] [%d]\tclosing file", newfile->GetMPIColor(), newfile->GetMPILocalRank());
159 newfile->Close();
160
161 // open file and test contents
162 if (newfile->GetMPILocalRank() == 0) {
163 TString filename = newfile->GetMPIFilename();
164 Info("Rank", "[%d] [%d]\topening file: %s", newfile->GetMPIColor(), newfile->GetMPILocalRank(), filename.Data());
165 TFile file(filename.Data());
166 if (file.IsOpen()) {
167 file.ls();
168 TTree *tree = (TTree *)file.Get(treename.c_str());
169 if (tree)
170 tree->Print();
171
172 Info("Rank", "[%d] [%d]\tfile should have %d events and has %lld", newfile->GetMPIColor(),
173 newfile->GetMPILocalRank(), (newfile->GetMPILocalSize() - 1) * events_per_rank, tree->GetEntries());
174 }
175 }
176}
177
178void testTMPIFile(Bool_t secRun)
179{
180 auto start = std::chrono::high_resolution_clock::now();
181
182 test_tmpi();
183
184 auto end = std::chrono::high_resolution_clock::now();
185 double time = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
186 std::string msg = "Total elapsed time: ";
187 msg += std::to_string(time);
188 Info("testTMPIFile", "%s", msg.c_str());
189 Info("testTMPIFile", "exiting");
190}
191
192#else
193
194void testTMPIFile()
195{
196 Int_t flag;
197 MPI_Initialized(&flag);
198 if (!flag) {
199 MPI_Init(NULL, NULL);
200 }
201
202 // Get rank and size
203 Int_t rank, size;
204 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
205 MPI_Comm_size(MPI_COMM_WORLD, &size);
206
207
208 // Procecss 0 generates JetEvent library
209 if (rank == 0) {
210 TString tutdir = gROOT->GetTutorialDir();
211 gSystem->Exec("cp " + tutdir + "/tree/JetEvent* .");
212 gROOT->ProcessLine(".L JetEvent.cxx+");
213 }
214 // Wait until it's done
215 MPI_Barrier(MPI_COMM_WORLD);
216
217 gROOT->ProcessLine("#define TMPI_SECOND_RUN yes");
218 gROOT->ProcessLine("#include \"" __FILE__ "\"");
219 gROOT->ProcessLine("testTMPIFile(true)");
220
221 // TMPIFile will do MPI_Finalize() when closing the file
222 Int_t finalized = 0;
223 MPI_Finalized(&finalized);
224 if (!finalized) {
225 MPI_Finalize();
226 }
227}
228
229#endif
size_t size(const MatrixT &matrix)
retrieve the size of a square matrix
bool Bool_t
Definition RtypesCore.h:63
int Int_t
Definition RtypesCore.h:45
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:218
Option_t Option_t TPoint TPoint const char GetTextMagnitude GetFillStyle GetLineColor GetLineWidth GetMarkerStyle GetTextAlign GetTextColor GetTextSize void char Point_t Rectangle_t WindowAttributes_t Float_t Float_t Float_t Int_t Int_t UInt_t UInt_t Rectangle_t Int_t Int_t Window_t TString Int_t GCValues_t GetPrimarySelectionOwner GetDisplay GetScreen GetColormap GetNativeEvent const char const char dpyName wid window const char font_name cursor keysym reg const char only_if_exist regb h Point_t winding char text const char depth char const char Int_t count const char ColorStruct_t color const char filename
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define gROOT
Definition TROOT.h:406
R__EXTERN TRandom * gRandom
Definition TRandom.h:62
R__EXTERN TSystem * gSystem
Definition TSystem.h:555
A ROOT file is an on-disk file, usually with extension .root, that stores objects in a file-system-li...
Definition TFile.h:53
virtual Double_t Gaus(Double_t mean=0, Double_t sigma=1)
Samples a random number from the standard Normal (Gaussian) Distribution with the given mean and sigm...
Definition TRandom.cxx:275
virtual void SetSeed(ULong_t seed=0)
Set the random generator seed.
Definition TRandom.cxx:615
virtual UInt_t GetSeed() const
Get the random generator seed.
Definition TRandom.cxx:651
Basic string class.
Definition TString.h:139
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition TSystem.cxx:653
A TTree represents a columnar dataset.
Definition TTree.h:79
RVec< PromoteType< T > > abs(const RVec< T > &v)
Definition RVec.hxx:1832