Logo ROOT  
Reference Guide
 
Loading...
Searching...
No Matches
TMonaLisaWriter.cxx
Go to the documentation of this file.
1// @(#)root/monalisa:$Id$
2// Author: Andreas Peters 5/10/2005
3
4/*************************************************************************
5 * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6 * All rights reserved. *
7 * *
8 * For the licensing terms see $ROOTSYS/LICENSE. *
9 * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11
12//////////////////////////////////////////////////////////////////////////
13// //
14// TMonaLisaWriter //
15// //
16// Class defining interface to MonaLisa Monitoring Services in ROOT. //
17// The TMonaLisaWriter object is used to send monitoring information to //
18// a MonaLisa server using the ML ApMon package (libapmoncpp.so/UDP //
19// packets). The MonaLisa ApMon library for C++ can be downloaded at //
20// http://monalisa.cacr.caltech.edu/monalisa__Download__ApMon.html, //
21// current version: //
22// http://monalisa.cacr.caltech.edu/download/apmon/ApMon_c-2.2.0.tar.gz //
23// //
24// The ROOT implementation is primarily optimized for process/job //
25// monitoring, although all other generic MonaLisa ApMon functionality //
26// can be exploited through the ApMon class directly via //
27// dynamic_cast<TMonaLisaWriter*>(gMonitoringWriter)->GetApMon(). //
28// //
29// Additions/modifications by Fabrizio Furano 10/04/2008 //
30// - The implementation of TFile throughput and info sending was //
31// just sending 'regular' samples about the activity of the single TFile//
32// instance that happened to trigger an activity in the right moment. //
33// - Now TMonaLisaWriter internally keeps track of every activity //
34// and regularly sends summaries valid for all the files which had //
35// activity in the last time interval. //
36// - Additionally, the infrastructure to measure and keep track of //
37// the file Open latency is now complete. A packet is sent for each //
38// successful Open, sending the measures of the latencies for the //
39// various phases of the open. Currently exploited fully by TAlienFile //
40// and TXNetFile. Easy to report from other TFiles too. //
41// - Now, the hook for the Close() func triggers sending of a packet //
42// containing various information about the performance related to that //
43// file only. //
44// - Added support also for performance monitoring when writing //
45//////////////////////////////////////////////////////////////////////////
46
47#include <cmath>
48#include <pthread.h>
49#include "TMonaLisaWriter.h"
50#include "TSystem.h"
51#include "TGrid.h"
52#include "TFile.h"
53#include "TUrl.h"
54#include "TStopwatch.h"
55#include "Riostream.h"
56#include "TParameter.h"
57#include "THashList.h"
58
59
61
62
63// Information which is kept about an alive instance of TFile
64class MonitoredTFileInfo: public TObject {
65private:
66 TFile *fileinst;
67public:
68 MonitoredTFileInfo(TFile *file, Double_t timenow): TObject(), fileinst(file) {
69 if (file->InheritsFrom("TXNetFile"))
70 fFileClassName = "TXNetFile";
71 else
72 fFileClassName = file->ClassName();
73
74 fLastBytesRead = 0;
75 fLastBytesWritten = 0;
76
77 fTempReadBytes = 0;
78 fTempWrittenBytes = 0;
79
80 fLastResetTime = timenow;
81 fCreationTime = timenow;
82
83 fKillme = kFALSE;
84 }
85
86 Double_t fCreationTime;
87
88 TString fFileClassName;
89
90 Long64_t fLastBytesRead;
91 Long64_t fTempReadBytes;
92 Long64_t fLastBytesWritten;
93 Long64_t fTempWrittenBytes;
94
95 Double_t fLastResetTime;
96
97 Bool_t fKillme; // tells to remove the instance after the next computation step
98
99 void GetThroughputs(Long64_t &readthr, Long64_t &writethr, Double_t timenow, Double_t prectime) {
100 readthr = -1;
101 writethr = -1;
102 Double_t t = std::min(prectime, fLastResetTime);
103
104 Int_t mselapsed = std::round(std::floor(((timenow - t) * 1000)));
105 mselapsed = std::max(mselapsed, 1);
106
107 readthr = fTempReadBytes / mselapsed * 1000;
108 writethr = fTempWrittenBytes / mselapsed * 1000;
109 }
110
111 void UpdateFileStatus(TFile *file) {
112 fTempReadBytes = file->GetBytesRead() - fLastBytesRead;
113 fTempWrittenBytes = file->GetBytesWritten() - fLastBytesWritten;
114 }
115
116 void ResetFileStatus(TFile *file, Double_t timenow) {
117 if (fKillme) return;
118 fLastBytesRead = file->GetBytesRead();
119 fLastBytesWritten = file->GetBytesWritten();
120 fTempReadBytes = 0;
121 fTempWrittenBytes = 0;
122 fLastResetTime = timenow;
123 }
124
125 void ResetFileStatus(Double_t timenow) {
126 if (fKillme) return;
127 ResetFileStatus(fileinst, timenow);
128 }
129
130};
131
132
133
134
135// Helper used to build up ongoing throughput summaries
136class MonitoredTFileSummary: public TNamed {
137public:
138 MonitoredTFileSummary(TString &fileclassname): TNamed(fileclassname, fileclassname) {
139 fBytesRead = 0;
140 fBytesWritten = 0;
141 fReadThroughput = 0;
142 fWriteThroughput = 0;
143 }
144
145 Long64_t fBytesRead;
146 Long64_t fBytesWritten;
147 Long64_t fReadThroughput;
148 Long64_t fWriteThroughput;
149
150 void Update(MonitoredTFileInfo *mi, Double_t timenow, Double_t prectime) {
151 Long64_t rth, wth;
152 mi->GetThroughputs(rth, wth, timenow, prectime);
153
154 fBytesRead += mi->fTempReadBytes;
155 fBytesWritten += mi->fTempWrittenBytes;
156
157 if (rth > 0) fReadThroughput += rth;
158 if (wth > 0) fWriteThroughput += wth;
159 }
160
161};
162
163
164
165////////////////////////////////////////////////////////////////////////////////
166/// Create MonaLisa write object.
167
168TMonaLisaWriter::TMonaLisaWriter(const char *monserver, const char *montag,
169 const char *monid, const char *monsubid,
170 const char *option)
171{
172 fMonInfoRepo = new std::map<UInt_t, MonitoredTFileInfo *>;
173
174 Init(monserver, montag, monid, monsubid, option);
175}
176
177////////////////////////////////////////////////////////////////////////////////
178/// Creates a TMonaLisaWriter object to send monitoring information to a
179/// MonaLisa server using the MonaLisa ApMon package (libapmoncpp.so/UDP
180/// packets). The MonaLisa ApMon library for C++ can be downloaded at
181/// http://monalisa.cacr.caltech.edu/monalisa__Download__ApMon.html,
182/// current version:
183/// http://monalisa.cacr.caltech.edu/download/apmon/ApMon_cpp-2.0.6.tar.gz
184///
185/// The ROOT implementation is primarily optimized for process/job monitoring,
186/// although all other generic MonaLisa ApMon functionality can be exploited
187/// through the ApMon class directly (gMonitoringWriter->GetApMon()).
188///
189/// Monitoring information in MonaLisa is structured in the following tree
190/// structure:
191/// <farmname>
192/// |
193/// ---> <nodename1>
194/// |
195/// ---> <key1> - <value1>
196/// ---> <key2> - <value2>
197/// ---> <nodename2>
198/// |
199/// ---> <key3> - <value3>
200/// ---> <key4> - <value4>
201///
202/// The parameter monid is equivalent to the MonaLisa node name, for the
203/// case of process monitoring it can be just an identifier to classify
204/// the type of jobs e.g. "PROOF_PROCESSING".
205/// If monid is not specified, TMonaLisaWriter tries to set it in this order
206/// from environment variables:
207/// - PROOF_QUERY_ID
208/// - GRID_JOB_ID
209/// - LCG_JOB_ID
210/// - ALIEN_MASTERJOBID
211/// - ALIEN_PROC_ID
212///
213/// The parameter montag is equivalent to the MonaLisa farm name, for the
214/// case of process monitoring it can be a process identifier e.g. a PROOF
215/// session ID.
216///
217/// The parameter monserver specifies the server to whom to send the
218/// monitoring UDP packets. If not specified, the hostname (the port is
219/// a default one) is specified in the environment variable APMON_CONFIG.
220///
221/// To use TMonaLisaWriter, libMonaLisa.so has to be loaded.
222///
223/// Because the depth of the MonaLisa naming scheme
224/// is only 3 (<farm><node><value>), a special naming scheme is used for
225/// process monitoring. There is a high-level method to send progress
226/// information of Tree analysis (# of events, datasize).
227/// To distinguish individual nodes running the processing, part of the
228/// information is kept in the <value> parameter of ML.
229/// <value> is named as:
230/// <site-name>:<host-name>:<pid>:<valuetag>
231/// <site-name> is taken from an environment variable in the following order:
232/// - PROOF_SITE
233/// - GRID_SITE
/// - LCG_SITE
234/// - ALIEN_SITE
235/// - default 'none'
236/// <host-name> is taken from gSystem->Hostname()
237/// <pid> is the process ID of the ROOT process
238///
239/// Example of use for Process Monitoring:
240/// new TMonaLisaWriter("BATCH_ANALYSIS","AnalysisLoop-00001","lxplus050.cern.ch");
241/// Once when you create an analysis task, execute
242/// gMonitoringWriter->SendInfoUser("myname");
243/// gMonitoringWriter->SendInfoDescription("My first Higgs analysis");
244/// gMonitoringWriter->SendInfoTime();
245/// gMonitoringWriter->SendInfoStatus("Submitted");
246///
247/// On each node executing a subtask, you can set the status of this subtask:
248/// gMonitoringWriter->SendProcessingStatus("Started");
249/// During the processing of your analysis you can send progress updates:
250/// gMonitoringWriter->SendProcessingProgress(100,1000000); <= 100 events, 1MB processed
251/// ....
252/// gMonitoringWriter->SendProcessingStatus("Finished");
253/// delete gMonitoringWriter; gMonitoringWriter=0;
254///
255/// Example of use for any Generic Monitoring information:
256/// TList *valuelist = new TList();
257/// valuelist->SetOwner(kTRUE);
258/// // append a text object
259/// TMonaLisaText *valtext = new TMonaLisaText("decaychannel","K->eeg");
260/// valuelist->Add(valtext);
261/// // append a double value
262/// TMonaLisaValue* valdouble = new TMonaLisaValue("n-gamma",5);
263/// valuelist->Add(valdouble);
264/// Bool_t success = gMonitoringWriter->SendParameters(valuelist);
265/// delete valuelist;
266///
267/// option:
268/// "global": gMonitoringWriter is initialized with this instance
269
270void TMonaLisaWriter::Init(const char *monserver, const char *montag, const char *monid,
271 const char *monsubid, const char *option)
272{
273 SetName(montag);
274 SetTitle(montag);
275
276 fVerbose = kFALSE; // no verbosity as default
277
281 fLastProgressTime = time(0);
283
284 fReportInterval = 120; // default interval is 120, to prevent flooding
285 if (gSystem->Getenv("APMON_INTERVAL")) {
286 fReportInterval = atoi(gSystem->Getenv("APMON_INTERVAL"));
287 if (fReportInterval < 1)
289 Info("TMonaLisaWriter","Setting APMON Report Interval to %d seconds",fReportInterval);
290 }
291
292 char *apmon_config[1] =
293 { ((monserver == 0) ? (char *) gSystem->Getenv("APMON_CONFIG") : (char *) monserver) };
294 if (apmon_config[0] == 0) {
295 Error("TMonaLisaWriter",
296 "Disabling apmon monitoring since env variable APMON_CONFIG was not found and the monitoring server is not specified in the constructor!");
298 return;
299 }
300
301 try {
302 fApmon = new ApMon(1, apmon_config);
303 fApmon->setConfRecheck(false);
304 fApmon->setJobMonitoring(false);
305 //((ApMon*)fApmon)->setSysMonitoring(false);
306 //((ApMon*)fApmon)->setGenMonitoring(false);
307 } catch (runtime_error &e) {
308 Error("TMonaLisaWriter", "Error initializing ApMon: %s", e.what());
309 Error("TMonaLisaWriter", "Disabling apmon.");
311 return;
312 }
313
314 TString clustername="ROOT_";
315
316 if (montag == 0) {
317 if (gSystem->Getenv("PROOF_SITE")) {
318 clustername+=(gSystem->Getenv("PROOF_SITE"));
319 } else if (gSystem->Getenv("GRID_SITE")) {
320 clustername+=(gSystem->Getenv("GRID_SITE"));
321 } else if (gSystem->Getenv("LCG_SITE")) {
322 clustername+=(gSystem->Getenv("LCG_SITE"));
323 } else if (gSystem->Getenv("ALIEN_SITE")) {
324 clustername+=(gSystem->Getenv("ALIEN_SITE"));
325 } else {
326 clustername += TString("none");
327 }
328 SetName(clustername);
329 SetTitle(clustername);
330 } else {
331 SetName(clustername+TString(montag));
332 SetTitle(clustername+TString(montag));
333 }
334
336 fPid = gSystem->GetPid();
337
338 if (monid == 0) {
339 if (gSystem->Getenv("PROOF_QUERY_ID"))
340 fJobId = gSystem->Getenv("PROOF_QUERY_ID");
341 else if (gSystem->Getenv("GRID_JOB_ID"))
342 fJobId = gSystem->Getenv("GRID_JOB_ID");
343 else if (gSystem->Getenv("LCG_JOB_ID"))
344 fJobId = gSystem->Getenv("LCG_JOB_ID");
345 else if (gSystem->Getenv("ALIEN_MASTERJOBID"))
346 fJobId = gSystem->Getenv("ALIEN_MASTERJOBID");
347 else if (gSystem->Getenv("ALIEN_PROC_ID"))
348 fJobId = gSystem->Getenv("ALIEN_PROC_ID");
349 else
350 fJobId = "-no-job-id";
351 } else {
352 fJobId = monid;
353 }
354
355 if (monsubid == 0) {
356 if (gSystem->Getenv("PROOF_PROC_ID")) {
357 fSubJobId = gSystem->Getenv("PROOF_PROC_ID");
358 } else if (gSystem->Getenv("ALIEN_PROC_ID")) {
359 fSubJobId = gSystem->Getenv("ALIEN_PROC_ID");
360 } else {
362 }
363 } else {
364 fSubJobId = monsubid;
365 }
366
367
368 if (fVerbose)
369 Info("Initialized for ML Server <%s> - Setting ClusterID <%s> JobID <%s> SubID <%s>\n",
370 apmon_config[0], fName.Data() ,fJobId.Data(),fSubJobId.Data());
371
373
374 TString optionStr(option);
375 if (optionStr.Contains("global"))
376 gMonitoringWriter = this;
377}
378
379////////////////////////////////////////////////////////////////////////////////
380/// Cleanup.
381
383{
384 if (fMonInfoRepo) {
385
386 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->begin();
387 while (iter != fMonInfoRepo->end()) {
388 delete iter->second;
389 ++iter;
390 }
391
392 fMonInfoRepo->clear();
393 delete fMonInfoRepo;
394 fMonInfoRepo = 0;
395 }
396
397 if (gMonitoringWriter == this)
399}
400
401////////////////////////////////////////////////////////////////////////////////
402/// Sends a <status> text to MonaLisa following the process scheme:
403/// <site> --> <jobid> --> 'status' = <status>
404/// Used to set a global status for a groupjob, e.g.
405/// a master-job or the general status of PROOF processing.
406
408{
409 if (!fInitialized) {
410 Error("SendInfoStatus", "Monitoring is not properly initialized!");
411 return kFALSE;
412 }
413
414 Bool_t success = kFALSE;
415
416 TList *valuelist = new TList();
417 valuelist->SetOwner(kTRUE);
418
419 // create a monitor text object
420 TMonaLisaText *valtext = new TMonaLisaText("status", status);
421 valuelist->Add(valtext);
422
423 // send it to monalisa
424 success = SendParameters(valuelist);
425
426 delete valuelist;
427 return success;
428}
429
430////////////////////////////////////////////////////////////////////////////////
431/// Sends the <user> text to MonaLisa following the process scheme:
432/// <site> --> <jobid> --> 'user' = <user>
433
435{
436 if (!fInitialized) {
437 Error("TMonaLisaWriter",
438 "Monitoring initialization has failed - you can't send to MonaLisa!");
439 return kFALSE;
440 }
441
442 Bool_t success = kFALSE;
443
444 TList *valuelist = new TList();
445 valuelist->SetOwner(kTRUE);
446
447 const char *localuser;
448 if (user) {
449 localuser = user;
450 } else {
451 if (gGrid) {
452 localuser = gGrid->GetUser();
453 } else {
454 localuser = "unknown";
455 }
456 }
457
458 // create a monitor text object
459 TMonaLisaText *valtext = new TMonaLisaText("user", localuser);
460 valuelist->Add(valtext);
461
462 // send it to monalisa
463 success = SendParameters(valuelist);
464
465 delete valuelist;
466 return success;
467}
468
469////////////////////////////////////////////////////////////////////////////////
470/// Sends the description <jobtag> following the processing scheme:
471/// <site> --> <jobid> --> 'jobname' = <jobtag>
472
474{
475 if (!fInitialized) {
476 Error("SendInfoDescription",
477 "Monitoring is not properly initialized!");
478 return kFALSE;
479 }
480
481 Bool_t success = kFALSE;
482
483 TList *valuelist = new TList();
484 valuelist->SetOwner(kTRUE);
485
486 // create a monitor text object
487 TMonaLisaText *valtext = new TMonaLisaText("jobname", jobtag);
488 valuelist->Add(valtext);
489
490 // send it to monalisag
491 success = SendParameters(valuelist);
492
493 delete valuelist;
494 return success;
495}
496
497////////////////////////////////////////////////////////////////////////////////
498/// Sends the current time to MonaLisa following the processing scheme
499/// <site> --> <jobid> --> 'time' = >unixtimestamp<
500
502{
503 if (!fInitialized) {
504 Error("SendInfoTime", "Monitoring is not properly initialized!");
505 return kFALSE;
506 }
507
508 Bool_t success = kFALSE;
509
510 TList *valuelist = new TList();
511 valuelist->SetOwner(kTRUE);
512
513 TString valtime = (Int_t) time(0);
514
515 // create a monitor text object
516 TMonaLisaText *valtext = new TMonaLisaText("time", valtime);
517 valuelist->Add(valtext);
518
519 // send it to monalisa
520 success = SendParameters(valuelist);
521
522 delete valuelist;
523 return success;
524}
525
526////////////////////////////////////////////////////////////////////////////////
527/// Send the procesing status 'status' to MonaLisa following the
528/// processing scheme:
529/// <site> --> <jobid> --> 'status' = <status>
530/// Used, to set the processing status of individual subtaks e.g. the
531/// status of a batch (sub-)job or the status of a PROOF slave
532/// participating in query <jobid>
533
534Bool_t TMonaLisaWriter::SendProcessingStatus(const char *status, Bool_t restarttimer)
535{
536 if (restarttimer) {
538 }
539
540 if (!fInitialized) {
541 Error("TMonaLisaWriter",
542 "Monitoring initialization has failed - you can't send to MonaLisa!");
543 return kFALSE;
544 }
545
546 Bool_t success = kFALSE;
547
548 TList *valuelist = new TList();
549 valuelist->SetOwner(kTRUE);
550
551 // create a monitor text object
552 TMonaLisaText *valtext = new TMonaLisaText("status", status);
553 valuelist->Add(valtext);
554
555 TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
556 valuelist->Add(valhost);
557
558 TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
559 valuelist->Add(valsid);
560
561 // send it to monalisa
562 success = SendParameters(valuelist);
563
564 delete valuelist;
565 return success;
566}
567
568////////////////////////////////////////////////////////////////////////////////
569/// Send the procesing progress to MonaLisa.
570
572{
573 if (!force && (time(0)-fLastProgressTime) < fReportInterval) {
574 // if the progress is not forced, we send maximum < fReportInterval per second!
575 return kFALSE;
576 }
577
578 if (!fInitialized) {
579 Error("SendProcessingProgress",
580 "Monitoring is not properly initialized!");
581 return kFALSE;
582 }
583
584 Bool_t success = kFALSE;
585
586 TList *valuelist = new TList();
587 valuelist->SetOwner(kTRUE);
588
589 // create a monitor text object
590 TMonaLisaValue *valevent = new TMonaLisaValue("events", nevent);
591 TMonaLisaValue *valbyte = new TMonaLisaValue("processedbytes", nbytes);
592 TMonaLisaValue *valrealtime = new TMonaLisaValue("realtime",fStopwatch.RealTime());
593 TMonaLisaValue *valcputime = new TMonaLisaValue("cputime",fStopwatch.CpuTime());
594
595 ProcInfo_t pinfo;
596 gSystem->GetProcInfo(&pinfo);
597 Double_t totmem = (Double_t)(pinfo.fMemVirtual) * 1024.;
598 Double_t rssmem = (Double_t)(pinfo.fMemResident) * 1024.;
599 Double_t shdmem = 0.;
600
601 TMonaLisaValue *valtotmem = new TMonaLisaValue("totmem",totmem);
602 TMonaLisaValue *valrssmem = new TMonaLisaValue("rssmem",rssmem);
603 TMonaLisaValue *valshdmem = new TMonaLisaValue("shdmem",shdmem);
604
605 TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
606 valuelist->Add(valsid);
607 valuelist->Add(valevent);
608 valuelist->Add(valbyte);
609 valuelist->Add(valrealtime);
610 valuelist->Add(valcputime);
611 valuelist->Add(valtotmem);
612 valuelist->Add(valrssmem);
613 valuelist->Add(valshdmem);
614
615 TString strevents="";
616 strevents += nevent;
617 TString strbytes="";
618 strbytes += nbytes;
619 TString strcpu="";
620 strcpu += fStopwatch.CpuTime();
621 TString strreal="";
622 strreal += fStopwatch.RealTime();
623 TString strtotmem="";
624 strtotmem += totmem;
625 TString strrssmem="";
626 strrssmem += rssmem;
627 TString strshdmem="";
628 strshdmem += shdmem;
629
631
632 TMonaLisaText *textevent = new TMonaLisaText("events_str", strevents.Data());
633 TMonaLisaText *textbyte = new TMonaLisaText("processedbytes_str", strbytes.Data());
634 TMonaLisaText *textreal = new TMonaLisaText("realtime_str", strreal.Data());
635 TMonaLisaText *textcpu = new TMonaLisaText("cputime_str", strcpu.Data());
636 TMonaLisaText *texttotmem = new TMonaLisaText("totmem_str", strtotmem.Data());
637 TMonaLisaText *textrssmem = new TMonaLisaText("rssmem_str", strrssmem.Data());
638 TMonaLisaText *textshdmem = new TMonaLisaText("shdmem_str", strshdmem.Data());
639 valuelist->Add(textevent);
640 valuelist->Add(textbyte);
641 valuelist->Add(textcpu);
642 valuelist->Add(textreal);
643 valuelist->Add(texttotmem);
644 valuelist->Add(textrssmem);
645 valuelist->Add(textshdmem);
646
647 TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
648 valuelist->Add(valhost);
649
650 // send it to monalisa
651 success = SendParameters(valuelist);
652 fLastProgressTime = time(0);
653 delete valuelist;
654 return success;
655}
656
657////////////////////////////////////////////////////////////////////////////////
658/// Send the fileopen progress to MonaLisa.
659/// If openphases=0 it means that the information is to be stored
660/// in a temp space, since there is not yet an object where to attach it to.
661/// This is typical in the static Open calls.
662/// The temp openphases are put into a list as soon as one is specified.
663///
664/// If thisopenphasename=0 it means that the stored phases (temp and object)
665/// have to be cleared.
666
668 const char *openphasename,
669 Bool_t forcesend)
670{
671 if (!fInitialized) {
672 Error("SendFileOpenProgress",
673 "Monitoring is not properly initialized!");
674 return kFALSE;
675 }
676
677 // Create the list, if not yet done
678 if (!fTmpOpenPhases && !openphases) {
679 fTmpOpenPhases = new TList;
681 }
682
683 if (!openphasename) {
684 // This means "reset my phases"
686 return kTRUE;
687 }
688
689 // Take a measurement
692
693 if (!openphases) {
694 fTmpOpenPhases->Add(nfo);
695 } else {
696 // Move info temporarly saved to object list
698 TParameter<Double_t> *nf = 0;
699 while ((nf = (TParameter<Double_t> *)nxt()))
700 openphases->Add(nf);
701 // Add this measurement
702 openphases->Add(nfo);
703 // Reset the temporary list
704 if (fTmpOpenPhases) {
707 }
708
709 }
710
711 if (!forcesend) return kTRUE;
712 if (!file) return kTRUE;
713
714 TList *op = openphases ? openphases : fTmpOpenPhases;
715
716 Bool_t success = kFALSE;
717
718
719 TList *valuelist = new TList();
720 valuelist->SetOwner(kTRUE);
721
722 // create a monitor text object
723
724 TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
725 valuelist->Add(valhost);
726 TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
727 valuelist->Add(valsid);
728 TMonaLisaText *valdest = new TMonaLisaText("destname", file->GetEndpointUrl()->GetHost());
729 valuelist->Add(valdest);
730
731 TMonaLisaValue *valfid = new TMonaLisaValue("fileid", file->GetFileCounter());
732 valuelist->Add(valfid);
733 TString strfid = Form("%lld", file->GetFileCounter());
734 TMonaLisaText *valstrfid = new TMonaLisaText("fileid_str", strfid.Data());
735 valuelist->Add(valstrfid);
736
737 Int_t kk = 1;
738 TIter nxt(op);
739 TParameter<Double_t> *nf1 = 0;
741 while ((nf1 = (TParameter<Double_t> *)nxt())) {
742 TString s = Form("openphase%d_%s", kk, nf0->GetName());
743 TMonaLisaValue *v = new TMonaLisaValue(s.Data(), nf1->GetVal() - nf0->GetVal());
744 valuelist->Add(v);
745 // Go to next
746 nf0 = nf1;
747 kk++;
748 }
749
750 // Now send how much time was elapsed in total
751 nf0 = (TParameter<Double_t> *)op->First();
752 nf1 = (TParameter<Double_t> *)op->Last();
753 TMonaLisaValue *valtottime =
754 new TMonaLisaValue("total_open_time", nf1->GetVal() - nf0->GetVal());
755 valuelist->Add(valtottime);
756
757 // send it to monalisa
758 success = SendParameters(valuelist);
759 delete valuelist;
760 return success;
761}
762
763////////////////////////////////////////////////////////////////////////////////
764
766 if (!fInitialized) {
767 Error("SendFileCloseEvent",
768 "Monitoring is not properly initialized!");
769 return kFALSE;
770 }
771
772 Bool_t success = kFALSE;
773 Double_t timenow = fFileStopwatch.RealTime();
775
776 MonitoredTFileInfo *mi = 0;
777 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->GetUniqueID());
778 if (iter != fMonInfoRepo->end()) mi = iter->second;
779
780 Double_t timelapsed = 0.0;
781 if (mi) timelapsed = timenow - mi->fCreationTime;
782
783 TList *valuelist = new TList();
784 valuelist->SetOwner(kTRUE);
785
786 TString valname;
787 TString pfx = file->ClassName();
788 if (file->InheritsFrom("TXNetFile"))
789 pfx = "TXNetFile";
790
791 pfx += "_";
792
793 // The info to be sent is the one relative to the specific file
794
795 TMonaLisaText *valdest = new TMonaLisaText("destname",file->GetEndpointUrl()->GetHost());
796 valuelist->Add(valdest);
797 TMonaLisaValue *valfid = new TMonaLisaValue("fileid",file->GetFileCounter());
798 valuelist->Add(valfid);
799 TString strfid="";
800 strfid+=file->GetFileCounter();
801 TMonaLisaText *valstrfid = new TMonaLisaText("fileid_str",strfid.Data());
802 valuelist->Add(valstrfid);
803
804 valname = pfx;
805 valname += "readbytes";
806 TMonaLisaValue *valread = new TMonaLisaValue(valname, file->GetBytesRead());
807 valuelist->Add(valread);
808
809// TString strbytes_r="";
810// strbytes_r += file->GetBytesRead();
811// TMonaLisaText *valstrread = new TMonaLisaText("readbytes_str", strbytes_r.Data());
812// valuelist->Add(valstrread);
813
814 valname = pfx;
815 valname += "writtenbytes";
816 TMonaLisaValue *valwrite = new TMonaLisaValue(valname, file->GetBytesWritten());
817 valuelist->Add(valwrite);
818
819// TString strbytes_w="";
820// strbytes_w += file->GetBytesWritten();
821// TMonaLisaText *valstrwrite = new TMonaLisaText("writtenbytes_str", strbytes_w.Data());
822// valuelist->Add(valstrwrite);
823
824 int thput;
825 if (timelapsed > 0.001) {
826 Int_t selapsed = std::round(std::floor(timelapsed * 1000));
827
828 thput = file->GetBytesRead() / selapsed * 1000;
829 valname = pfx;
830 valname += "filethrpt_rd";
831 TMonaLisaValue *valreadthavg = new TMonaLisaValue(valname, thput);
832 valuelist->Add(valreadthavg);
833
834 thput = file->GetBytesWritten() / selapsed * 1000;
835 valname = pfx;
836 valname += "filethrpt_wr";
837 TMonaLisaValue *valwritethavg = new TMonaLisaValue(valname, thput);
838 valuelist->Add(valwritethavg);
839 }
840
841 // And the specific file summary has to be removed from the repo
842 if (mi) {
843 mi->UpdateFileStatus(file);
844 mi->fKillme = kTRUE;
845 }
846
847 // send it to monalisa
848 success = SendParameters(valuelist);
849
850
851 delete valuelist;
852 return success;
853}
854////////////////////////////////////////////////////////////////////////////////
855
857 return SendFileCheckpoint(file);
858}
859////////////////////////////////////////////////////////////////////////////////
860
862 return SendFileCheckpoint(file);
863}
864////////////////////////////////////////////////////////////////////////////////
865
867{
868 if (!fInitialized) {
869 Error("SendFileCheckpoint",
870 "Monitoring is not properly initialized!");
871 return kFALSE;
872 }
873
874 if (!file->IsOpen()) return kTRUE;
875
876 // We cannot handle this kind of ongoing averaged monitoring for a file which has not an unique id. Sorry.
877 // This seems to affect raw files, for which only the Close() info is available
878 // Removing this check causes a mess for non-raw and raw TFiles, because
879 // the UUID in the Init phase has not yet been set, and the traffic
880 // reported during that phase is reported wrongly, causing various leaks and troubles
881 // TFiles without an unique id can be monitored only in their Open/Close event
882 if (!file->TestBit(kHasUUID)) return kTRUE;
883
884 Double_t timenow = fFileStopwatch.RealTime();
886
887 // This info has to be gathered in any case.
888
889 // Check if an MonitoredTFileInfo instance is already available
890 // If not, create one
891 MonitoredTFileInfo *mi = 0;
892 std::map<UInt_t, MonitoredTFileInfo *>::iterator iter = fMonInfoRepo->find(file->GetUniqueID());
893 if (iter != fMonInfoRepo->end()) mi = iter->second;
894 if (!mi) {
895
896 mi = new MonitoredTFileInfo(file, timenow);
897 if (mi) fMonInfoRepo->insert( make_pair( file->GetUniqueID(), mi ) );
898 }
899
900 // And now we get those partial values
901 if (mi) mi->UpdateFileStatus(file);
902
903 // Send the fileread progress to MonaLisa only if required or convenient
904 if ( timenow - fLastRWSendTime < fReportInterval) {
905 // if the progress is not forced, we send maximum 1 every fReportInterval seconds!
906 return kFALSE;
907 }
908
909 Bool_t success = kFALSE;
910
911 TList *valuelist = new TList();
912 valuelist->SetOwner(kTRUE);
913
914 TString valname;
915
916 // We send only a little throughput summary info
917 // Instead we send the info for the actual file passed
918
919 TMonaLisaText *valhost = new TMonaLisaText("hostname",fHostname);
920 valuelist->Add(valhost);
921 TMonaLisaText *valsid = new TMonaLisaText("subid", fSubJobId.Data());
922 valuelist->Add(valsid);
923
924 // First of all, we create an internal summary, sorted by kind of TFile used
925 THashList summary;
926 summary.SetOwner(kTRUE);
927
928 iter = fMonInfoRepo->begin();
929 if (iter != fMonInfoRepo->end()) mi = iter->second;
930 else mi = 0;
931
932 while (mi) {
933 MonitoredTFileSummary *sum = static_cast<MonitoredTFileSummary *>(summary.FindObject(mi->fFileClassName));
934 if (!sum) {
935 sum = new MonitoredTFileSummary(mi->fFileClassName);
936 if (sum) summary.AddLast(sum);
937 }
938
939 if (sum) {
940 sum->Update(mi, timenow, fLastRWSendTime);
941 mi->ResetFileStatus(timenow);
942 }
943
944 // This could be an info about an already closed file
945 if (mi->fKillme) {
946 iter->second = 0;
947 }
948
949 ++iter;
950 if (iter != fMonInfoRepo->end())
951 mi = iter->second;
952 else
953 mi = 0;
954 }
955
956 for (iter = fMonInfoRepo->begin(); iter != fMonInfoRepo->end(); )
957 {
958 if (!iter->second)
959 iter = fMonInfoRepo->erase(iter);
960 else
961 ++iter;
962 }
963
964 // This info is a summary valid for all the monitored files at once.
965 // It makes no sense at all to send data relative to a specific file here
966 // Cycle through the summary...
967 TIter nxt2(&summary);
968 MonitoredTFileSummary *sum;
969 while ((sum = (MonitoredTFileSummary *)nxt2())) {
970
971 if (sum->fReadThroughput >= 0) {
972 valname = sum->GetName();
973 valname += "_avgthrpt_rd";
974 TMonaLisaValue *valreadthr = new TMonaLisaValue(valname, sum->fReadThroughput);
975 valuelist->Add(valreadthr);
976 }
977
978 if ( sum->fWriteThroughput >= 0 ) {
979 valname = sum->GetName();
980 valname += "_avgthrpt_wr";
981 TMonaLisaValue *valwritethr = new TMonaLisaValue(valname, sum->fWriteThroughput);
982 valuelist->Add(valwritethr);
983 }
984
985 }
986
987
988 // send it to monalisa
989 success = SendParameters(valuelist);
990
991 fLastRWSendTime = timenow;
992
993 delete valuelist;
994 return success;
995}
996
997////////////////////////////////////////////////////////////////////////////////
998/// Send the parameters to MonaLisa.
999
1000Bool_t TMonaLisaWriter::SendParameters(TList *valuelist, const char *identifier)
1001{
1002 if (!fInitialized) {
1003 Error("SendParameters", "Monitoring is not properly initialized!");
1004 return kFALSE;
1005 }
1006
1007 if (!valuelist) {
1008 Error("SendParameters", "No values in the value list!");
1009 return kFALSE;
1010 }
1011
1012 if (identifier == 0)
1013 identifier = fJobId;
1014
1015 TIter nextvalue(valuelist);
1016
1017 TMonaLisaValue *objval;
1018 TMonaLisaText *objtext;
1019 TObject *monobj;
1020
1021 Int_t apmon_nparams = valuelist->GetSize();
1022 char **apmon_params = 0;
1023 Int_t *apmon_types = 0;
1024 char **apmon_values = 0;
1025 Double_t *bufDouble = 0; // buffer for int, long, etc. that is to be sent as double
1026
1027 if (apmon_nparams) {
1028
1029 apmon_params = (char **) malloc(apmon_nparams * sizeof(char *));
1030 apmon_values = (char **) malloc(apmon_nparams * sizeof(char *));
1031 apmon_types = (int *) malloc(apmon_nparams * sizeof(int));
1032 bufDouble = new Double_t[apmon_nparams];
1033
1034 Int_t looper = 0;
1035 while ((monobj = nextvalue())) {
1036 if (!strcmp(monobj->ClassName(), "TMonaLisaValue")) {
1037 objval = (TMonaLisaValue *) monobj;
1038
1039 if (fVerbose)
1040 Info("SendParameters", "adding tag %s with val %f",
1041 objval->GetName(), objval->GetValue());
1042
1043 apmon_params[looper] = (char *) objval->GetName();
1044 apmon_types[looper] = XDR_REAL64;
1045 apmon_values[looper] = (char *) (objval->GetValuePtr());
1046 looper++;
1047 }
1048 if (!strcmp(monobj->ClassName(), "TMonaLisaText")) {
1049 objtext = (TMonaLisaText *) monobj;
1050
1051 if (fVerbose)
1052 Info("SendParameters", "adding tag %s with text %s",
1053 objtext->GetName(), objtext->GetText());
1054
1055 apmon_params[looper] = (char *) objtext->GetName();
1056 apmon_types[looper] = XDR_STRING;
1057 apmon_values[looper] = (char *) (objtext->GetText());
1058 looper++;
1059 }
1060 if (!strcmp(monobj->ClassName(), "TNamed")) {
1061 TNamed* objNamed = (TNamed *) monobj;
1062
1063 if (fVerbose)
1064 Info("SendParameters", "adding tag %s with text %s",
1065 objNamed->GetName(), objNamed->GetTitle());
1066
1067 apmon_params[looper] = (char *) objNamed->GetName();
1068 apmon_types[looper] = XDR_STRING;
1069 apmon_values[looper] = (char *) (objNamed->GetTitle());
1070 looper++;
1071 }
1072 // unfortunately ClassName() converts Double_t to double, etc.
1073 if (!strcmp(monobj->ClassName(), "TParameter<double>")) {
1074 TParameter<double>* objParam = (TParameter<double> *) monobj;
1075
1076 if (fVerbose)
1077 Info("SendParameters", "adding tag %s with val %f",
1078 objParam->GetName(), objParam->GetVal());
1079
1080 apmon_params[looper] = (char *) objParam->GetName();
1081 apmon_types[looper] = XDR_REAL64;
1082 apmon_values[looper] = (char *) &(objParam->GetVal());
1083 looper++;
1084 }
1085 if (!strcmp(monobj->ClassName(), "TParameter<Long64_t>")) {
1086 TParameter<Long64_t>* objParam = (TParameter<Long64_t> *) monobj;
1087
1088 if (fVerbose)
1089 Info("SendParameters", "adding tag %s with val %lld",
1090 objParam->GetName(), objParam->GetVal());
1091
1092 apmon_params[looper] = (char *) objParam->GetName();
1093 apmon_types[looper] = XDR_REAL64;
1094 bufDouble[looper] = objParam->GetVal();
1095 apmon_values[looper] = (char *) (bufDouble + looper);
1096 looper++;
1097 }
1098 if (!strcmp(monobj->ClassName(), "TParameter<long>")) {
1099 TParameter<long>* objParam = (TParameter<long> *) monobj;
1100
1101 if (fVerbose)
1102 Info("SendParameters", "adding tag %s with val %ld",
1103 objParam->GetName(), objParam->GetVal());
1104
1105 apmon_params[looper] = (char *) objParam->GetName();
1106 apmon_types[looper] = XDR_REAL64;
1107 bufDouble[looper] = objParam->GetVal();
1108 apmon_values[looper] = (char *) (bufDouble + looper);
1109 looper++;
1110 }
1111 if (!strcmp(monobj->ClassName(), "TParameter<float>")) {
1112 TParameter<float>* objParam = (TParameter<float> *) monobj;
1113
1114 if (fVerbose)
1115 Info("SendParameters", "adding tag %s with val %f",
1116 objParam->GetName(), objParam->GetVal());
1117
1118 apmon_params[looper] = (char *) objParam->GetName();
1119 apmon_types[looper] = XDR_REAL64;
1120 bufDouble[looper] = objParam->GetVal();
1121 apmon_values[looper] = (char *) (bufDouble + looper);
1122 looper++;
1123 }
1124 if (!strcmp(monobj->ClassName(), "TParameter<int>")) {
1125 TParameter<int>* objParam = (TParameter<int> *) monobj;
1126
1127 if (fVerbose)
1128 Info("SendParameters", "adding tag %s with val %d",
1129 objParam->GetName(), objParam->GetVal());
1130
1131 apmon_params[looper] = (char *) objParam->GetName();
1132 apmon_types[looper] = XDR_REAL64;
1133 bufDouble[looper] = objParam->GetVal();
1134 apmon_values[looper] = (char *) (bufDouble + looper);
1135 looper++;
1136 }
1137 }
1138
1139 // change number of parameters to the actual found value
1140 apmon_nparams = looper;
1141
1142 if (fVerbose)
1143 Info("SendParameters", "n: %d name: %s identifier %s ...,",
1144 apmon_nparams, GetName(), identifier);
1145
1146 ((ApMon *) fApmon)->sendParameters((char *) GetName(), (char*)identifier,
1147 apmon_nparams, apmon_params,
1148 apmon_types, apmon_values);
1149
1150 free(apmon_params);
1151 free(apmon_values);
1152 free(apmon_types);
1153 delete[] bufDouble;
1154 }
1155 return kTRUE;
1156}
1157
1158////////////////////////////////////////////////////////////////////////////////
1159/// Set MonaLisa log level.
1160
1161void TMonaLisaWriter::SetLogLevel(const char *loglevel)
1162{
1163 ((ApMon *) fApmon)->setLogLevel((char *) loglevel);
1164}
1165
1166////////////////////////////////////////////////////////////////////////////////
1167/// Print info about MonaLisa object.
1168
1170{
1171 std::cout << "Site (Farm) : " << fName << std::endl;
1172 std::cout << "JobId (Node) : " << fJobId << std::endl;
1173 std::cout << "SubJobId (Node) : " << fSubJobId << std::endl;
1174 std::cout << "HostName : " << fHostname << std::endl;
1175 std::cout << "Pid : " << fPid << std::endl;
1176 std::cout << "Inititialized : " << fInitialized << std::endl;
1177 std::cout << "Verbose : " << fVerbose << std::endl;
1178
1179}
#define e(i)
Definition RSha256.hxx:103
int Int_t
Definition RtypesCore.h:45
const Bool_t kFALSE
Definition RtypesCore.h:92
bool Bool_t
Definition RtypesCore.h:63
double Double_t
Definition RtypesCore.h:59
long long Long64_t
Definition RtypesCore.h:73
const Bool_t kTRUE
Definition RtypesCore.h:91
const char Option_t
Definition RtypesCore.h:66
#define ClassImp(name)
Definition Rtypes.h:364
R__EXTERN TGrid * gGrid
Definition TGrid.h:128
char * Form(const char *fmt,...)
R__EXTERN TSystem * gSystem
Definition TSystem.h:559
R__EXTERN TVirtualMonitoringWriter * gMonitoringWriter
#define free
Definition civetweb.c:1539
#define malloc
Definition civetweb.c:1536
virtual void SetOwner(Bool_t enable=kTRUE)
Set whether this collection is the owner (enable==true) of its content.
virtual Int_t GetSize() const
Return the capacity of the collection, i.e.
A ROOT file is a suite of consecutive data records (TKey instances) with a well defined format.
Definition TFile.h:54
const char * GetUser() const
Definition TGrid.h:62
THashList implements a hybrid collection class consisting of a hash table and a list to store TObject...
Definition THashList.h:34
TObject * FindObject(const char *name) const
Find object using its name.
void AddLast(TObject *obj)
Add object at the end of the list.
Definition THashList.cxx:95
A doubly linked list.
Definition TList.h:44
virtual void Add(TObject *obj)
Definition TList.h:87
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition TList.cxx:693
virtual TObject * First() const
Return the first object in the list. Returns 0 when list is empty.
Definition TList.cxx:659
virtual void Clear(Option_t *option="")
Remove all objects from the list.
Definition TList.cxx:402
const char * GetText() const
Double_t GetValue() const
Double_t * GetValuePtr()
virtual Bool_t SendInfoTime()
Sends the current time to MonaLisa following the processing scheme <site> --> <jobid> --> 'time' = >u...
TStopwatch fFileStopwatch
TString fHostname
sub job id
TString fJobId
connection to MonaLisa
virtual Bool_t SendFileWriteProgress(TFile *file)
Int_t fPid
hostname of MonaLisa server
void Print(Option_t *option="") const
Print info about MonaLisa object.
Double_t fLastRWSendTime
virtual Bool_t SendFileReadProgress(TFile *file)
virtual Bool_t SendParameters(TList *valuelist, const char *identifier=0)
Send the parameters to MonaLisa.
std::map< UInt_t, MonitoredTFileInfo * > * fMonInfoRepo
virtual void SetLogLevel(const char *loglevel="WARNING")
Set MonaLisa log level.
virtual Bool_t SendProcessingProgress(Double_t nevent, Double_t nbytes, Bool_t force=kFALSE)
Send the processing progress to MonaLisa.
void Init(const char *monserver, const char *montag, const char *monid, const char *monsubid, const char *option)
Creates a TMonaLisaWriter object to send monitoring information to a MonaLisa server using the MonaLi...
Int_t fReportInterval
repo to gather per-file-instance mon info;
virtual Bool_t SendProcessingStatus(const char *status, Bool_t restarttimer=kFALSE)
Send the processing status 'status' to MonaLisa following the processing scheme: <site> --> <jobid> --...
TMonaLisaWriter(const TMonaLisaWriter &)
Bool_t SendFileCheckpoint(TFile *file)
Bool_t fInitialized
process id
virtual Bool_t SendFileCloseEvent(TFile *file)
TString fSubJobId
job id
Double_t fLastFCloseSendTime
virtual Bool_t SendInfoUser(const char *user=0)
Sends the <user> text to MonaLisa following the process scheme: <site> --> <jobid> --> 'user' = <user...
virtual ~TMonaLisaWriter()
Cleanup.
TStopwatch fStopwatch
virtual Bool_t SendInfoStatus(const char *status)
Sends a <status> text to MonaLisa following the process scheme: <site> --> <jobid> --> 'status' = <st...
virtual Bool_t SendFileOpenProgress(TFile *file, TList *openphases, const char *openphasename, Bool_t forcesend=kFALSE)
Send the fileopen progress to MonaLisa.
virtual Bool_t SendInfoDescription(const char *jobtag)
Sends the description <jobtag> following the processing scheme: <site> --> <jobid> --> 'jobname' = <j...
The TNamed class is the base class for all named ROOT classes.
Definition TNamed.h:29
virtual void SetTitle(const char *title="")
Set the title of the TNamed.
Definition TNamed.cxx:164
TString fName
Definition TNamed.h:32
virtual void SetName(const char *name)
Set the name of the TNamed.
Definition TNamed.cxx:140
virtual const char * GetTitle() const
Returns title of object.
Definition TNamed.h:48
virtual const char * GetName() const
Returns name of object.
Definition TNamed.h:47
Mother of all ROOT objects.
Definition TObject.h:37
virtual const char * ClassName() const
Returns name of class to which the object belongs.
Definition TObject.cxx:130
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition TObject.cxx:893
@ kHasUUID
if object has a TUUID (its fUniqueID=UUIDNumber)
Definition TObject.h:62
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition TObject.cxx:867
Named parameter, streamable and storable.
Definition TParameter.h:35
const AParamType & GetVal() const
Definition TParameter.h:67
const char * GetName() const
Returns name of object.
Definition TParameter.h:66
Double_t RealTime()
Stop the stopwatch (if it is running) and return the realtime (in seconds) passed between the start a...
void Start(Bool_t reset=kTRUE)
Start the stopwatch.
Double_t CpuTime()
Stop the stopwatch (if it is running) and return the cputime (in seconds) passed between the start an...
void Continue()
Resume a stopped stopwatch.
Basic string class.
Definition TString.h:136
const char * Data() const
Definition TString.h:369
Bool_t Contains(const char *pat, ECaseCompare cmp=kExact) const
Definition TString.h:624
virtual int GetPid()
Get process id.
Definition TSystem.cxx:708
virtual const char * Getenv(const char *env)
Get environment variable.
Definition TSystem.cxx:1661
virtual int GetProcInfo(ProcInfo_t *info) const
Returns cpu and memory used by this process into the ProcInfo_t structure.
Definition TSystem.cxx:2493
virtual const char * HostName()
Return the system's host name.
Definition TSystem.cxx:304
Definition file.py:1
Long_t fMemVirtual
Definition TSystem.h:196
Long_t fMemResident
Definition TSystem.h:195
static uint64_t sum(uint64_t i)
Definition Factory.cxx:2345