Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
testTMPIFile.C
Go to the documentation of this file.
1/// \file
2/// \ingroup tutorial_io
/// This macro shows how to use TMPIFile to simulate event
/// reconstruction and merge the resulting events in parallel.
5/// The JetEvent class is in $ROOTSYS/tutorials/tree/JetEvent.h,cxx
6///
7/// To run this macro do the following:
8/// ~~~{.bash}
9/// mpirun -np 4 root -b -l -q testTMPIFile.C
10/// ~~~
11///
12/// \macro_code
13///
14/// \author Taylor Childers, Yunsong Wang
15
16#include "TMPIFile.h"
17
18R__LOAD_LIBRARY(RMPI) // Work around autoloading issue when ROOT modules are enabled
19
20#ifdef TMPI_SECOND_RUN
21
#include <chrono>
#include <cmath>
#include <sstream>
#include <string>
#include <thread>
24
25R__LOAD_LIBRARY(JetEvent_cxx)
26
27/* ---------------------------------------------------------------------------
28
29The idea of TMPIFile is to run N MPI ranks where some ranks are
30producing data (called workers), while other ranks are collecting data and
31writing it to disk (called collectors). The number of collectors can be
32configured and this should be optimized for each workflow and data size.
33
34This example uses a typical event processing loop, where every N events the
35TMPIFile::Sync() function is called. This call triggers the local TTree data
36to be sent via MPI to the collector rank where it is merged with all the
37other worker rank data and written to a TFile.
38
An MPI sub-communicator is created for each collector, and the remaining
ranks (the workers) are distributed equally among the collectors.
41
42--------------------------------------------------------------------------- */
43
44void test_tmpi()
45{
46
47 Int_t N_collectors = 2; // specify how many collectors to run
48 Int_t sync_rate = 2; // workers sync every sync_rate events
49 Int_t events_per_rank = 6; // total events each rank will produce then exit
50 Int_t sleep_mean = 5; // simulate compute time for event processing
51 Int_t sleep_sigma = 2; // variation in compute time
52
53 // using JetEvent generator to create a data structure
54 // these parameters control this generator
55 Int_t jetm = 25;
56 Int_t trackm = 60;
57 Int_t hitam = 200;
58 Int_t hitbm = 100;
59
60 std::string treename = "test_tmpi";
61 std::string branchname = "event";
62
63 // set output filename
64 std::stringstream smpifname;
65 smpifname << "/tmp/merged_output_" << getpid() << ".root";
66
67 // Create new TMPIFile, passing the filename, setting read/write permissions
68 // and setting the number of collectors.
69 // If MPI_INIT has not been called already, the constructor of TMPIFile
70 // will call this.
71 TMPIFile *newfile = new TMPIFile(smpifname.str().c_str(), "RECREATE", N_collectors);
72 // set random number seed that is based on MPI rank
73 // this avoids producing the same events in each MPI rank
74 gRandom->SetSeed(gRandom->GetSeed() + newfile->GetMPIGlobalRank());
75
76 // only print log messages in MPI Rank 0
77 if (newfile->GetMPIGlobalRank() == 0) {
78 Info("test_tmpi", " running with parallel ranks: %d", newfile->GetMPIGlobalSize());
79 Info("test_tmpi", " running with collecting ranks: %d", N_collectors);
80 Info("test_tmpi", " running with working ranks: %d", (newfile->GetMPIGlobalSize() - N_collectors));
81 Info("test_tmpi", " running with sync rate: %d", sync_rate);
82 Info("test_tmpi", " running with events per rank: %d", events_per_rank);
83 Info("test_tmpi", " running with sleep mean: %d", sleep_mean);
84 Info("test_tmpi", " running with sleep sigma: %d", sleep_sigma);
85 Info("test_tmpi", " running with seed: %d", gRandom->GetSeed());
86 }
87
88 // print filename for each collector Rank
89 if (newfile->IsCollector()) {
90 Info("Collector", "[%d]\troot output filename = %s", newfile->GetMPIGlobalRank(), smpifname.str().c_str());
91 }
92
93 // This if statement splits the run-time functionality of
94 // workers and collectors.
95 if (newfile->IsCollector()) {
96 // Run by collector ranks
97 // This will run until all workers have exited
98 newfile->RunCollector();
99 } else {
100 // Run by worker ranks
101 // these ranks generate data to be written to TMPIFile
102
103 // create a TTree to store event data
104 TTree *tree = new TTree(treename.c_str(), "Event example with Jets");
105 // set the AutoFlush rate to be the same as the sync_rate
106 // this synchronizes the TTree branch compression
107 tree->SetAutoFlush(sync_rate);
108
109 // Create our fake event data generator
110 JetEvent *event = new JetEvent;
111
112 // add our branch to the TTree
113 tree->Branch(branchname.c_str(), "JetEvent", &event, 8000, 2);
114
115 // monitor timing
116 auto sync_start = std::chrono::high_resolution_clock::now();
117
118 // generate the specified number of events
119 for (int i = 0; i < events_per_rank; i++) {
120
121 auto start = std::chrono::high_resolution_clock::now();
122 // Generate one event
123 event->Build(jetm, trackm, hitam, hitbm);
124
125 auto evt_built = std::chrono::high_resolution_clock::now();
126 double build_time = std::chrono::duration_cast<std::chrono::duration<double>>(evt_built - start).count();
127
128 Info("Rank", "[%d] [%d]\tevt = %d;\tbuild_time = %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(), i,
129 build_time);
130
131 // if our build time was significant, subtract that from the sleep time
132 auto adjusted_sleep = (int)(sleep_mean - build_time);
133 auto sleep = abs(gRandom->Gaus(adjusted_sleep, sleep_sigma));
134
135 // simulate the time taken by more complicated event generation
136 std::this_thread::sleep_for(std::chrono::seconds(int(sleep)));
137
138 // Fill the tree
139 tree->Fill();
140
141 // every sync_rate events, call the TMPIFile::Sync() function
142 // to trigger MPI collection of local data
143 if ((i + 1) % sync_rate == 0) {
144 // call TMPIFile::Sync()
145 newfile->Sync();
146
147 auto end = std::chrono::high_resolution_clock::now();
148 double sync_time = std::chrono::duration_cast<std::chrono::duration<double>>(end - sync_start).count();
149 Info("Rank", "[%d] [%d]\tevent collection time: %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(),
150 sync_time);
151 sync_start = std::chrono::high_resolution_clock::now();
152 }
153 }
154
155 // synchronize any left over events
156 if (events_per_rank % sync_rate != 0) {
157 newfile->Sync();
158 }
159 }
160
161 // call Close on the file for clean exit.
162 Info("Rank", "[%d] [%d]\tclosing file", newfile->GetMPIColor(), newfile->GetMPILocalRank());
163 newfile->Close();
164
165 // open file and test contents
166 if (newfile->GetMPILocalRank() == 0) {
167 TString filename = newfile->GetMPIFilename();
168 Info("Rank", "[%d] [%d]\topening file: %s", newfile->GetMPIColor(), newfile->GetMPILocalRank(), filename.Data());
169 TFile file(filename.Data());
170 if (file.IsOpen()) {
171 file.ls();
172 TTree *tree = (TTree *)file.Get(treename.c_str());
173 if (tree)
174 tree->Print();
175
176 Info("Rank", "[%d] [%d]\tfile should have %d events and has %lld", newfile->GetMPIColor(),
177 newfile->GetMPILocalRank(), (newfile->GetMPILocalSize() - 1) * events_per_rank, tree->GetEntries());
178 }
179 }
180}
181
182void testTMPIFile(Bool_t secRun)
183{
184 auto start = std::chrono::high_resolution_clock::now();
185
186 test_tmpi();
187
188 auto end = std::chrono::high_resolution_clock::now();
189 double time = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
190 std::string msg = "Total elapsed time: ";
191 msg += std::to_string(time);
192 Info("testTMPIFile", "%s", msg.c_str());
193 Info("testTMPIFile", "exiting");
194}
195
196#else
197
198void testTMPIFile()
199{
200 Int_t flag;
201 MPI_Initialized(&flag);
202 if (!flag) {
203 MPI_Init(NULL, NULL);
204 }
205
206 // Get rank and size
207 Int_t rank, size;
208 MPI_Comm_rank(MPI_COMM_WORLD, &rank);
209 MPI_Comm_size(MPI_COMM_WORLD, &size);
210
211
212 // Procecss 0 generates JetEvent library
213 if (rank == 0) {
214 TString tutdir = gROOT->GetTutorialDir();
215 gSystem->Exec("cp " + tutdir + "/tree/JetEvent* .");
216 gROOT->ProcessLine(".L JetEvent.cxx+");
217 }
218 // Wait until it's done
219 MPI_Barrier(MPI_COMM_WORLD);
220
221 gROOT->ProcessLine("#define TMPI_SECOND_RUN yes");
222 gROOT->ProcessLine("#include \"" __FILE__ "\"");
223 gROOT->ProcessLine("testTMPIFile(true)");
224
225 // TMPIFile will do MPI_Finalize() when closing the file
226 Int_t finalized = 0;
227 MPI_Finalized(&finalized);
228 if (!finalized) {
229 MPI_Finalize();
230 }
231}
232
233#endif
int Int_t
Definition RtypesCore.h:45
bool Bool_t
Definition RtypesCore.h:63
#define R__LOAD_LIBRARY(LIBRARY)
Definition Rtypes.h:472
void Info(const char *location, const char *msgfmt,...)
Use this function for informational messages.
Definition TError.cxx:220
R__EXTERN C unsigned int sleep(unsigned int seconds)
#define gROOT
Definition TROOT.h:406
R__EXTERN TRandom * gRandom
Definition TRandom.h:62
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
virtual Double_t Gaus(Double_t mean=0, Double_t sigma=1)
Samples a random number from the standard Normal (Gaussian) Distribution with the given mean and sigm...
Definition TRandom.cxx:274
virtual void SetSeed(ULong_t seed=0)
Set the random generator seed.
Definition TRandom.cxx:608
virtual UInt_t GetSeed() const
Get the random generator seed.
Definition TRandom.cxx:641
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
virtual Int_t Exec(const char *shellcmd)
Execute a command.
Definition TSystem.cxx:654
A TTree represents a columnar dataset.
Definition TTree.h:79
Definition file.py:1
Definition tree.py:1