// @(#)root/net:$Id$
// Author: Philippe Canal October 2011.

/*************************************************************************
 * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al.           *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TParallelMergingFile                                                 //
//                                                                      //
// Specialization of TMemFile to connect to a parallel file merger.     //
// Upon a call to UploadAndReset, the content already written to the    //
// file is upload to the server and the object implementing the function//
// ResetAfterMerge (like TTree) are reset.                              //
// The parallel file merger will then collate the information coming    //
// from this client and any other client in to the file described by    //
// the filename of this object.                                         //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TParallelMergingFile.h"
#include "TSocket.h"
#include "TArrayC.h"

//______________________________________________________________________________
TParallelMergingFile::TParallelMergingFile(const char *filename, Option_t *option /* = "" */,
                                           const char *ftitle /* = "" */, Int_t compress /* = 1 */) : 
   TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
{
   // Constructor.
   // We do no yet open any connection to the server.  This will be done at the
   // time the first upload will be requested.
   
   TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
   if (serverurl.Length()) {
      serverurl.ReplaceAll("pmerge=","pmerge://");
      fServerLocation = TUrl(serverurl);
   }
}

//______________________________________________________________________________
TParallelMergingFile::~TParallelMergingFile()
{
   // Destructor.
   
   // We need to call Close, right here so that it is executed _before_
   // the data member of TParallelMergingFile are destructed.
   Close();
   delete fClassSent;
}

//______________________________________________________________________________
void TParallelMergingFile::Close(Option_t *option)
{
   TMemFile::Close(option);
   if (fSocket) {
      if (0==fSocket->Send("Finished")) {          // tell server we are finished
         Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
      }
      fSocket->Close();
      delete fSocket;
   }
   fSocket = 0;
}

//______________________________________________________________________________
Bool_t TParallelMergingFile::UploadAndReset() 
{
   // Upload the current file data to the merging server.
   // Reset the file and return true in case of success.
   
   // Open connection to server
   if (fSocket == 0) {
      const char *host = fServerLocation.GetHost();
      Int_t port = fServerLocation.GetPort();
      if (host == 0 || host[0] == '\0') {
         host = "localhost";
      }
      if (port <= 0) {
         port = 1095;
      }
      fSocket = new TSocket(host,port);
      if (!fSocket->IsValid()) {
         Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
         delete fSocket;
         fSocket = 0;
         return kFALSE;
      }
      // Wait till we get the start message
      // server tells us who we are
      Int_t kind;
      Int_t n = fSocket->Recv(fServerIdx, kind);
      
      if (n < 0 && kind != 0 /* kStartConnection */) 
      {
         Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
         delete fSocket;
         fSocket = 0;
         return kTRUE;
      }
      n = fSocket->Recv(fServerVersion, kind);
      if (n < 0 && kind != 1 /* kProtocol */) 
      {
         Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
      } else {
         Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
      }
      TMessage::EnableSchemaEvolutionForAll(kTRUE);         
   }
   
   fMessage.Reset(kMESS_ANY); // re-use TMessage object
   fMessage.WriteInt(fServerIdx);
   fMessage.WriteTString(GetName());
   fMessage.WriteLong64(GetEND());
   CopyTo(fMessage);
   
   if (int error = fSocket->Send(fMessage) <= 0) {
      Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
      delete fSocket;
      fSocket = 0;
      return kFALSE;
   }
   
   // Record the StreamerInfo we sent over.
   Int_t isize = fClassIndex->GetSize();
   if (!fClassSent) {
      fClassSent = new TArrayC(isize);
   } else {
      if (isize > fClassSent->GetSize()) {
         fClassSent->Set(isize);
      }
   }
   for(Int_t c = 0; c < isize; ++c) {
      if (fClassIndex->fArray[c]) {
         fClassSent->fArray[c] = 1;
      }
   }
   ResetAfterMerge(0);
   
   return kTRUE;
}

//______________________________________________________________________________
Int_t TParallelMergingFile::Write(const char *, Int_t opt, Int_t bufsiz)
{
   // Write memory objects to this file and upload them to the parallel merge server.
   // Then reset all the resetable object (those with a ResetAfterMerge routine,
   // like TTree).
   //
   // Loop on all objects in memory (including subdirectories).
   // A new key is created in the KEYS linked list for each object.
   // The list of keys is then saved on the file (via WriteKeys)
   // as a single data record.
   // For values of opt see TObject::Write().
   // The directory header info is rewritten on the directory header record.
   // The linked list of FREE segments is written.
   // The file header is written (bytes 1->fBEGIN).
   
   Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
   if (nbytes) {
      UploadAndReset();
   }
   return nbytes;
}

//______________________________________________________________________________
Int_t TParallelMergingFile::Write(const char *n, Int_t opt, Int_t bufsize) const
{
   // One can not save a const TDirectory object.
   
   Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
   return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
}

//______________________________________________________________________________
void TParallelMergingFile::WriteStreamerInfo()
{
   // Write the list of TStreamerInfo as a single object in this file
   // The class Streamer description for all classes written to this file
   // is saved. See class TStreamerInfo.
   
   if (!fWritable) return;
   if (!fClassIndex) return;
   //no need to update the index if no new classes added to the file
   if (fClassIndex->fArray[0] == 0) return;

   // clear fClassIndex for anything we already sent.
   if (fClassSent) {
      Int_t isize = fClassIndex->GetSize();
      Int_t ssize = fClassSent->GetSize();
      for(Int_t c = 0; c < isize && c < ssize; ++c) {
         if (fClassSent->fArray[c]) {
            fClassIndex->fArray[c] = 0;
         }
      }
   }
   
   TMemFile::WriteStreamerInfo();
}
 TParallelMergingFile.cxx:1
 TParallelMergingFile.cxx:2
 TParallelMergingFile.cxx:3
 TParallelMergingFile.cxx:4
 TParallelMergingFile.cxx:5
 TParallelMergingFile.cxx:6
 TParallelMergingFile.cxx:7
 TParallelMergingFile.cxx:8
 TParallelMergingFile.cxx:9
 TParallelMergingFile.cxx:10
 TParallelMergingFile.cxx:11
 TParallelMergingFile.cxx:12
 TParallelMergingFile.cxx:13
 TParallelMergingFile.cxx:14
 TParallelMergingFile.cxx:15
 TParallelMergingFile.cxx:16
 TParallelMergingFile.cxx:17
 TParallelMergingFile.cxx:18
 TParallelMergingFile.cxx:19
 TParallelMergingFile.cxx:20
 TParallelMergingFile.cxx:21
 TParallelMergingFile.cxx:22
 TParallelMergingFile.cxx:23
 TParallelMergingFile.cxx:24
 TParallelMergingFile.cxx:25
 TParallelMergingFile.cxx:26
 TParallelMergingFile.cxx:27
 TParallelMergingFile.cxx:28
 TParallelMergingFile.cxx:29
 TParallelMergingFile.cxx:30
 TParallelMergingFile.cxx:31
 TParallelMergingFile.cxx:32
 TParallelMergingFile.cxx:33
 TParallelMergingFile.cxx:34
 TParallelMergingFile.cxx:35
 TParallelMergingFile.cxx:36
 TParallelMergingFile.cxx:37
 TParallelMergingFile.cxx:38
 TParallelMergingFile.cxx:39
 TParallelMergingFile.cxx:40
 TParallelMergingFile.cxx:41
 TParallelMergingFile.cxx:42
 TParallelMergingFile.cxx:43
 TParallelMergingFile.cxx:44
 TParallelMergingFile.cxx:45
 TParallelMergingFile.cxx:46
 TParallelMergingFile.cxx:47
 TParallelMergingFile.cxx:48
 TParallelMergingFile.cxx:49
 TParallelMergingFile.cxx:50
 TParallelMergingFile.cxx:51
 TParallelMergingFile.cxx:52
 TParallelMergingFile.cxx:53
 TParallelMergingFile.cxx:54
 TParallelMergingFile.cxx:55
 TParallelMergingFile.cxx:56
 TParallelMergingFile.cxx:57
 TParallelMergingFile.cxx:58
 TParallelMergingFile.cxx:59
 TParallelMergingFile.cxx:60
 TParallelMergingFile.cxx:61
 TParallelMergingFile.cxx:62
 TParallelMergingFile.cxx:63
 TParallelMergingFile.cxx:64
 TParallelMergingFile.cxx:65
 TParallelMergingFile.cxx:66
 TParallelMergingFile.cxx:67
 TParallelMergingFile.cxx:68
 TParallelMergingFile.cxx:69
 TParallelMergingFile.cxx:70
 TParallelMergingFile.cxx:71
 TParallelMergingFile.cxx:72
 TParallelMergingFile.cxx:73
 TParallelMergingFile.cxx:74
 TParallelMergingFile.cxx:75
 TParallelMergingFile.cxx:76
 TParallelMergingFile.cxx:77
 TParallelMergingFile.cxx:78
 TParallelMergingFile.cxx:79
 TParallelMergingFile.cxx:80
 TParallelMergingFile.cxx:81
 TParallelMergingFile.cxx:82
 TParallelMergingFile.cxx:83
 TParallelMergingFile.cxx:84
 TParallelMergingFile.cxx:85
 TParallelMergingFile.cxx:86
 TParallelMergingFile.cxx:87
 TParallelMergingFile.cxx:88
 TParallelMergingFile.cxx:89
 TParallelMergingFile.cxx:90
 TParallelMergingFile.cxx:91
 TParallelMergingFile.cxx:92
 TParallelMergingFile.cxx:93
 TParallelMergingFile.cxx:94
 TParallelMergingFile.cxx:95
 TParallelMergingFile.cxx:96
 TParallelMergingFile.cxx:97
 TParallelMergingFile.cxx:98
 TParallelMergingFile.cxx:99
 TParallelMergingFile.cxx:100
 TParallelMergingFile.cxx:101
 TParallelMergingFile.cxx:102
 TParallelMergingFile.cxx:103
 TParallelMergingFile.cxx:104
 TParallelMergingFile.cxx:105
 TParallelMergingFile.cxx:106
 TParallelMergingFile.cxx:107
 TParallelMergingFile.cxx:108
 TParallelMergingFile.cxx:109
 TParallelMergingFile.cxx:110
 TParallelMergingFile.cxx:111
 TParallelMergingFile.cxx:112
 TParallelMergingFile.cxx:113
 TParallelMergingFile.cxx:114
 TParallelMergingFile.cxx:115
 TParallelMergingFile.cxx:116
 TParallelMergingFile.cxx:117
 TParallelMergingFile.cxx:118
 TParallelMergingFile.cxx:119
 TParallelMergingFile.cxx:120
 TParallelMergingFile.cxx:121
 TParallelMergingFile.cxx:122
 TParallelMergingFile.cxx:123
 TParallelMergingFile.cxx:124
 TParallelMergingFile.cxx:125
 TParallelMergingFile.cxx:126
 TParallelMergingFile.cxx:127
 TParallelMergingFile.cxx:128
 TParallelMergingFile.cxx:129
 TParallelMergingFile.cxx:130
 TParallelMergingFile.cxx:131
 TParallelMergingFile.cxx:132
 TParallelMergingFile.cxx:133
 TParallelMergingFile.cxx:134
 TParallelMergingFile.cxx:135
 TParallelMergingFile.cxx:136
 TParallelMergingFile.cxx:137
 TParallelMergingFile.cxx:138
 TParallelMergingFile.cxx:139
 TParallelMergingFile.cxx:140
 TParallelMergingFile.cxx:141
 TParallelMergingFile.cxx:142
 TParallelMergingFile.cxx:143
 TParallelMergingFile.cxx:144
 TParallelMergingFile.cxx:145
 TParallelMergingFile.cxx:146
 TParallelMergingFile.cxx:147
 TParallelMergingFile.cxx:148
 TParallelMergingFile.cxx:149
 TParallelMergingFile.cxx:150
 TParallelMergingFile.cxx:151
 TParallelMergingFile.cxx:152
 TParallelMergingFile.cxx:153
 TParallelMergingFile.cxx:154
 TParallelMergingFile.cxx:155
 TParallelMergingFile.cxx:156
 TParallelMergingFile.cxx:157
 TParallelMergingFile.cxx:158
 TParallelMergingFile.cxx:159
 TParallelMergingFile.cxx:160
 TParallelMergingFile.cxx:161
 TParallelMergingFile.cxx:162
 TParallelMergingFile.cxx:163
 TParallelMergingFile.cxx:164
 TParallelMergingFile.cxx:165
 TParallelMergingFile.cxx:166
 TParallelMergingFile.cxx:167
 TParallelMergingFile.cxx:168
 TParallelMergingFile.cxx:169
 TParallelMergingFile.cxx:170
 TParallelMergingFile.cxx:171
 TParallelMergingFile.cxx:172
 TParallelMergingFile.cxx:173
 TParallelMergingFile.cxx:174
 TParallelMergingFile.cxx:175
 TParallelMergingFile.cxx:176
 TParallelMergingFile.cxx:177
 TParallelMergingFile.cxx:178
 TParallelMergingFile.cxx:179
 TParallelMergingFile.cxx:180
 TParallelMergingFile.cxx:181
 TParallelMergingFile.cxx:182
 TParallelMergingFile.cxx:183
 TParallelMergingFile.cxx:184
 TParallelMergingFile.cxx:185
 TParallelMergingFile.cxx:186
 TParallelMergingFile.cxx:187
 TParallelMergingFile.cxx:188
 TParallelMergingFile.cxx:189
 TParallelMergingFile.cxx:190
 TParallelMergingFile.cxx:191
 TParallelMergingFile.cxx:192
 TParallelMergingFile.cxx:193
 TParallelMergingFile.cxx:194
 TParallelMergingFile.cxx:195
 TParallelMergingFile.cxx:196
 TParallelMergingFile.cxx:197
 TParallelMergingFile.cxx:198
 TParallelMergingFile.cxx:199
 TParallelMergingFile.cxx:200
 TParallelMergingFile.cxx:201
 TParallelMergingFile.cxx:202
 TParallelMergingFile.cxx:203
 TParallelMergingFile.cxx:204