Skip to content

Commit

Permalink
general fixes to the daemon (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
emanueledimarco authored Jul 15, 2022
1 parent 842c70b commit 5b0c063
Showing 1 changed file with 23 additions and 31 deletions.
54 changes: 23 additions & 31 deletions submit/calibJobHandlerCondor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

# def checkNjobsCondor(noDaemon=True):
# checkJobs = subprocess.Popen(['condor_q'], stdout=subprocess.PIPE, shell=True);
# datalines = (checkJobs.communicate()[0]).splitlines()
# datalines = (checkJobs.communicate()[0].decode()).splitlines()
# nRetjobs = 0
# for l in datalines:
# if all(x in l for x in ["jobs", "completed", "removed"]):
Expand All @@ -34,11 +34,10 @@ def checkNjobsCondor(grepArg="ecalpro"):
# 0 jobs; 0 completed, 0 removed, 0 idle, 0 running, 0 held, 0 suspended

checkJobs = subprocess.Popen(['condor_q'], stdout=subprocess.PIPE, shell=True);
tmp = str(checkJobs.communicate()[0])
tmp = str(checkJobs.communicate()[0].decode())
if all(x in tmp for x in ["OWNER", "BATCH_NAME", "SUBMITTED", "jobs", "completed", "removed"]): # a very dumb check, I know
checkJobs = subprocess.Popen(['condor_q -af JobBatchName| grep {gr} | wc -l'.format(gr=grepArg)], stdout=subprocess.PIPE, shell=True);
nRetjobs = checkJobs.communicate()[0]
# check nRetjobs is not a null string, but a number (either 0 or something else)
nRetjobs = checkJobs.communicate()[0].decode()
if len(nRetjobs) and nRetjobs != "\n" and nRetjobs.replace('\n','').isdigit():
return int(nRetjobs)
else:
Expand Down Expand Up @@ -173,7 +172,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
print(">>> Running --> " + submit_s)
# actually submitting filling tasks
submitJobs = subprocess.Popen([submit_s], stdout=subprocess.PIPE, shell=True);
output = submitJobs.communicate()
output = submitJobs.communicate()[0].decode()
print("Out: " + str(output))

time.sleep(15)
Expand All @@ -189,7 +188,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
nFilljobs = checkNjobsCondor("ecalpro_Fill")
print("I still see {n} jobs for Fill part".format(n=nFilljobs))
checkJobs2 = subprocess.Popen(['rm -rf ' + pwd + '/core.*'], stdout=subprocess.PIPE, shell=True);
datalines2 = (checkJobs2.communicate()[0]).splitlines()
datalines2 = (checkJobs2.communicate()[0].decode()).splitlines()
nCheck += 1
if nCheck * sleeptime > 43200:
renewTokenAFS(daemonLocal=options.daemonLocal, infile=options.tokenFile)
Expand Down Expand Up @@ -261,7 +260,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
Ntpsubmit_s = "condor_submit {cfn}".format(cfn=condor_file_name)
# actually submitting recovery tasks
subJobs = subprocess.Popen([Ntpsubmit_s], stdout=subprocess.PIPE, shell=True);
outJobs = subJobs.communicate()
outJobs = subJobs.communicate()[0].decode()
print(outJobs)

time.sleep(30)
Expand All @@ -278,7 +277,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
nFilljobs = checkNjobsCondor("ecalpro_Fill_recovery")
print("I still see {n} jobs for Fill_recovery part ({nr})".format(n=nFilljobs,nr=NtpRecoveryAttempt))
checkJobs2 = subprocess.Popen(['rm -rf ' + pwd + '/core.*'], stdout=subprocess.PIPE, shell=True);
datalines2 = (checkJobs2.communicate()[0]).splitlines()
datalines2 = (checkJobs2.communicate()[0].decode()).splitlines()
nCheck += 1
if nCheck * sleeptime > 43200:
renewTokenAFS(daemonLocal=options.daemonLocal, infile=options.tokenFile)
Expand Down Expand Up @@ -323,20 +322,13 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
#BUT we do that only if you are working on batch
Grepcommand = "grep -i list " + Hadd_src_n + " | grep -v echo | grep -v bash | awk '{print $2}'"
myGrep = subprocess.Popen([Grepcommand], stdout=subprocess.PIPE, shell=True )
FoutGrep = myGrep.communicate()
# FoutGrep is something like the following
# ('/afs_path_to_dirName/src/hadd/hadd_iter_XXX_step_YYY.list`\n', None)
# we want to keep /afs_path_to_dirName/src/hadd/hadd_iter_XXX_step_YYY.list
# removing (' and `\n', None)
FoutGrep_2 = str(FoutGrep)[3:]
FoutGrep_2 = str(FoutGrep_2)[:-11]
#print('Checking ' + str(FoutGrep_2))
FoutGrep = myGrep.communicate()[0].decode().replace("'","")[:-2]
#Check the size for each line
f = open( str(FoutGrep_2) )
f = open(FoutGrep)
lines = f.readlines()
f.close()
# create backup of original list of files
fbckp = open( FoutGrep_2.replace(".list","_backup.list"), "w")
fbckp = open( FoutGrep.replace(".list","_backup.list"), "w")
for l in lines:
fbckp.write(l)
fbckp.close()
Expand Down Expand Up @@ -369,16 +361,16 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc

#moving the .list to the correct one
if( len(newlines) ):
prunedfile = FoutGrep_2.replace(".list","_pruned.list")
prunedfile = FoutGrep.replace(".list","_pruned.list")
fprun = open(prunedfile,"w")
for l in newlines:
fprun.write(l)
fprun.close()
MoveComm = "cp " + prunedfile + " " + str(FoutGrep_2)
MoveComm = "cp " + prunedfile + " " + str(FoutGrep)
MoveC = subprocess.Popen([MoveComm], stdout=subprocess.PIPE, shell=True);
mvOut = MoveC.communicate()
#print "Some files were removed in " + str(FoutGrep_2)
#print "Copied " + prunedfile + " into " + str(FoutGrep_2)
mvOut = MoveC.communicate()[0].decode()
#print "Some files were removed in " + str(FoutGrep)
#print "Copied " + prunedfile + " into " + str(FoutGrep)

#End of the check, sending the job
print("Preparing job to hadd files in list number " + str(nHadds) + "/" + str(Nlist - 1)) #nHadds goes from 0 to Nlist -1
Expand All @@ -387,7 +379,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
Hsubmit_s = "condor_submit {cfn}".format(cfn=condor_file_name)
print(">>> Running --> " + Hsubmit_s)
subJobs = subprocess.Popen([Hsubmit_s], stdout=subprocess.PIPE, shell=True);
outJobs = subJobs.communicate()
outJobs = subJobs.communicate()[0].decode()

print(str(outJobs))
time.sleep(15)
Expand All @@ -401,7 +393,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
nHaddjobs = checkNjobsCondor("ecalpro_Hadd")
print("I still see {n} jobs for Hadd part".format(n=nHaddjobs))
#checkJobs2 = subprocess.Popen(['rm -rf ' + pwd + '/core.*'], stdout=subprocess.PIPE, shell=True);
#datalines2 = (checkJobs2.communicate()[0]).splitlines()
#datalines2 = (checkJobs2.communicate()[0].decode()).splitlines()
print('Done with various hadd')

# Check if all the hadds are there and files are not empty
Expand Down Expand Up @@ -470,7 +462,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
print(">>> Running --> " + Hsubmit_s)
# actually submitting recovery tasks
subJobs = subprocess.Popen([Hsubmit_s], stdout=subprocess.PIPE, shell=True);
outJobs = subJobs.communicate()
outJobs = subJobs.communicate()[0].decode()
print(outJobs)

time.sleep(15)
Expand Down Expand Up @@ -521,7 +513,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
FHsubmit_s = "condor_submit {cfn}".format(cfn=condor_file_name)
print(">>> Running --> " + FHsubmit_s)
FsubJobs = subprocess.Popen([FHsubmit_s], stdout=subprocess.PIPE, shell=True);
FoutJobs = FsubJobs.communicate()
FoutJobs = FsubJobs.communicate()[0].decode()
print(FoutJobs)
time.sleep(5)

Expand Down Expand Up @@ -559,7 +551,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
Fitsubmit_s = "{src}".format(src=srcFold_n)
print(">>> Running --> " + Fitsubmit_s)
FsubJobs = subprocess.Popen([Fitsubmit_s], stdout=subprocess.PIPE, shell=True);
FoutJobs = FsubJobs.communicate() # do I really need to communicate to stdout?
FoutJobs = FsubJobs.communicate()[0].decode() # do I really need to communicate to stdout?
print(FoutJobs)
#os.system(Fitsubmit_s)

Expand Down Expand Up @@ -619,7 +611,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
Fitsubmit_s = "condor_submit {cfn}".format(cfn=condor_file_name)
print(">>> Running --> " + Fitsubmit_s)
FsubJobs = subprocess.Popen([Fitsubmit_s], stdout=subprocess.PIPE, shell=True);
FoutJobs = FsubJobs.communicate()
FoutJobs = FsubJobs.communicate()[0].decode()
print(FoutJobs)
time.sleep(5)

Expand Down Expand Up @@ -672,7 +664,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
Fitsubmit_s = "condor_submit {cfn}".format(cfn=condor_file_name)
print(">>> Running --> " + Fitsubmit_s)
FsubJobs = subprocess.Popen([Fitsubmit_s], stdout=subprocess.PIPE, shell=True);
FoutJobs = FsubJobs.communicate()
FoutJobs = FsubJobs.communicate()[0].decode()
print(FoutJobs)
time.sleep(5)

Expand Down Expand Up @@ -923,7 +915,7 @@ def writeCondorSubmitBase(condor_file="", dummy_exec_name="", logdir="", jobBatc
# print "############################"
# cmdEosLs = 'ls ' + eosPath + '/' + dirname + '/iter_' + str(iters) + "/"
# eosFileList = subprocess.Popen([cmdEosLs], stdout=subprocess.PIPE, shell=True);
# print eosFileList.communicate()
# print eosFileList.communicate()[0].decode()
# print "############################"

print("Going to merge the following files ...")
Expand Down

0 comments on commit 5b0c063

Please sign in to comment.