forked from CellProfiling/pipex
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipex.py
116 lines (100 loc) · 4.52 KB
/
pipex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import sys
import datetime
import psutil
import shutil
import fnmatch
import random
def _timestamp():
    """Return the current local time in the format used by every log line."""
    return datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")


def _read_pid(pidfile_filename):
    """Return the PID stored on the first line of the pid file.

    Returns None when the file is missing, unreadable, empty or does not
    start with an integer, so that a stale or corrupt pid file is treated
    as "no other process" instead of crashing the batch.
    """
    try:
        with open(pidfile_filename, 'r') as pid_file:
            first_line = pid_file.readline()
    except IOError:
        return None
    try:
        return int(first_line)
    except ValueError:
        return None


def _extract_data_folder(command):
    """Return the value of the ``-data=`` option of *command*, or None.

    The value may be unquoted (ends at the next space) or wrapped in single
    or double quotes (ends at the matching quote). None is returned when the
    option is absent, is the very first token of the line, or has an empty
    value.
    """
    marker = '-data='
    marker_pos = command.find(marker)
    # Same condition as before: the option must follow something else on
    # the line (position 0 does not count).
    if marker_pos <= 0:
        return None

    value_start = marker_pos + len(marker)
    terminator = ' '
    if command[value_start:value_start + 1] in ("'", '"'):
        terminator = command[value_start]
        value_start += 1

    # Search from the first character of the value (not one past it) so
    # that an empty value such as -data='' is recognised as empty.
    value_end = command.find(terminator, value_start)
    if value_end < 0:
        value_end = len(command)

    folder = command[value_start:value_end].strip()
    return folder or None


def batch_processor():
    """Run every job listed in the PIPEX batch file, one line at a time.

    Lines of the batch file are interpreted as follows:

    * lines starting with ``#`` are skipped;
    * ``run_id <value>`` overrides the run id;
    * ``swap <args>`` calls ``./enable_swap.sh <args>`` (and
      ``./disable_swap.sh`` is called once the batch is finished);
    * any other line whose stripped length is greater than 5 is appended to
      the python command and executed through the shell. When the line has a
      ``-data=`` option, the log file is copied into that folder afterwards.

    When the ``PIPEX_WORK`` environment variable is set, the batch list, pid
    file, run id files and log are looked up under ``./work`` instead of the
    current directory. ``PIPEX_DATA`` is set to ``./data`` when it is not
    already defined.

    Before each line is read, the pid file is checked and the process exits
    if it names a PID that is still alive. At the end the pid file is removed
    and the run id is written to the ``LAST_RUN_ID`` file.

    Raises:
        OSError: if the batch file cannot be opened.
        SystemExit: if another PIPEX process is detected through the pid file.
    """
    batch_filename = './pipex_batch_list.txt'
    python_command = 'python -u -W ignore '
    if os.path.exists("./bin/python"):
        python_command = './bin/python -u '
    pidfile_filename = './RUNNING'
    run_id_filename = './run_id.txt'
    run_id_result_filename = './LAST_RUN_ID'
    run_id = random.randint(1, 1000000)
    log_filename = './log.txt'

    if "PIPEX_DATA" not in os.environ:
        os.environ['PIPEX_DATA'] = './data'
    if "PIPEX_WORK" in os.environ:
        batch_filename = './work/pipex_batch_list.txt'
        python_command = 'python -u '
        pidfile_filename = './work/RUNNING'
        run_id_filename = './work/run_id.txt'
        run_id_result_filename = './work/LAST_RUN_ID'
        log_filename = './work/log.txt'

    print(">>> Start time pipex =", _timestamp(), flush=True)

    # A run id stored on disk takes precedence over the random default.
    if os.path.exists(run_id_filename):
        with open(run_id_filename, 'r', encoding='utf-8') as f:
            run_id = f.read().strip()
        print(">>> Setting run_id =", run_id, _timestamp(), flush=True)

    swap_used = False
    # The context manager releases the batch file on every exit path,
    # including sys.exit() below.
    with open(batch_filename, 'r') as batch_file:
        while True:
            # Checked before every line, so the batch stops as soon as a
            # live PID shows up in the pid file.
            running_pid = _read_pid(pidfile_filename)
            if running_pid is not None and psutil.pid_exists(running_pid):
                print(">>> Another PIPEX process seems to be running, exiting =", _timestamp(), flush=True)
                sys.exit()

            curr_command = batch_file.readline()
            if not curr_command:
                # EOF
                break
            if curr_command.startswith('#'):
                # comment
                continue

            if curr_command.startswith('run_id'):
                # using the run_id provided by the user
                run_id = curr_command.replace('run_id', '').strip()
                print(">>> Setting run_id =", run_id, _timestamp(), flush=True)
            elif curr_command.startswith('swap'):
                # creating required swap via bash script 'enable_swap.sh'
                print(">>> Creating swap space =", _timestamp(), flush=True)
                swap_req = curr_command.replace('swap', '').strip()
                os.system('./enable_swap.sh ' + swap_req)
                swap_used = True
            elif len(curr_command.strip()) > 5:
                # pipex command
                job = curr_command.strip()
                print(">>> Processing next job =", _timestamp(), flush=True)
                print('>>> ' + job)
                try:
                    exit_status = os.system(python_command + job)
                except (OSError, ValueError) as err:
                    # Best effort: a job that cannot even be launched must
                    # not stop the rest of the batch, but it is reported.
                    print(">>> Job could not be launched =", err, flush=True)
                else:
                    if exit_status != 0:
                        print(">>> Job finished with non-zero exit status =", exit_status, flush=True)

                curr_data_folder = _extract_data_folder(curr_command)
                if curr_data_folder is not None:
                    log_copy = os.path.join(curr_data_folder, os.path.basename(log_filename))
                    try:
                        shutil.copyfile(log_filename, log_copy)
                    except OSError as err:
                        # A missing log or data folder must not abort the
                        # remaining jobs nor skip the cleanup below.
                        print(">>> Could not copy log to data folder =", err, flush=True)

    if "PIPEX_WORK" in os.environ:
        os.system('chmod -R 777 /opt/pipex/work')

    if swap_used:
        # deleting previously created swap via bash script 'disable_swap.sh'
        print(">>> Deleting swap space =", _timestamp(), flush=True)
        os.system('./disable_swap.sh')

    # NOTE(review): removes directories named '<property*' from the current
    # directory; presumably leftovers of a job step - confirm their origin.
    for entry in os.listdir('.'):
        if fnmatch.fnmatch(entry, '<property*'):
            os.rmdir(entry)

    if os.path.exists(pidfile_filename):
        os.remove(pidfile_filename)

    with open(run_id_result_filename, 'w', encoding='utf-8') as f:
        f.write(str(run_id))

    print(">>> End time pipex =", _timestamp(), flush=True)
# Script entry point: process the batch list only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    batch_processor()