diff --git a/easydock/preparation_for_docking.py b/easydock/preparation_for_docking.py index 3a27d17..75297ed 100644 --- a/easydock/preparation_for_docking.py +++ b/easydock/preparation_for_docking.py @@ -7,8 +7,11 @@ from meeko import (MoleculePreparation, PDBQTMolecule, PDBQTWriterLegacy, RDKitMolCreate) from rdkit import Chem -from rdkit.Chem import AllChem - +from rdkit.Chem import AllChem, Draw +import numpy as np +from sklearn.cluster import AgglomerativeClustering +from rdkit.Chem.rdMolAlign import AlignMolConformers +from typing import Optional, Iterator def cpu_type(x): return max(1, min(int(x), cpu_count())) @@ -35,43 +38,261 @@ def mol_from_smi_or_molblock(ligand_string): def mk_prepare_ligand(mol, verbose=False): + pdbqt_string_list = [] preparator = MoleculePreparation(hydrate=False, flexible_amides=False, rigid_macrocycles=True, min_ring_size=7, max_ring_size=33) try: - mol_setups = preparator.prepare(mol) - for setup in mol_setups: - pdbqt_string, is_ok, error_msg = PDBQTWriterLegacy.write_string(setup) - if verbose: - print(f"{setup}") + cids = [x.GetId() for x in mol.GetConformers()] + for cid in cids: + mol_setups = preparator.prepare(mol, conformer_id=cid) + + for setup in mol_setups: + pdbqt_string, is_ok, error_msg = PDBQTWriterLegacy.write_string(setup) + if not is_ok: + print(f"{mol.GetProp('_Name')} has error in converting to pdbqt: {error_msg}") + else: + pdbqt_string_list.append(pdbqt_string) + + if verbose: + print(f"{setup}") except Exception: sys.stderr.write( "Warning. Incorrect mol object to convert to pdbqt. Continue. \n" ) traceback.print_exc() - pdbqt_string = None + pdbqt_string_list = None + + return pdbqt_string_list + + +def GetConformerRMSFromAtomIds(mol: Chem.Mol, confId1: int, confId2: int, atomIds: Optional[list[int]]=None, prealigned: bool=False) -> float: + """ Returns the RMS between two conformations based on the atomIds passed as the input. + By default, the conformers will be aligned to the first conformer + before the RMS calculation and, as a side-effect, the second will be left + in the aligned state. - return pdbqt_string + Arguments: + - mol: the molecule + - confId1: the id of the first conformer + - confId2: the id of the second conformer + - atomIds: (optional) list of atom ids to use a points for + alingment **AND RMSD** calculation- defaults to all atoms + - prealigned: (optional) by default the conformers are assumed + be unaligned and the second conformer be aligned + to the first + """ + # align the conformers if necessary + # Note: the reference conformer is always the first one + if not prealigned: + if atomIds: + AlignMolConformers(mol, confIds=[confId1, confId2], atomIds=atomIds) + else: + AlignMolConformers(mol, confIds=[confId1, confId2]) + + # calculate the RMS between the two conformations + conf1 = mol.GetConformer(id=confId1) + conf2 = mol.GetConformer(id=confId2) + ssr = 0 + if atomIds: + for i in atomIds: + d = conf1.GetAtomPosition(i).Distance(conf2.GetAtomPosition(i)) + ssr += d * d + ssr /= len(atomIds) + else: + for i in range(mol.GetNumAtoms()): + d = conf1.GetAtomPosition(i).Distance(conf2.GetAtomPosition(i)) + ssr += d * d + ssr /= mol.GetNumAtoms() + + return np.sqrt(ssr) + +def GetConformerRMSMatrixForSaturatedRingMolecule(mol: Chem.Mol, atomIds:list[list[int]]=None, prealigned: bool=False) -> list[float]: + """ Returns the RMS matrix of the conformers of a molecule based on the alignment and rmsd of saturated ring. + The function calculates the mean of the RMSD of each saturated ring with the GetConformerRMSFromAtomIds. + The alignment is done per ring (for example, three alignments are done for three saturated ring) + + By default, the conformers will be aligned to the first conformer + before the RMS calculation and, as a side-effect, the second will be left + in the aligned state. + + As a side-effect, the conformers will be aligned to the first + conformer (i.e. the reference) and will left in the aligned state. + + Arguments: + - mol: the molecule + - atomIds: (optional) list of atom ids to use a points for + alingment - defaults to all atoms + - prealigned: (optional) by default the conformers are assumed + be unaligned and will therefore be aligned to the + first conformer + + Note that the returned RMS matrix is symmetrical, i.e. it is the + lower half of the matrix, e.g. for 5 conformers:: + + rmsmatrix = [ a, + b, c, + d, e, f, + g, h, i, j] + + where a is the RMS between conformers 0 and 1, b is the RMS between + conformers 0 and 2, etc. + This way it can be directly used as distance matrix in e.g. Butina + clustering. -def mol_embedding_3d(mol, seed=43): + """ - def gen_conf(mole, useRandomCoords, randomSeed): + cmat_list = [] + total_ring_atoms = sum([len(atom_in_ring) for atom_in_ring in atomIds]) + for atom_id_in_ring in atomIds: + # if necessary, align the conformers + # Note: the reference conformer is always the first one + rmsvals = [] + confIds = [conf.GetId() for conf in mol.GetConformers()] + if not prealigned: + if atom_id_in_ring: + AlignMolConformers(mol, atomIds=atom_id_in_ring, RMSlist=rmsvals) + else: + AlignMolConformers(mol, RMSlist=rmsvals) + else: # already prealigned + for i in range(1, len(confIds)): + rmsvals.append(GetConformerRMSFromAtomIds(mol, confIds[0], confIds[i], atomIds=atom_id_in_ring, prealigned=prealigned)) + # loop over the conformations (except the reference one) + cmat_per_ring = [] + for i in range(1, len(confIds)): + cmat_per_ring.append(rmsvals[i - 1]) + for j in range(1, i): + cmat_per_ring.append(GetConformerRMSFromAtomIds(mol, confIds[i], confIds[j], atomIds=atom_id_in_ring, prealigned=True)) + + cmat_list.append(np.square(np.array(cmat_per_ring))*len(atom_id_in_ring)) + + cmat_list_array = np.array(cmat_list) + return list(np.sqrt(np.sum(cmat_list_array, axis=0)/total_ring_atoms)) + + +def mol_embedding_3d(mol: Chem.Mol, seed: int=43) -> Chem.Mol: + + def find_saturated_ring(mol: Chem.Mol) -> list[list[int]]: + ssr = Chem.GetSymmSSSR(mol) + saturated_ring_list = [] + for ring_idx in ssr: + is_atom_saturated_array = [mol.GetAtomWithIdx(atom_id).GetHybridization() == Chem.HybridizationType.SP3 for atom_id in ring_idx] + if any(is_atom_saturated_array): + saturated_ring_list.append(ring_idx) + + return saturated_ring_list + + def find_saturated_ring_with_substituent(saturated_ring_list: list[list[int]], mol: Chem.Mol) -> list[list[int]]: + saturated_ring_with_substituent_list = [] + for ring_idx in saturated_ring_list: + ring_and_substituent_idx = [] + for ring_atom_idx in ring_idx: + ring_and_substituent_idx += [x.GetIdx() for x in mol.GetAtomWithIdx(ring_atom_idx).GetNeighbors()] + ring_and_substituent_idx = list(set(ring_and_substituent_idx)) + saturated_ring_with_substituent_list.append(ring_and_substituent_idx) + + return saturated_ring_with_substituent_list + + def gen_conf(mole: Chem.Mol, useRandomCoords: bool, randomSeed: int, has_saturated_ring: bool) -> tuple[Chem.Mol, float]: params = AllChem.ETKDGv3() params.useRandomCoords = useRandomCoords params.randomSeed = randomSeed - conf_stat = AllChem.EmbedMolecule(mole, params) + if has_saturated_ring: + #10 is used as default according to the C language documentation iirc, but I have to specify the numbers. + conf_stat = AllChem.EmbedMultipleConfs(mole, 100, params) + else: + conf_stat = AllChem.EmbedMolecule(mole, params) return mole, conf_stat - + + def calculate_ring_nconf(saturated_ring_ids: list[int]) -> int: + if len(saturated_ring_ids) <= 6: + return 2 + elif len(saturated_ring_ids) == 7: + return 3 + else: + return 4 + + def remove_confs_rms(mol: Chem.Mol, saturated_ring_list: list[list[int]], rms: float=0.25, keep_nconf: Optional[int]=None) -> Chem.Mol: + """ + The function uses AgglomerativeClustering to select conformers. + + :param mol: input molecule with multiple conformers + :param rms: discard conformers which are closer than given value to a kept conformer + :param keep_nconf: keep at most the given number of conformers. This parameter has precedence over rms + :return: + """ + + def gen_ids(ids: int) -> Iterator[int]: + for i in range(1, len(ids)): + for j in range(0, i): + yield j, i + + if keep_nconf and mol.GetNumConformers() <= keep_nconf: + return mol + + if mol.GetNumConformers() <= 1: + return mol + + rms_ = GetConformerRMSMatrixForSaturatedRingMolecule(Chem.RemoveHs(mol), atomIds=saturated_ring_list, prealigned=False) + + cids = [c.GetId() for c in mol.GetConformers()] + arr = np.zeros((len(cids), len(cids))) + for (i, j), v in zip(gen_ids(cids), rms_): + arr[i, j] = v + arr[j, i] = v + + cl = AgglomerativeClustering(n_clusters=None, linkage='complete', metric='precomputed', distance_threshold=rms).fit(arr) + keep_ids = [] + for i in set(cl.labels_): + ids = np.where(cl.labels_ == i)[0] + j = arr[np.ix_(ids, ids)].mean(axis=0).argmin() + keep_ids.append(cids[ids[j]]) + remove_ids = set(cids) - set(keep_ids) + + for cid in sorted(remove_ids, reverse=True): + mol.RemoveConformer(cid) + + if keep_nconf and mol.GetNumConformers() > keep_nconf and mol.GetNumConformers() > 1: + ids = np.in1d(cids, keep_ids) + arr = arr[np.ix_(ids, ids)] # here other indexing operation should be used, because ids is a boolean array + cl = AgglomerativeClustering(n_clusters=keep_nconf, linkage='complete', metric='precomputed').fit(arr) + + keep_ids = [] + cids = [c.GetId() for c in mol.GetConformers()] + for i in set(cl.labels_): + ids = np.where(cl.labels_ == i)[0] + j = arr[np.ix_(ids, ids)].mean(axis=0).argmin() + keep_ids.append(cids[ids[j]]) + remove_ids = set(cids) - set(keep_ids) + + for cid in sorted(remove_ids, reverse=True): + mol.RemoveConformer(cid) + + return mol + if not isinstance(mol, Chem.Mol): return None + + saturated_ring_list = find_saturated_ring(mol) + saturated_rings_with_substituents = find_saturated_ring_with_substituent(saturated_ring_list, mol) + has_saturated_ring = (len(saturated_rings_with_substituents)>0) + mol = Chem.AddHs(mol, addCoords=True) if not mol_is_3d(mol): # only for non 3D input structures - mol, conf_stat = gen_conf(mol, useRandomCoords=False, randomSeed=seed) + mol, conf_stat = gen_conf(mol, useRandomCoords=False, randomSeed=seed, has_saturated_ring=has_saturated_ring) if conf_stat == -1: # if molecule is big enough and rdkit cannot generate a conformation - use params.useRandomCoords = True - mol, conf_stat = gen_conf(mol, useRandomCoords=True, randomSeed=seed) + mol, conf_stat = gen_conf(mol, useRandomCoords=True, randomSeed=seed, has_saturated_ring=has_saturated_ring) if conf_stat == -1: return None AllChem.UFFOptimizeMolecule(mol, maxIters=100) + + print(f"[For Testing Only] {mol.GetProp('_Name')} has {len(saturated_rings_with_substituents)} saturated ring") + print(f"[For Testing Only] Before removing conformation: {mol.GetProp('_Name')} has {mol.GetNumConformers()} conf") + mol = remove_confs_rms(mol, saturated_rings_with_substituents, rms=1, keep_nconf= sum([calculate_ring_nconf(saturated_ring) for saturated_ring in saturated_ring_list])) + print(f"[For Testing Only] After removing conformation: {mol.GetProp('_Name')} has {mol.GetNumConformers()} conf") + + AlignMolConformers(mol) + return mol diff --git a/easydock/vina_dock.py b/easydock/vina_dock.py index 4bbe042..6c9236b 100755 --- a/easydock/vina_dock.py +++ b/easydock/vina_dock.py @@ -75,51 +75,66 @@ def mol_dock(mol, config): config = __parse_config(config) + + mol_id = mol.GetProp('_Name') - ligand_pdbqt = ligand_preparation(mol, boron_replacement=True) - if ligand_pdbqt is None: + ligand_pdbqt_list = ligand_preparation(mol, boron_replacement=True) + + if ligand_pdbqt_list is None: return mol_id, None - output_fd, output_fname = tempfile.mkstemp(suffix='_output.json', text=True) - ligand_fd, ligand_fname = tempfile.mkstemp(suffix='_ligand.pdbqt', text=True) - - try: - with open(ligand_fname, 'wt') as f: - f.write(ligand_pdbqt) - - p = os.path.realpath(__file__) - python_exec = sys.executable - cmd = f'{python_exec} {os.path.dirname(p)}/vina_dock_cli.py -l {ligand_fname} -p {config["protein"]} ' \ - f'-o {output_fname} --center {" ".join(map(str, config["center"]))} ' \ - f'--box_size {" ".join(map(str, config["box_size"]))} ' \ - f'-e {config["exhaustiveness"]} --seed {config["seed"]} --nposes {config["n_poses"]} -c {config["ncpu"]}' - start_time = timeit.default_timer() - subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) # this will trigger CalledProcessError and skip next lines) - dock_time = round(timeit.default_timer() - start_time, 1) - - with open(output_fname) as f: - res = f.read() - if res: - res = json.loads(res) - mol_block = pdbqt2molblock(res['poses'].split('MODEL')[1], mol, mol_id) - output = {'docking_score': res['docking_score'], - 'pdb_block': res['poses'], - 'mol_block': mol_block, - 'dock_time': dock_time} - - except subprocess.CalledProcessError as e: - sys.stderr.write(f'Error caused by docking of {mol_id}\n') - sys.stderr.write(str(e) + '\n') - sys.stderr.write('STDERR output:\n') - sys.stderr.write(e.stderr + '\n') - sys.stderr.flush() - output = None - - finally: - os.close(output_fd) - os.close(ligand_fd) - os.unlink(ligand_fname) - os.unlink(output_fname) + dock_output_conformer_list = [] + start_time = timeit.default_timer() + for ligand_pdbqt in ligand_pdbqt_list: + output_fd, output_fname = tempfile.mkstemp(suffix='_output.json', text=True) + ligand_fd, ligand_fname = tempfile.mkstemp(suffix='_ligand.pdbqt', text=True) + + try: + with open(ligand_fname, 'wt') as f: + f.write(ligand_pdbqt) + + p = os.path.realpath(__file__) + python_exec = sys.executable + cmd = f'{python_exec} {os.path.dirname(p)}/vina_dock_cli.py -l {ligand_fname} -p {config["protein"]} ' \ + f'-o {output_fname} --center {" ".join(map(str, config["center"]))} ' \ + f'--box_size {" ".join(map(str, config["box_size"]))} ' \ + f'-e {config["exhaustiveness"]} --seed {config["seed"]} --nposes {config["n_poses"]} -c {config["ncpu"]}' + subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) # this will trigger CalledProcessError and skip next lines) + + with open(output_fname) as f: + res = f.read() + if res: + res = json.loads(res) + mol_block = pdbqt2molblock(res['poses'].split('MODEL')[1], mol, mol_id) + output = {'docking_score': res['docking_score'], + 'pdb_block': res['poses'], + 'mol_block': mol_block} + + dock_output_conformer_list.append(output) + + except subprocess.CalledProcessError as e: + sys.stderr.write(f'Error caused by docking of {mol_id}\n') + sys.stderr.write(str(e) + '\n') + sys.stderr.write('STDERR output:\n') + sys.stderr.write(e.stderr + '\n') + sys.stderr.flush() + + finally: + os.close(output_fd) + os.close(ligand_fd) + os.unlink(ligand_fname) + os.unlink(output_fname) + + dock_time = round(timeit.default_timer() - start_time, 1) + print(f'[For Testing Only Sanity check]: There are {len(dock_output_conformer_list)} {mol_id} conformers that has been docked') + print(f'\n') + docking_score_list = [float(conformer_output['docking_score']) for conformer_output in dock_output_conformer_list] + + if not docking_score_list: + return mol_id, None + + output = dock_output_conformer_list[docking_score_list.index(min(docking_score_list))] + output['dock_time'] = dock_time return mol_id, output