// @(#)root/proofplayer:$Id$
// Author: J.F. Grosse-Oetringhaus, G.Ganis

/*************************************************************************
 * Copyright (C) 1995-2007, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TSQLMonitoringWriter                                                 //
//                                                                      //
// SQL implementation of TVirtualMonitoringWriter.                      //
//                                                                      //
//////////////////////////////////////////////////////////////////////////

#include "TList.h"
#include "TParameter.h"
#include "TEnv.h"
#include "TObjString.h"
#include "TSQLMonitoring.h"
#include "TSQLServer.h"
#include "TSQLResult.h"

//______________________________________________________________________________
TSQLMonitoringWriter::TSQLMonitoringWriter(const char *serv, const char *user,
                                           const char *pass, const char *table)
  : TVirtualMonitoringWriter("SQL", 0.0), fTable(table), fVerbose(kFALSE)
{
   // Constructor.
   // Connects to the SQL server 'serv' using the credentials 'user' / 'pass';
   // 'table' is stored as the default table used by SendParameters.
   // If the connection cannot be established the connection object is
   // deleted, fDB is left null and this object is flagged as zombie.
   //
   // The maximum size of a bulk insertion is read from the environment
   // variable "SQLMonitoringWriter.MaxBulkSize" (default "16M"). Accepted
   // forms are a plain number of bytes or a number followed by one of the
   // suffixes K, M or G (case insensitive). Any other form leaves the built-in
   // default of 16 MB in place.

   // Open connection to SQL server
   fDB = TSQLServer::Connect(serv, user, pass);
   if (!fDB || fDB->IsZombie()) {
      SafeDelete(fDB);
      // Invalid object
      MakeZombie();
   }

   // Set the max bulk insertion size
   fMaxBulkSize = 16 * 1024 * 1024;
   TString smx = gEnv->GetValue("SQLMonitoringWriter.MaxBulkSize", "16M");

   // Translate an optional unit suffix into a multiplier and strip it.
   // The arithmetic is done in 64 bits: with plain 'int' a setting such as
   // "2G" overflows before the result is stored.
   // NOTE(review): the full range is only preserved if fMaxBulkSize is itself
   // a 64-bit member; it is declared in the header - confirm.
   Long64_t unit = 1;
   if (smx.EndsWith("K", TString::kIgnoreCase)) {
      unit = 1024;
   } else if (smx.EndsWith("M", TString::kIgnoreCase)) {
      unit = 1024 * 1024;
   } else if (smx.EndsWith("G", TString::kIgnoreCase)) {
      unit = 1024 * 1024 * 1024;
   }
   if (unit > 1) smx.Remove(smx.Length()-1);

   // Only a purely numeric remainder is accepted; otherwise keep the default
   if (smx.IsDigit()) fMaxBulkSize = smx.Atoll() * unit;
}

//______________________________________________________________________________
TSQLMonitoringWriter::~TSQLMonitoringWriter()
{
   // Destructor: delete the SQL server connection object, if one is held,
   // and reset the pointer.

   if (fDB) {
      delete fDB;
      fDB = 0;
   }
}

//______________________________________________________________________________
Bool_t TSQLMonitoringWriter::SendParameters(TList *values, const char *opt)
{
   // Register query log using the information in the list which is in the form
   // TParameter(<par>,<value>) or TNamed(<name>,<string>). For bulk sending,
   // the first entry in the list is an TObjString defining the variable names
   // in the format
   //                    VARname1,VARname2,...
   // while the other entries are TObjStrings with the multiplets to be sent
   //                    VARvalue1,VARvalue2,...
   //
   // The string 'opt' allows the following additional control:
   //      table=[<db>.]<table>  allows to insert to a different table from the
   //                            one defined at construction (change is not
   //                            persistent); if <db> is not specified, the same
   //                            db defined at cinstruction is used.
   //      bulk                  Do a bulk insert
   // More options can be given concurrently, comma-separated .
   // The specified table must already have been created in the DB.

   if (!fDB) {
      // Invalid instance
      return kFALSE;
   }

   // The list must contain something
   if (!values || (values && values->GetSize() < 1))
      return kFALSE;

   // Parse options
   TString table(fTable), op, ops(opt);
   Ssiz_t from = 0;
   Bool_t bulk = kFALSE;
   while (ops.Tokenize(op, from, ",")) {
      if (op == "bulk") {
         bulk = kTRUE;
      } else if (op.BeginsWith("table=")) {
         op.ReplaceAll("table=", "");
         if (!op.IsNull()) {
            Ssiz_t idot = table.Index('.');
            if (idot != kNPOS && op.Index('.') == kNPOS) {
               table.Remove(idot+1);
               table += op;
            } else {
               table = op;
            }
         }
      }
   }
   
   TIter nxi(values);
   TObject *o = 0;
      
   // now prepare the strings
   TString sql = TString::Format("INSERT INTO %s", table.Data());

   TSQLResult *res = 0;
   if (!bulk) {

      // the column and values strings
      char c = '(';
      TString cols, vals;
      while ((o = nxi())) {
         if (!strncmp(o->ClassName(), "TNamed", 6)) {
            cols += TString::Format("%c%s", c, ((TNamed *)o)->GetName());
            vals += TString::Format("%c'%s'", c, ((TNamed *)o)->GetTitle());
         } else if (!strcmp(o->ClassName(), "TParameter<Long64_t>")) {
            cols += TString::Format("%c%s", c, ((TParameter<Long64_t> *)o)->GetName());
            vals += TString::Format("%c%lld", c, ((TParameter<Long64_t> *)o)->GetVal());
         } else if (!strcmp(o->ClassName(), "TParameter<double>")) {
            cols += TString::Format("%c%s", c, ((TParameter<double> *)o)->GetName());
            vals += TString::Format("%c%f", c, ((TParameter<double> *)o)->GetVal());
         } else if (!strcmp(o->ClassName(), "TParameter<float>")) {
            cols += TString::Format("%c%s", c, ((TParameter<float> *)o)->GetName());
            vals += TString::Format("%c%f", c, ((TParameter<float> *)o)->GetVal());
         } else if (!strcmp(o->ClassName(), "TParameter<int>")) {
            cols += TString::Format("%c%s", c, ((TParameter<int> *)o)->GetName());
            vals += TString::Format("%c%d", c, ((TParameter<int> *)o)->GetVal());
         } else if (!strcmp(o->ClassName(), "TParameter<long>")) {
            cols += TString::Format("%c%s", c, ((TParameter<long> *)o)->GetName());
            vals += TString::Format("%c%ld", c, ((TParameter<long> *)o)->GetVal());
         }
         c = ',';
      }
      cols += ")";
      vals += ")";

      // Put everything together
      sql += TString::Format(" %s VALUES %s", cols.Data(), vals.Data());

      // Post query
      if (fVerbose) Info("SendParameters", "sending: '%s'", sql.Data());
      if (!(res = fDB->Query(sql))) {
         Error("SendParameters", "insert into %s failed", table.Data());
         if (sql.Length() > 1024) {
            TString head(sql(0,508)), tail(sql(sql.Length()-512,512));
            Printf("%s...%s", head.Data(), tail.Data());
         } else {
            Printf("%s", sql.Data());
         }
         return kFALSE;
      }
      delete res;
      
   } else {
      // Prepare for bulk submission
      o = nxi();
      TObjString *os = dynamic_cast<TObjString *>(o);
      if (!os) {
         Error("SendParameters", "bulk insert: first entry in list is not 'TObjString' but '%s'", o->ClassName() );
         return kFALSE;         
      }
      // Continue preparing the string
      sql += TString::Format(" (%s) VALUES ", os->GetName());
      TString head = sql;
      if (fVerbose) Info("SendParameters", "sending: '%s' (bulk of %d nplets)", head.Data(), values->GetSize() - 1);
      char c = ' ';
      while ((o = nxi())) {
         if ((os = dynamic_cast<TObjString *>(o))) {
           sql += TString::Format("%c(%s)", c, os->GetName());
           c = ',';
         } else {
            Warning("SendParameters", "bulk insert: ignoring not 'TObjString' entry ('%s')", o->ClassName() );
         }
         // Check size (we cannot exceed fMaxBulkSize ('max_allowed_packet' in [mysqld] conf section)
         if (sql.Length() > 0.9 * fMaxBulkSize) {
            if (!(res = fDB->Query(sql))) {
               Error("SendParameters", "bulk insert into %s failed", table.Data());
               if (sql.Length() > 1024) {
                  TString hd(sql(0,508)), tl(sql(sql.Length()-512,512));
                  Printf("%s...%s", hd.Data(), tl.Data());
               } else {
                  Printf("%s", sql.Data());
               }
               return kFALSE;
            }
            delete res;
            sql = head;
            c = ' ';
         }
      }
      // Check if there is still something to send
      if (sql.Length() > head.Length()) {
         if (!(res = fDB->Query(sql))) {
            Error("SendParameters", "bulk insert into %s failed", table.Data());
            if (sql.Length() > 1024) {
               TString hd(sql(0,508)), tl(sql(sql.Length()-512,512));
               Printf("%s...%s", hd.Data(), tl.Data());
            } else {
               Printf("%s", sql.Data());
            }
            return kFALSE;
         }
         delete res;
      }
   }

   // Done successfully
   return kTRUE;
}
 TSQLMonitoring.cxx:1
 TSQLMonitoring.cxx:2
 TSQLMonitoring.cxx:3
 TSQLMonitoring.cxx:4
 TSQLMonitoring.cxx:5
 TSQLMonitoring.cxx:6
 TSQLMonitoring.cxx:7
 TSQLMonitoring.cxx:8
 TSQLMonitoring.cxx:9
 TSQLMonitoring.cxx:10
 TSQLMonitoring.cxx:11
 TSQLMonitoring.cxx:12
 TSQLMonitoring.cxx:13
 TSQLMonitoring.cxx:14
 TSQLMonitoring.cxx:15
 TSQLMonitoring.cxx:16
 TSQLMonitoring.cxx:17
 TSQLMonitoring.cxx:18
 TSQLMonitoring.cxx:19
 TSQLMonitoring.cxx:20
 TSQLMonitoring.cxx:21
 TSQLMonitoring.cxx:22
 TSQLMonitoring.cxx:23
 TSQLMonitoring.cxx:24
 TSQLMonitoring.cxx:25
 TSQLMonitoring.cxx:26
 TSQLMonitoring.cxx:27
 TSQLMonitoring.cxx:28
 TSQLMonitoring.cxx:29
 TSQLMonitoring.cxx:30
 TSQLMonitoring.cxx:31
 TSQLMonitoring.cxx:32
 TSQLMonitoring.cxx:33
 TSQLMonitoring.cxx:34
 TSQLMonitoring.cxx:35
 TSQLMonitoring.cxx:36
 TSQLMonitoring.cxx:37
 TSQLMonitoring.cxx:38
 TSQLMonitoring.cxx:39
 TSQLMonitoring.cxx:40
 TSQLMonitoring.cxx:41
 TSQLMonitoring.cxx:42
 TSQLMonitoring.cxx:43
 TSQLMonitoring.cxx:44
 TSQLMonitoring.cxx:45
 TSQLMonitoring.cxx:46
 TSQLMonitoring.cxx:47
 TSQLMonitoring.cxx:48
 TSQLMonitoring.cxx:49
 TSQLMonitoring.cxx:50
 TSQLMonitoring.cxx:51
 TSQLMonitoring.cxx:52
 TSQLMonitoring.cxx:53
 TSQLMonitoring.cxx:54
 TSQLMonitoring.cxx:55
 TSQLMonitoring.cxx:56
 TSQLMonitoring.cxx:57
 TSQLMonitoring.cxx:58
 TSQLMonitoring.cxx:59
 TSQLMonitoring.cxx:60
 TSQLMonitoring.cxx:61
 TSQLMonitoring.cxx:62
 TSQLMonitoring.cxx:63
 TSQLMonitoring.cxx:64
 TSQLMonitoring.cxx:65
 TSQLMonitoring.cxx:66
 TSQLMonitoring.cxx:67
 TSQLMonitoring.cxx:68
 TSQLMonitoring.cxx:69
 TSQLMonitoring.cxx:70
 TSQLMonitoring.cxx:71
 TSQLMonitoring.cxx:72
 TSQLMonitoring.cxx:73
 TSQLMonitoring.cxx:74
 TSQLMonitoring.cxx:75
 TSQLMonitoring.cxx:76
 TSQLMonitoring.cxx:77
 TSQLMonitoring.cxx:78
 TSQLMonitoring.cxx:79
 TSQLMonitoring.cxx:80
 TSQLMonitoring.cxx:81
 TSQLMonitoring.cxx:82
 TSQLMonitoring.cxx:83
 TSQLMonitoring.cxx:84
 TSQLMonitoring.cxx:85
 TSQLMonitoring.cxx:86
 TSQLMonitoring.cxx:87
 TSQLMonitoring.cxx:88
 TSQLMonitoring.cxx:89
 TSQLMonitoring.cxx:90
 TSQLMonitoring.cxx:91
 TSQLMonitoring.cxx:92
 TSQLMonitoring.cxx:93
 TSQLMonitoring.cxx:94
 TSQLMonitoring.cxx:95
 TSQLMonitoring.cxx:96
 TSQLMonitoring.cxx:97
 TSQLMonitoring.cxx:98
 TSQLMonitoring.cxx:99
 TSQLMonitoring.cxx:100
 TSQLMonitoring.cxx:101
 TSQLMonitoring.cxx:102
 TSQLMonitoring.cxx:103
 TSQLMonitoring.cxx:104
 TSQLMonitoring.cxx:105
 TSQLMonitoring.cxx:106
 TSQLMonitoring.cxx:107
 TSQLMonitoring.cxx:108
 TSQLMonitoring.cxx:109
 TSQLMonitoring.cxx:110
 TSQLMonitoring.cxx:111
 TSQLMonitoring.cxx:112
 TSQLMonitoring.cxx:113
 TSQLMonitoring.cxx:114
 TSQLMonitoring.cxx:115
 TSQLMonitoring.cxx:116
 TSQLMonitoring.cxx:117
 TSQLMonitoring.cxx:118
 TSQLMonitoring.cxx:119
 TSQLMonitoring.cxx:120
 TSQLMonitoring.cxx:121
 TSQLMonitoring.cxx:122
 TSQLMonitoring.cxx:123
 TSQLMonitoring.cxx:124
 TSQLMonitoring.cxx:125
 TSQLMonitoring.cxx:126
 TSQLMonitoring.cxx:127
 TSQLMonitoring.cxx:128
 TSQLMonitoring.cxx:129
 TSQLMonitoring.cxx:130
 TSQLMonitoring.cxx:131
 TSQLMonitoring.cxx:132
 TSQLMonitoring.cxx:133
 TSQLMonitoring.cxx:134
 TSQLMonitoring.cxx:135
 TSQLMonitoring.cxx:136
 TSQLMonitoring.cxx:137
 TSQLMonitoring.cxx:138
 TSQLMonitoring.cxx:139
 TSQLMonitoring.cxx:140
 TSQLMonitoring.cxx:141
 TSQLMonitoring.cxx:142
 TSQLMonitoring.cxx:143
 TSQLMonitoring.cxx:144
 TSQLMonitoring.cxx:145
 TSQLMonitoring.cxx:146
 TSQLMonitoring.cxx:147
 TSQLMonitoring.cxx:148
 TSQLMonitoring.cxx:149
 TSQLMonitoring.cxx:150
 TSQLMonitoring.cxx:151
 TSQLMonitoring.cxx:152
 TSQLMonitoring.cxx:153
 TSQLMonitoring.cxx:154
 TSQLMonitoring.cxx:155
 TSQLMonitoring.cxx:156
 TSQLMonitoring.cxx:157
 TSQLMonitoring.cxx:158
 TSQLMonitoring.cxx:159
 TSQLMonitoring.cxx:160
 TSQLMonitoring.cxx:161
 TSQLMonitoring.cxx:162
 TSQLMonitoring.cxx:163
 TSQLMonitoring.cxx:164
 TSQLMonitoring.cxx:165
 TSQLMonitoring.cxx:166
 TSQLMonitoring.cxx:167
 TSQLMonitoring.cxx:168
 TSQLMonitoring.cxx:169
 TSQLMonitoring.cxx:170
 TSQLMonitoring.cxx:171
 TSQLMonitoring.cxx:172
 TSQLMonitoring.cxx:173
 TSQLMonitoring.cxx:174
 TSQLMonitoring.cxx:175
 TSQLMonitoring.cxx:176
 TSQLMonitoring.cxx:177
 TSQLMonitoring.cxx:178
 TSQLMonitoring.cxx:179
 TSQLMonitoring.cxx:180
 TSQLMonitoring.cxx:181
 TSQLMonitoring.cxx:182
 TSQLMonitoring.cxx:183
 TSQLMonitoring.cxx:184
 TSQLMonitoring.cxx:185
 TSQLMonitoring.cxx:186
 TSQLMonitoring.cxx:187
 TSQLMonitoring.cxx:188
 TSQLMonitoring.cxx:189
 TSQLMonitoring.cxx:190
 TSQLMonitoring.cxx:191
 TSQLMonitoring.cxx:192
 TSQLMonitoring.cxx:193
 TSQLMonitoring.cxx:194
 TSQLMonitoring.cxx:195
 TSQLMonitoring.cxx:196
 TSQLMonitoring.cxx:197
 TSQLMonitoring.cxx:198
 TSQLMonitoring.cxx:199
 TSQLMonitoring.cxx:200
 TSQLMonitoring.cxx:201
 TSQLMonitoring.cxx:202
 TSQLMonitoring.cxx:203
 TSQLMonitoring.cxx:204
 TSQLMonitoring.cxx:205
 TSQLMonitoring.cxx:206
 TSQLMonitoring.cxx:207
 TSQLMonitoring.cxx:208
 TSQLMonitoring.cxx:209
 TSQLMonitoring.cxx:210
 TSQLMonitoring.cxx:211
 TSQLMonitoring.cxx:212
 TSQLMonitoring.cxx:213
 TSQLMonitoring.cxx:214
 TSQLMonitoring.cxx:215
 TSQLMonitoring.cxx:216
 TSQLMonitoring.cxx:217
 TSQLMonitoring.cxx:218
 TSQLMonitoring.cxx:219
 TSQLMonitoring.cxx:220
 TSQLMonitoring.cxx:221
 TSQLMonitoring.cxx:222
 TSQLMonitoring.cxx:223
 TSQLMonitoring.cxx:224
 TSQLMonitoring.cxx:225
 TSQLMonitoring.cxx:226
 TSQLMonitoring.cxx:227
 TSQLMonitoring.cxx:228