#include "RConfigure.h"
#include "TThread.h"
#include "TThreadImp.h"
#include "TThreadFactory.h"
#include "TROOT.h"
#include "TApplication.h"
#include "TVirtualPad.h"
#include "TMethodCall.h"
#include "TTimeStamp.h"
#include "TInterpreter.h"
#include "TError.h"
#include "Varargs.h"
#include "ThreadLocalStorage.h"
// Definitions of the static data members of TThread.

// Backend implementation object; created in TThread::Init() and deleted by
// the TThreadTearDownGuard destructor.
TThreadImp *TThread::fgThreadImp = 0;
// Id recorded from fgThreadImp->SelfId() by the thread that runs Init().
Long_t TThread::fgMainId = 0;
// Head of the doubly linked list of TThread objects (see Constructor() and
// the destructor, which insert and unlink entries).
TThread *TThread::fgMain = 0;
// Mutex used by the static Lock()/TryLock()/UnLock(); allocated in Init().
TMutex *TThread::fgMainMutex;
// Name of the pending cross-thread request: set in XARequest(), reset to 0
// at the end of XAction().
char *volatile TThread::fgXAct = 0;
// Mutex held by a requester for the duration of XARequest(); created lazily there.
TMutex *TThread::fgXActMutex = 0;
// Condition on which XARequest() waits and which XAction() signals.
TCondition *TThread::fgXActCondi = 0;
// Argument array of the pending request (XAction() reads entries from index 1 on).
void **volatile TThread::fgXArr = 0;
// Number of array entries passed by the requester.
volatile Int_t TThread::fgXAnb = 0;
// Value copied to *iret by XARequest(); set to 0 when a request is posted.
volatile Int_t TThread::fgXArt = 0;
static void CINT_alloc_lock()
{
   // Lock callback registered with the interpreter in TThread::Init():
   // acquires gGlobalMutex.
   gGlobalMutex->Lock();
}
static void CINT_alloc_unlock()
{
   // Unlock callback registered with the interpreter in TThread::Init():
   // releases gGlobalMutex.
   gGlobalMutex->UnLock();
}
// Mutex protecting the global list of TThread objects; allocated in
// TThread::Init().
static TMutex *gMainInternalMutex = 0;

static void ThreadInternalLock()
{
   // Acquire gMainInternalMutex. Does nothing while the mutex does not exist.
   if (!gMainInternalMutex) return;
   gMainInternalMutex->Lock();
}
static void ThreadInternalUnLock()
{
   // Release gMainInternalMutex. Does nothing while the mutex does not exist.
   if (!gMainInternalMutex) return;
   gMainInternalMutex->UnLock();
}
static Bool_t fgIsTearDown(kFALSE);
class TThreadTearDownGuard {
public:
TThreadTearDownGuard() { fgIsTearDown = kFALSE; }
~TThreadTearDownGuard() {
fgIsTearDown = kTRUE;
TVirtualMutex *m = gGlobalMutex;
gGlobalMutex = 0;
delete m;
TThreadImp *imp = TThread::fgThreadImp;
TThread::fgThreadImp = 0;
delete imp;
}
};
static TThreadTearDownGuard gTearDownGuard;
// Helper used by TThread::Join() when the join is requested from the main
// thread: a second thread performs the blocking join while the caller keeps
// processing system events (see TJoinHelper::Join()).
class TJoinHelper {
private:
TThread *fT;        // thread to be joined
TThread *fH;        // helper thread running JoinFunc()
void **fRet;        // forwarded to fT->Join() to receive the thread's result
Long_t fRc;         // return code of fT->Join(), filled in by JoinFunc()
TMutex *fM;         // mutex associated with fC and guarding fJoined
TCondition *fC;     // signalled by JoinFunc() once the join has completed
Bool_t fJoined;     // set to kTRUE by JoinFunc() after fT->Join() returned
static void* JoinFunc(void *p);   // entry point of the helper thread
public:
TJoinHelper(TThread *th, void **ret);
~TJoinHelper();
Int_t Join();
};
TJoinHelper::TJoinHelper(TThread *th, void **ret)
   : fT(th),
     fRet(ret),
     fRc(0),
     fM(new TMutex),
     fC(new TCondition(fM)),
     fJoined(kFALSE)
{
   // Set up the helper for joining thread th: allocates the mutex/condition
   // pair and creates (but does not start) the helper thread, which will
   // execute JoinFunc() with this object as its argument.
   fH = new TThread("JoinHelper", JoinFunc, this);
}
TJoinHelper::~TJoinHelper()
{
   // Release everything allocated by the constructor: the condition, then
   // the mutex it was built on, then the helper thread object.
   delete fC;
   delete fM;
   delete fH;
}
void* TJoinHelper::JoinFunc(void *p)
{
TJoinHelper *jp = (TJoinHelper*)p;
jp->fRc = jp->fT->Join(jp->fRet);
jp->fM->Lock();
jp->fJoined = kTRUE;
jp->fC->Signal();
jp->fM->UnLock();
TThread::Exit(0);
return 0;
}
Int_t TJoinHelper::Join()
{
   // Start the helper thread and wait for it to report that the target
   // thread has been joined. The wait is done in slices
   // (TimedWaitRelative(100)); between slices pending system events are
   // processed. Returns the code stored by JoinFunc().
   fM->Lock();
   fH->Run();

   for (;;) {
      const int rc = fC->TimedWaitRelative(100);
      const Bool_t waitOk = (rc == 0 || rc == 1);
      // Stop on any other return code, or once the helper reported completion.
      if (!waitOk || fJoined) break;
      gSystem->ProcessEvents();
   }

   fM->UnLock();

   // Finally join the helper thread itself.
   TThread::fgThreadImp->Join(fH, 0);

   return fRc;
}
// ROOT ClassImp macro invocation for TThread (the macro itself is defined
// outside this file).
ClassImp(TThread)
TThread::TThread(VoidRtnFunc_t fn, void *arg, EPriority pri)
   : TNamed("<anon>", "")
{
   // Create an anonymous, non-detached thread object for a function that
   // returns a value. fn is invoked with arg once the thread runs (see
   // Function()); pri is stored as the requested priority.
   fFcnRetn   = fn;
   fFcnVoid   = 0;
   fThreadArg = arg;
   fPriority  = pri;
   fDetached  = kFALSE;

   Constructor();

   fNamed = kFALSE;
}
TThread::TThread(VoidFunc_t fn, void *arg, EPriority pri)
   : TNamed("<anon>", "")
{
   // Create an anonymous, detached thread object for a function without a
   // return value. fn is invoked with arg once the thread runs (see
   // Function()); pri is stored as the requested priority.
   fFcnVoid   = fn;
   fFcnRetn   = 0;
   fThreadArg = arg;
   fPriority  = pri;
   fDetached  = kTRUE;

   Constructor();

   fNamed = kFALSE;
}
TThread::TThread(const char *thname, VoidRtnFunc_t fn, void *arg,
                 EPriority pri)
   : TNamed(thname, "")
{
   // Create a named, non-detached thread object for a function that returns
   // a value. fn is invoked with arg once the thread runs (see Function());
   // pri is stored as the requested priority.
   fFcnRetn   = fn;
   fFcnVoid   = 0;
   fThreadArg = arg;
   fPriority  = pri;
   fDetached  = kFALSE;

   Constructor();

   fNamed = kTRUE;
}
TThread::TThread(const char *thname, VoidFunc_t fn, void *arg,
                 EPriority pri)
   : TNamed(thname, "")
{
   // Create a named, detached thread object for a function without a return
   // value. fn is invoked with arg once the thread runs (see Function());
   // pri is stored as the requested priority.
   fFcnVoid   = fn;
   fFcnRetn   = 0;
   fThreadArg = arg;
   fPriority  = pri;
   fDetached  = kTRUE;

   Constructor();

   fNamed = kTRUE;
}
TThread::TThread(Long_t id)
{
   // Create a TThread object describing an already running thread. No
   // function is attached. If id is 0 the id of the calling thread is used.
   // The object is marked detached and put directly into the running state.
   fFcnVoid   = 0;
   fFcnRetn   = 0;
   fThreadArg = 0;
   fPriority  = kNormalPriority;
   fDetached  = kTRUE;

   Constructor();

   // Id and state are published under the list mutex.
   ThreadInternalLock();
   fNamed = kFALSE;
   if (id) {
      fId = id;
   } else {
      fId = SelfId();
   }
   fState = kRunningState;
   ThreadInternalUnLock();

   if (gDebug) {
      Info("TThread::TThread", "TThread attached to running thread");
   }
}
void TThread::Initialize()
{
   // Public entry point for setting up the threading layer; simply forwards
   // to Init().
   Init();
}
Bool_t TThread::IsInitialized()
{
   // Return kTRUE when the thread implementation object exists, kFALSE
   // otherwise.
   return fgThreadImp ? kTRUE : kFALSE;
}
void TThread::Init()
{
// Set up the global threading state. Returns immediately when an
// implementation object already exists or when tear-down has started.
if (fgThreadImp || fgIsTearDown) return;
// Create the backend through which the thread operations in this file are issued.
fgThreadImp = gThreadFactory->CreateThreadImp();
// Mutex used by ThreadInternalLock()/ThreadInternalUnLock().
gMainInternalMutex = new TMutex(kTRUE);
// The thread executing Init() is recorded as the main thread.
fgMainId = fgThreadImp->SelfId();
// Mutex behind the static Lock()/TryLock()/UnLock().
fgMainMutex = new TMutex(kTRUE);
// Install the TThread hooks for thread-specific data and cross-thread requests.
gThreadTsd = TThread::Tsd;
gThreadXAR = TThread::XARequest;
// Global mutex; also used by the interpreter allocation callbacks below.
gGlobalMutex = new TMutex(kTRUE);
gCint->SetAlloclockfunc(CINT_alloc_lock);
gCint->SetAllocunlockfunc(CINT_alloc_unlock);
{
// Under the global mutex: create the interpreter mutex if it does not exist
// yet and let gROOTMutex refer to the same object.
R__LOCKGUARD(gGlobalMutex);
if (!gCINTMutex) {
gCINTMutex = gGlobalMutex->Factory(kTRUE);
}
gROOTMutex = gCINTMutex;
}
}
void TThread::Constructor()
{
   // Common part of all constructors: reset the per-object state, make sure
   // the threading layer is initialized and insert this object at the head
   // of the global thread list.
   fHandle = 0;
   fId     = -1;
   fState  = kNewState;
   fClean  = 0;
   fHolder = 0;

   if (!fgThreadImp) {
      Init();
   }

   SetComment("Constructor: MainInternalMutex Locking");
   ThreadInternalLock();
   SetComment("Constructor: MainInternalMutex Locked");

   // Clear the thread-specific data slots; the directory slot starts at gROOT.
   memset(fTsd, 0, ROOT::kMaxThreadSlot*sizeof(void*));
   fTsd[ROOT::kDirectoryThreadSlot] = gROOT;

   // Push this object onto the front of the doubly linked list.
   fPrev = 0;
   fNext = fgMain;
   if (fgMain) {
      fgMain->fPrev = this;
   }
   fgMain = this;

   ThreadInternalUnLock();
   SetComment();
}
TThread::~TThread()
{
   // Unlink this object from the global thread list and, if a holder
   // pointer was registered (see Delete()), reset the holder to 0.
   if (gDebug) {
      Info("TThread::~TThread", "thread deleted");
   }

   SetComment("Destructor: MainInternalMutex Locking");
   ThreadInternalLock();
   SetComment("Destructor: MainInternalMutex Locked");

   // Remove this node from the doubly linked list.
   if (fgMain == this) {
      fgMain = fNext;
   }
   if (fNext) {
      fNext->fPrev = fPrev;
   }
   if (fPrev) {
      fPrev->fNext = fNext;
   }

   ThreadInternalUnLock();
   SetComment();

   if (fHolder) {
      *fHolder = 0;
   }
}
Int_t TThread::Delete(TThread *&th)
{
   // Request deletion of thread th. The caller's pointer is registered as
   // holder so that the destructor can zero it.
   // Returns 0 if th is null or if the thread was not running (CleanUp()
   // is invoked in that case); returns -1 if the thread was running, in
   // which case it is put into the deleting state and killed.
   if (!th) return 0;

   th->fHolder = &th;

   if (th->fState != kRunningState) {
      CleanUp();
      return 0;
   }

   th->fState = kDeletingState;
   if (gDebug) {
      th->Info("TThread::Delete", "deleting thread");
   }
   th->Kill();
   return -1;
}
Int_t TThread::Exists()
{
   // Return the number of TThread objects currently in the global list.
   Int_t count = 0;

   ThreadInternalLock();
   TThread *cur = fgMain;
   while (cur) {
      ++count;
      cur = cur->fNext;
   }
   ThreadInternalUnLock();

   return count;
}
void TThread::SetPriority(EPriority pri)
{
   // Store pri as the priority of this thread object. Only the data member
   // is updated here.
   fPriority = pri;
}
TThread *TThread::GetThread(Long_t id)
{
   // Search the global thread list for the first object whose id equals id.
   // Returns 0 when there is no such object.
   ThreadInternalLock();
   TThread *found = fgMain;
   while (found && found->fId != id) {
      found = found->fNext;
   }
   ThreadInternalUnLock();

   return found;
}
TThread *TThread::GetThread(const char *name)
{
   // Search the global thread list for the first object whose name equals
   // name. Returns 0 when there is no such object or when name is null.

   // strcmp() must not be handed a null pointer; no thread can match it.
   if (!name) return 0;

   ThreadInternalLock();
   TThread *found = fgMain;
   while (found && strcmp(name, found->GetName()) != 0) {
      found = found->fNext;
   }
   ThreadInternalUnLock();

   return found;
}
TThread *TThread::Self()
{
   // Return the TThread object of the calling thread. The result of the
   // lookup is kept in a thread-local variable; the lookup is repeated as
   // long as nothing was found and always while tear-down is in progress.
   TTHREAD_TLS(TThread*) cached = 0;

   if (fgIsTearDown || !cached) {
      cached = GetThread(SelfId());
   }
   return cached;
}
Long_t TThread::Join(void **ret)
{
   // Join this thread; ret is passed on to receive the thread's result.
   // Returns -1 (after reporting an error) when the thread was never
   // started or is detached. When called from the main thread the join is
   // delegated to a TJoinHelper so that events keep being processed.
   if (fId == -1) {
      Error("Join", "thread not running");
      return -1;
   }

   if (fDetached) {
      Error("Join", "cannot join detached thread");
      return -1;
   }

   if (SelfId() == fgMainId) {
      TJoinHelper helper(this, ret);
      return helper.Join();
   }

   return fgThreadImp->Join(this, ret);
}
Long_t TThread::Join(Long_t jid, void **ret)
{
   // Static variant: join the thread with id jid. Returns -1 (after
   // reporting an error) when no such thread is registered.
   TThread *target = GetThread(jid);
   if (target) {
      return target->Join(ret);
   }

   ::Error("TThread::Join", "cannot find thread 0x%lx", jid);
   return -1L;
}
Long_t TThread::SelfId()
{
   // Return the id of the calling thread as reported by the thread
   // implementation, initializing the threading layer on first use.
   // Returns -1 while tear-down is in progress.
   if (!fgIsTearDown) {
      if (!fgThreadImp) {
         Init();
      }
      return fgThreadImp->SelfId();
   }
   return -1;
}
Int_t TThread::Run(void *arg)
{
   // Start the thread. A non-null arg replaces the argument given at
   // construction. Returns the code of the implementation's Run(); a
   // non-zero code puts the object into the invalid state, zero into the
   // running state.
   if (arg) {
      fThreadArg = arg;
   }

   SetComment("Run: MainInternalMutex locking");
   ThreadInternalLock();
   SetComment("Run: MainMutex locked");

   const int status = fgThreadImp->Run(this);
   if (status) {
      fState = kInvalidState;
   } else {
      fState = kRunningState;
   }

   if (gDebug) {
      Info("TThread::Run", "thread run requested");
   }

   ThreadInternalUnLock();
   SetComment();

   return status;
}
Int_t TThread::Kill()
{
   // Kill this thread. Only threads in the running or deleting state are
   // killed; for any other state 13 is returned. A running thread is moved
   // to the canceling state before the implementation's Kill() is invoked.
   const Bool_t killable = (fState == kRunningState || fState == kDeletingState);
   if (!killable) {
      if (gDebug) {
         Warning("TThread::Kill", "thread is not running");
      }
      return 13;
   }

   if (fState == kRunningState) {
      fState = kCancelingState;
   }
   return fgThreadImp->Kill(this);
}
Int_t TThread::Kill(Long_t id)
{
   // Static variant: kill the thread with the given id. Returns 13 when no
   // thread with that id is registered.
   TThread *target = GetThread(id);
   if (!target) {
      if (gDebug) {
         ::Warning("TThread::Kill(Long_t)", "thread 0x%lx not found", id);
      }
      return 13;
   }
   return fgThreadImp->Kill(target);
}
Int_t TThread::Kill(const char *name)
{
   // Static variant: kill the thread with the given name. Returns 13 when
   // no thread with that name is registered.
   TThread *target = GetThread(name);
   if (!target) {
      if (gDebug) {
         ::Warning("TThread::Kill(const char*)", "thread %s not found", name);
      }
      return 13;
   }
   return fgThreadImp->Kill(target);
}
Int_t TThread::SetCancelOff()
{
   // Forward to the implementation's SetCancelOff(); -1 when the threading
   // layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->SetCancelOff();
}
Int_t TThread::SetCancelOn()
{
   // Forward to the implementation's SetCancelOn(); -1 when the threading
   // layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->SetCancelOn();
}
Int_t TThread::SetCancelAsynchronous()
{
   // Forward to the implementation's SetCancelAsynchronous(); -1 when the
   // threading layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->SetCancelAsynchronous();
}
Int_t TThread::SetCancelDeferred()
{
   // Forward to the implementation's SetCancelDeferred(); -1 when the
   // threading layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->SetCancelDeferred();
}
Int_t TThread::CancelPoint()
{
   // Forward to the implementation's CancelPoint(); -1 when the threading
   // layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->CancelPoint();
}
Int_t TThread::CleanUpPush(void *free, void *arg)
{
   // Register the pair (free, arg) on the clean-up stack of the calling
   // thread. Returns -1 when the calling thread has no TThread object.
   TThread *current = Self();
   if (!current) {
      return -1;
   }
   return fgThreadImp->CleanUpPush(&(current->fClean), free, arg);
}
Int_t TThread::CleanUpPop(Int_t exe)
{
   // Pop the top entry of the calling thread's clean-up stack; exe is
   // forwarded to the implementation. Returns -1 when the calling thread
   // has no TThread object.
   TThread *current = Self();
   if (!current) {
      return -1;
   }
   return fgThreadImp->CleanUpPop(&(current->fClean), exe);
}
Int_t TThread::CleanUp()
{
   // Clean up on behalf of the calling thread: run its clean-up stack,
   // call CleanUp() on the global mutexes and, if a holder pointer was
   // registered (see Delete()), delete the thread object.
   // Returns 13 when the calling thread has no TThread object, else 0.
   TThread *current = Self();
   if (!current) {
      return 13;
   }

   fgThreadImp->CleanUp(&(current->fClean));

   fgMainMutex->CleanUp();
   if (fgXActMutex) {
      fgXActMutex->CleanUp();
   }
   gMainInternalMutex->CleanUp();

   if (current->fHolder) {
      delete current;
   }

   return 0;
}
void TThread::AfterCancel(TThread *th)
{
   // Clean-up handler pushed by Function(): marks th as canceled. A null
   // pointer is reported as an error.
   if (!th) {
      ::Error("TThread::AfterCancel", "zero thread pointer passed");
      return;
   }

   th->fState = kCanceledState;
   if (gDebug) {
      th->Info("TThread::AfterCancel", "thread is canceled");
   }
}
Int_t TThread::Exit(void *ret)
{
   // Forward to the implementation's Exit() with ret; -1 when the threading
   // layer has no implementation object.
   if (!fgThreadImp) return -1;
   return fgThreadImp->Exit(ret);
}
Int_t TThread::Sleep(ULong_t secs, ULong_t nanos)
{
   // Sleep for secs seconds plus nanos nanoseconds. The request is
   // converted to whole milliseconds and passed to gSystem->Sleep() when
   // gSystem exists. Always returns 0.
   const UInt_t msFromSecs  = UInt_t(secs * 1000);
   const UInt_t msFromNanos = UInt_t(nanos / 1000000);
   const UInt_t ms = msFromSecs + msFromNanos;

   if (gSystem) {
      gSystem->Sleep(ms);
   }
   return 0;
}
Int_t TThread::GetTime(ULong_t *absSec, ULong_t *absNanoSec)
{
   // Take a TTimeStamp and copy its seconds / nanoseconds into the
   // non-null output arguments. Returns the seconds value.
   TTimeStamp now;

   if (absSec) {
      *absSec = now.GetSec();
   }
   if (absNanoSec) {
      *absNanoSec = now.GetNanoSec();
   }

   return now.GetSec();
}
Int_t TThread::Lock()
{
   // Lock the main mutex and return its result; 0 when the mutex does not
   // exist.
   if (!fgMainMutex) return 0;
   return fgMainMutex->Lock();
}
Int_t TThread::TryLock()
{
   // Try to lock the main mutex and return its result; 0 when the mutex
   // does not exist.
   if (!fgMainMutex) return 0;
   return fgMainMutex->TryLock();
}
Int_t TThread::UnLock()
{
   // Unlock the main mutex and return its result; 0 when the mutex does
   // not exist.
   if (!fgMainMutex) return 0;
   return fgMainMutex->UnLock();
}
void *TThread::Function(void *ptr)
{
TThread *th;
void *ret, *arg;
TThreadCleaner dummy;
th = (TThread *)ptr;
SetCancelOff();
SetCancelDeferred();
CleanUpPush((void *)&AfterCancel, th);
if (gDebug)
th->Info("TThread::Function", "thread is running");
arg = th->fThreadArg;
th->fState = kRunningState;
if (th->fDetached) {
(th->fFcnVoid)(arg);
ret = 0;
th->fState = kFinishedState;
} else {
ret = (th->fFcnRetn)(arg);
th->fState = kTerminatedState;
}
CleanUpPop(1);
if (gDebug)
th->Info("TThread::Function", "thread has finished");
TThread::Exit(ret);
return ret;
}
void TThread::Ps()
{
TThread *l;
int i;
if (!fgMain) {
::Info("TThread::Ps", "no threads have been created");
return;
}
ThreadInternalLock();
int num = 0;
for (l = fgMain; l; l = l->fNext)
num++;
char cbuf[256];
printf(" Thread State\n");
for (l = fgMain; l; l = l->fNext) {
memset(cbuf, ' ', sizeof(cbuf));
snprintf(cbuf, sizeof(cbuf), "%3d %s:0x%lx", num--, l->GetName(), l->fId);
i = strlen(cbuf);
if (i < 30)
cbuf[i] = ' ';
cbuf[30] = 0;
printf("%30s", cbuf);
switch (l->fState) {
case kNewState: printf("Idle "); break;
case kRunningState: printf("Running "); break;
case kTerminatedState: printf("Terminated "); break;
case kFinishedState: printf("Finished "); break;
case kCancelingState: printf("Canceling "); break;
case kCanceledState: printf("Canceled "); break;
case kDeletingState: printf("Deleting "); break;
default: printf("Invalid ");
}
if (l->fComment[0]) printf(" // %s", l->fComment);
printf("\n");
}
ThreadInternalUnLock();
}
void **TThread::Tsd(void *dflt, Int_t k)
{
TThread *th = TThread::Self();
if (!th) {
return (void**)dflt;
} else {
return &(th->fTsd[k]);
}
}
void TThread::Printf(const char *va_(fmt), ...)
{
   // printf-style output that is routed through the main thread when
   // possible: the formatted text is handed to XARequest("PRTF", ...); if
   // the request is not taken over (XARequest() returns 0) the text is
   // printed directly. A newline is appended in both cases.
   va_list ap;
   va_start(ap,va_(fmt));

   Int_t buf_size = 2048;
   char *buf = 0;

   // Format into a buffer that is doubled until the text fits. Older
   // vsnprintf() implementations return -1 on truncation, newer ones the
   // required length.
   for (;;) {
      buf = new char[buf_size];

      // A va_list is indeterminate after vsnprintf() has consumed it, so
      // every attempt works on its own copy.
      va_list apTry;
      va_copy(apTry, ap);
      const int n = vsnprintf(buf, buf_size, va_(fmt), apTry);
      va_end(apTry);

      if (n != -1 && n < buf_size) break;

      delete [] buf;
      buf_size *= 2;
   }

   va_end(ap);

   void *arr[2];
   arr[0] = 0;              // slot 0 is not read by the PRTF action
   arr[1] = (void*) buf;

   // A non-zero result means XAction() has printed the text; it does not
   // free the buffer for PRTF, so ownership stays here in both cases.
   if (!XARequest("PRTF", 2, arr, 0)) {
      printf("%s\n", buf);
      fflush(stdout);
   }

   delete [] buf;
}
void TThread::ErrorHandler(int level, const char *location, const char *fmt,
                           va_list ap) const
{
   // Format an error message and deliver it to the installed error
   // handler. The message is first offered to the main thread through
   // XARequest("ERRO", ...); if the request is not taken over it is passed
   // to the error handler directly from the calling thread.
   // For levels in [kSysError, kFatal) the system error text is appended
   // in parentheses.
   Int_t buf_size = 2048;
   char *buf = 0;

   // Format into a buffer that is doubled until the text fits. Older
   // vsnprintf() implementations return -1 on truncation, newer ones the
   // required length.
   for (;;) {
      buf = new char[buf_size];

      // A va_list is indeterminate after vsnprintf() has consumed it, so
      // every attempt works on its own copy of the caller's list.
      va_list apTry;
      va_copy(apTry, ap);
      const int n = vsnprintf(buf, buf_size, fmt, apTry);
      va_end(apTry);

      if (n != -1 && n < buf_size) break;

      delete [] buf;
      buf_size *= 2;
   }

   char *bp = buf;
   if (level >= kSysError && level < kFatal) {
      // Fetch the system error text once so that the size computation and
      // the formatting use the same string.
      const char *sysErr = gSystem->GetError();
      const size_t len = buf_size + strlen(sysErr) + 5;
      char *buf1 = new char[len];
      snprintf(buf1, len, "%s (%s)", buf, sysErr);
      bp = buf1;
      delete [] buf;
   }

   void *arr[4];
   arr[0] = 0;              // slot 0 is not read by the ERRO action
   arr[1] = (void*) Long_t(level);
   arr[2] = (void*) location;
   arr[3] = (void*) bp;

   // When the request was taken over, XAction() deletes the message buffer
   // itself, so it must not be freed here.
   if (XARequest("ERRO", 4, arr, 0)) return;

   if (level != kFatal)
      ::GetErrorHandler()(level, level >= gErrorAbortLevel, location, bp);
   else
      ::GetErrorHandler()(level, kTRUE, location, bp);

   delete [] bp;
}
void TThread::DoError(int level, const char *location, const char *fmt,
                      va_list va) const
{
   // Build a location string that carries the thread name and id (prefixed
   // by location when one is given) and pass the message on to
   // ErrorHandler().
   char *where = 0;

   if (!location) {
      where = new char[strlen(GetName()) + 32];
      sprintf(where, "%s:0x%lx", GetName(), fId);
   } else {
      where = new char[strlen(location) + strlen(GetName()) + 32];
      sprintf(where, "%s %s:0x%lx", location, GetName(), fId);
   }

   ErrorHandler(level, where, fmt, va);

   delete [] where;
}
Int_t TThread::XARequest(const char *xact, Int_t nb, void **ar, Int_t *iret)
{
// Post the request named xact, with the nb-entry argument array ar, for
// execution by XAction(), and wait until it has been handled.
// Returns 1997 when the request was posted and waited for. Returns 0 when
// nothing was posted: no running application, no TThread object for the
// caller, or the caller's id equals fgMainId. If iret is non-null it
// receives the value of fgXArt after the wait.
if (!gApplication || !gApplication->IsRunning()) return 0;
// First use: create the request mutex, the condition and the timer whose
// Notify() calls XAction(). fgXActMutex is tested again after gGlobalMutex
// has been taken.
if (!fgXActMutex && gGlobalMutex) {
gGlobalMutex->Lock();
if (!fgXActMutex) {
fgXActMutex = new TMutex(kTRUE);
fgXActCondi = new TCondition;
// The TThreadTimer constructor registers the timer with gSystem.
new TThreadTimer;
}
gGlobalMutex->UnLock();
}
TThread *th = Self();
if (th && th->fId != fgMainId) {
// NOTE(review): fgXActMutex is dereferenced without a null check; it is
// still null here if the creation block above was skipped — confirm this
// cannot happen for a non-main thread.
th->SetComment("XARequest: XActMutex Locking");
fgXActMutex->Lock();
th->SetComment("XARequest: XActMutex Locked");
TConditionImp *condimp = fgXActCondi->fConditionImp;
TMutexImp *condmutex = fgXActCondi->GetMutex()->fMutexImp;
condmutex->Lock();
// Publish the request; fgXAct is written last and is the flag tested by
// TThreadTimer::Notify().
fgXAnb = nb;
fgXArr = ar;
fgXArt = 0;
fgXAct = (char*) xact;
th->SetComment(fgXAct);
// Wait for the Signal() issued at the end of XAction().
if (condimp) condimp->Wait();
condmutex->UnLock();
if (iret) *iret = fgXArt;
fgXActMutex->UnLock();
th->SetComment();
return 1997;
} else
return 0;
}
void TThread::XAction()
{
   // Execute the request posted by XARequest() (called from
   // TThreadTimer::Notify() when fgXAct is set). The request name selects
   // the action, fgXArr holds its arguments from index 1 on. When done,
   // fgXAct is cleared and the waiting requester is signalled.
   TConditionImp *condimp = fgXActCondi->fConditionImp;
   TMutexImp *condmutex = fgXActCondi->GetMutex()->fMutexImp;
   condmutex->Lock();

   // The offset of the request name inside acts identifies the action.
   char const acts[] = "PRTF CUPD CANV CDEL PDCD METH ERRO";
   enum { kPRTF = 0, kCUPD = 5, kCANV = 10, kCDEL = 15,
          kPDCD = 20, kMETH = 25, kERRO = 30 };

   // strstr() returns null for an unknown name; subtracting from a null
   // pointer is undefined, so map that case to an index that matches no
   // action and ends up in the default branch.
   const char *request = fgXAct;
   const char *hit = request ? strstr(acts, request) : 0;
   int iact = hit ? int(hit - acts) : -1;

   char *cmd = 0;

   switch (iact) {

      case kPRTF:
         // The text buffer stays owned by the requester.
         printf("%s\n", (const char*)fgXArr[1]);
         fflush(stdout);
         break;

      case kERRO:
         {
            int level = (int)Long_t(fgXArr[1]);
            const char *location = (const char*)fgXArr[2];
            char *mess = (char*)fgXArr[3];
            if (level != kFatal)
               GetErrorHandler()(level, level >= gErrorAbortLevel, location, mess);
            else
               GetErrorHandler()(level, kTRUE, location, mess);
            // The message buffer is released here, not by the requester.
            delete [] mess;
         }
         break;

      case kCUPD:
         cmd = Form("((TCanvas *)0x%lx)->Update();",(Long_t)fgXArr[1]);
         gROOT->ProcessLine(cmd);
         break;

      case kCANV:
         // The number of arguments selects the TCanvas::Constructor overload.
         switch(fgXAnb) {
            case 2:
               cmd = Form("((TCanvas *)0x%lx)->Constructor();",(Long_t)fgXArr[1]);
               gROOT->ProcessLine(cmd);
               break;
            case 5:
               cmd = Form("((TCanvas *)0x%lx)->Constructor((char*)0x%lx,(char*)0x%lx,*((Int_t*)(0x%lx)));",(Long_t)fgXArr[1],(Long_t)fgXArr[2],(Long_t)fgXArr[3],(Long_t)fgXArr[4]);
               gROOT->ProcessLine(cmd);
               break;
            case 6:
               cmd = Form("((TCanvas *)0x%lx)->Constructor((char*)0x%lx,(char*)0x%lx,*((Int_t*)(0x%lx)),*((Int_t*)(0x%lx)));",(Long_t)fgXArr[1],(Long_t)fgXArr[2],(Long_t)fgXArr[3],(Long_t)fgXArr[4],(Long_t)fgXArr[5]);
               gROOT->ProcessLine(cmd);
               break;
            case 8:
               cmd = Form("((TCanvas *)0x%lx)->Constructor((char*)0x%lx,(char*)0x%lx,*((Int_t*)(0x%lx)),*((Int_t*)(0x%lx)),*((Int_t*)(0x%lx)),*((Int_t*)(0x%lx)));",(Long_t)fgXArr[1],(Long_t)fgXArr[2],(Long_t)fgXArr[3],(Long_t)fgXArr[4],(Long_t)fgXArr[5],(Long_t)fgXArr[6],(Long_t)fgXArr[7]);
               gROOT->ProcessLine(cmd);
               break;
         }
         break;

      case kCDEL:
         cmd = Form("((TCanvas *)0x%lx)->Destructor();",(Long_t)fgXArr[1]);
         gROOT->ProcessLine(cmd);
         break;

      case kPDCD:
         ((TVirtualPad*) fgXArr[1])->Divide( *((Int_t*)(fgXArr[2])),
                                             *((Int_t*)(fgXArr[3])),
                                             *((Float_t*)(fgXArr[4])),
                                             *((Float_t*)(fgXArr[5])),
                                             *((Int_t*)(fgXArr[6])));
         break;

      case kMETH:
         ((TMethodCall *) fgXArr[1])->Execute((void*)(fgXArr[2]),(const char*)(fgXArr[3]));
         break;

      default:
         ::Error("TThread::XAction", "wrong case");
   }

   // Mark the request as handled and wake up the requester.
   fgXAct = 0;
   if (condimp) condimp->Signal();
   condmutex->UnLock();
}
TThreadTimer::TThreadTimer(Long_t ms)
   : TTimer(ms, kTRUE)
{
   // Create the timer with period ms and register it with gSystem.
   gSystem->AddTimer(this);
}
Bool_t TThreadTimer::Notify()
{
   // Timer callback: execute a pending cross-thread request, if any, then
   // re-arm the timer. Always returns kFALSE.
   if (TThread::fgXAct) {
      TThread::XAction();
   }

   Reset();

   return kFALSE;
}
TThreadCleaner::~TThreadCleaner()
{
   // Run TThread::CleanUp() for the calling thread when the cleaner object
   // is destroyed.
   TThread::CleanUp();
}