This repository has been archived by the owner on Jun 1, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
368d4d4
commit 8ee8cb6
Showing
10 changed files
with
163 additions
and
216 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
import ujson | ||
from intervaltree import IntervalTree | ||
import multiprocessing as mp | ||
import time | ||
import time | ||
from .models import * | ||
#from django.db import connection | ||
from collections import Counter | ||
|
@@ -12,7 +12,7 @@ | |
from sqlalchemy.pool import QueuePool | ||
|
||
#engine = sqlalchemy.create_engine('mysql+pymysql://root:[email protected]:6603/circDraw', poolclass=QueuePool) | ||
engine = sqlalchemy.create_engine('mysql+pymysql://root:[email protected]:6603/circDraw', poolclass=QueuePool) | ||
engine = sqlalchemy.create_engine('mysql+pymysql://root:[email protected]:6603/circDraw?charset=utf8&local_infile=1', poolclass=QueuePool) | ||
print('Running handle') | ||
|
||
def line_counter(file): | ||
|
@@ -177,56 +177,65 @@ def process_file(file, assembly: str, file_type, new_file, task_id, bias=2): | |
assert l[0].lower().startswith('chr') | ||
assert len(l[0][3:-1]) <= 3 | ||
assert int(l[1]) < int(l[2]) | ||
|
||
except: | ||
valid_line = False | ||
|
||
print("Line:", valid_line) | ||
print("Invalid line:", line) | ||
|
||
|
||
if valid_line: | ||
# map circ to annotated_circ | ||
chr_num = l[0].lower() | ||
#try: | ||
chr_num = 'chr' + l[0][3::].upper() | ||
start = int(l[1]) | ||
end = int(l[2]) | ||
time1 = time.time() | ||
result_script = f'select * from {assembly}_circRNAs where chr_num="{chr_num}" and start>={start-bias} and start<={start+bias} and end<={end+bias} and end>={end-bias};' | ||
#time1 = time.time() | ||
result_script = f'select * from {assembly}_circRNAs_{chr_num} where start>={start-bias} and start<={start+bias} and end<={end+bias} and end>={end-bias};' | ||
with connection.cursor() as cur: | ||
cur.execute(result_script) | ||
result = cur.fetchall() | ||
time2 = time.time() | ||
#time2 = time.time() | ||
|
||
print('Query MySQL1', len(result), f'Used {round(time2-time1,2)}s') | ||
#print('Query MySQL1', len(result), f'Used {round(time2-time1,2)}s') | ||
|
||
possible_circ = [ | ||
(i[4] - i[3] - (end - start), i) for i in result] | ||
|
||
if len(possible_circ) > 0: | ||
# append to circ_on_gene | ||
circ = sorted(possible_circ, key=lambda x: x[0])[ | ||
0][1] | ||
0][1] | ||
circ_json = {"start": int(circ[3]), | ||
"end": int(circ[4]), | ||
"source": "CIRCpedia V2", | ||
"gene": circ[0], | ||
"transcript": circ[1], | ||
"components": circ[-1]} | ||
|
||
try: | ||
circ_on_gene[circ_json['gene']][1].append(circ_json) | ||
except: | ||
# get gene info | ||
geneINFO_script = f'''select * from {assembly}_genome_genes where id="{circ_json['gene']}";''' | ||
time3 = time.time() | ||
#print('This is gene of circ', circ[0]) | ||
|
||
if circ[0] in circ_on_gene.keys(): | ||
#print('Check if key in circ_on_gene exist:', circ_on_gene.keys()) | ||
circ_on_gene[circ[0]][1].append(circ_json) | ||
print('Gene existed:', len(circ_on_gene[circ[0]][1])) | ||
else: | ||
# get gene info | ||
geneINFO_script = f'''select * from {assembly}_genome_genes where id="{circ[0]}";''' | ||
#time3 = time.time() | ||
with connection.cursor() as cur: | ||
cur.execute(geneINFO_script) | ||
geneINFO = cur.fetchall() | ||
time4 = time.time() | ||
print('Query MySQL2', f'Used {round(time4-time3,2)}s') | ||
|
||
circ_on_gene[geneINFO[0][4]] = [ | ||
list(geneINFO[0]), [circ]] | ||
#time4 = time.time() | ||
#print('Query MySQL2', f'Used {round(time4-time3,2)}s') | ||
geneINFO = [str(i) for i in geneINFO[0]] | ||
print(geneINFO) | ||
circ_on_gene[geneINFO[4]] = [ | ||
list(geneINFO), [circ_json]] | ||
#print('Gene not existed:', circ_on_gene) | ||
|
||
else: | ||
unmap_circ.append(l) | ||
#except Exception as e: | ||
#print(mp.current_process().name, "Failed to process:", line, e) | ||
print('Unmapped circ number:', len(unmap_circ)) | ||
for circ in unmap_circ: | ||
combo, gene, transcript = find_exon_combo(circ[0].lower(), int(circ[1]), int(circ[2]), assembly) | ||
|
@@ -246,6 +255,8 @@ def process_file(file, assembly: str, file_type, new_file, task_id, bias=2): | |
cur.execute(geneINFO_script) | ||
geneINFO = cur.fetchall() | ||
|
||
geneINFO = [str(i) for i in geneINFO[0]] | ||
|
||
circ_on_gene[geneINFO[4]] = [geneINFO, [{"start": int(circ[1]), | ||
"end": int(circ[2]), | ||
"source": "circDraw_annotated", | ||
|
@@ -254,12 +265,20 @@ def process_file(file, assembly: str, file_type, new_file, task_id, bias=2): | |
"components": combo}]] | ||
|
||
print('Mapped circ:', len(circ_on_gene)) | ||
#print(circ_on_gene) | ||
#### write to file | ||
print('This is new_file:', new_file) | ||
with open(new_file, 'w+') as f: | ||
path = '/'.join(file.split('/')[0:-1]) + '/' | ||
print('This is new_file:', path + new_file) | ||
with open(path + new_file, 'w+') as f: | ||
for _,v in circ_on_gene.items(): | ||
info = v[0].append(v[1]) | ||
#print('Before Info',k, v[0],type(v[0]), type(v[1])) | ||
gene = v[0] | ||
info = [gene[4],gene[0],gene[1],gene[2],gene[5],gene[6]] | ||
info.append(ujson.dumps(v[1])) | ||
info.append(str(len(v[1]))) | ||
line = task_id + '\t' + '\t'.join(info) | ||
print('Check columns number:', len(line.split('\t')) == 9) | ||
print(line.split('\t')[-1]) | ||
f.write(line+'\n') | ||
print('Finish process file:', new_file) | ||
# columns | ||
|
@@ -285,13 +304,18 @@ def process_file(file, assembly: str, file_type, new_file, task_id, bias=2): | |
} """ | ||
|
||
def handle(config): | ||
engine.dispose() | ||
connection = engine.raw_connection() | ||
try: | ||
split_file(config) | ||
path = '/'.join(config['FILE_NAME'].split('/')[0:-1]) + '/' | ||
new_files = [f"""{config['FILE_NAME']}_circDraw_generate.{i}""" for i in range(1,config['CORE_NUM'] + 1)] | ||
splited_files = [f"""{config['FILE_NAME']}.{i}""" for i in range(1,config['CORE_NUM'] + 1)] | ||
print(splited_files) | ||
jobs = [] | ||
for i in new_files: | ||
p = mp.Process(target=process_file, args=(config['FILE_NAME'], config['ASSEMBLY'], config['FILE_TYPE'], i, config['TASK_ID'])) | ||
n = 0 | ||
for i in splited_files: | ||
n += 1 | ||
p = mp.Process(target=process_file, args=(i, config['ASSEMBLY'], config['FILE_TYPE'], f"{config['NEW_FILE']}.{n}", config['TASK_ID'])) | ||
jobs.append(p) | ||
|
||
for j in jobs: | ||
|
@@ -302,7 +326,7 @@ def handle(config): | |
|
||
print(f'Finish processing {config["TASK_ID"]}') | ||
|
||
concat_files(config) | ||
concat_files(config, delete_old=False) | ||
|
||
print(f'Concating success {config["TASK_ID"]}') | ||
|
||
|
@@ -312,37 +336,37 @@ def handle(config): | |
with open(f"{path}{config['NEW_FILE']}", 'r') as f: | ||
with open(f"{path}{config['TASK_ID']}_density", 'w') as c: | ||
for line in f: | ||
print(line) | ||
#print(line) | ||
info = line.split('\t') | ||
circINFO = ujson.loads(info[-1]) | ||
circINFO = ujson.loads(info[-2]) | ||
# density table | ||
# md5 id chr_num start end name type circ_num | ||
for i in circINFO: | ||
circRNA_length.append(i['end'] - i['start']) | ||
circRNA_isoform.append(info[5], len(circINFO)) | ||
c.write('\t'.join(info[0:-1]) + '\t' + len(circINFO) + '\n') | ||
circRNA_isoform.append((info[5], len(circINFO))) | ||
c.write('\t'.join(info[0:-2]) + '\t' + str(len(circINFO)) + '\n') | ||
|
||
print(f'circRNA length capacity: {len(circRNA_length)}', | ||
f'circRNA isoform capacity: {len(circRNA_isoform)}') | ||
|
||
tmp_circ_len = Counter(circRNA_length).items() | ||
|
||
circRNA_length_distribution = ujson.dumps({"x":[k for k in tmp_circ_len], | ||
circRNA_length_distribution = ujson.dumps({"x":[k for k,_ in tmp_circ_len], | ||
"y":[v for _,v in tmp_circ_len]}) | ||
|
||
tmp_circRNA_isoform = sorted(circRNA_isoform, key=lambda x: x[1], reverse=True)[0:20] | ||
circRNA_isoform = ujson.dumps({"x":[k for k in tmp_circRNA_isoform], | ||
circRNA_isoform = ujson.dumps({"x":[k for k,_ in tmp_circRNA_isoform], | ||
"y":[v for _,v in tmp_circRNA_isoform]}) | ||
|
||
# load file to database | ||
with connection.cursor() as cur: | ||
table_name = "UserTable" | ||
cur.execute('''SET GLOBAL local_infile = 1;''') | ||
cur.execute(f"""LOAD DATA LOCAL INFILE '{path}{config['TASK_ID']}_density' IGNORE INTO TABLE {table_name} character set utf8mb4 fields terminated by '\t' lines terminated by '\n' (`md5`,`id`,`chr_num`, `start`,`end`,`name`,`gene_type`, `circ_on_gene_all`, `circ_on_num`);""") | ||
cur.execute('''SET GLOBAL local_infile = 'ON';''') | ||
cur.execute('''SHOW GLOBAL VARIABLES LIKE 'local_infile';''') | ||
local_infile = cur.fetchall() | ||
print('LOCAL_INFILE =',local_infile) | ||
cur.execute(f"""LOAD DATA LOCAL INFILE '{path}{config['NEW_FILE']}' IGNORE INTO TABLE UserTable character set utf8mb4 fields terminated by '\t' lines terminated by '\n' (`md5`,`gene_id`,`chr_num`, `start`,`end`,`name`,`gene_type`, `circ_on_gene_all`, `circ_on_num`);""") | ||
cur.execute(f"""LOAD DATA LOCAL INFILE '{path}{config['TASK_ID']}_density' IGNORE INTO TABLE UserDensity character set utf8mb4 fields terminated by '\t' lines terminated by '\n' (`md5`,`gene_id`,`chr_num`, `start`,`end`,`name`,`gene_type`, `circ_num`);""") | ||
connection.commit() | ||
|
||
|
||
|
||
return True,circRNA_length_distribution,circRNA_isoform | ||
except Exception as e: | ||
print('Handle Error:', e) | ||
|
Oops, something went wrong.