"""Thin wrappers around the MyGene.info and BioMart web services."""

import io
from functools import wraps
from typing import Iterable, Optional

import pandas as pd
from biothings_client import get_client

from .._normalize import NormalizeColumns


def format_into_dataframe(f):
    """Decorate `f` so that its data argument is always a `pd.DataFrame`.

    If the data argument already is a DataFrame it is passed through untouched
    and `f` receives `_reformat=False`. Otherwise the argument is treated as an
    iterable of labels, wrapped into an empty DataFrame indexed by those
    labels, and `f` receives `_reformat=True`.

    Args:
        f: The function or method to wrap. It must accept a `_reformat`
            keyword argument.

    Returns:
        The wrapped callable.
    """

    @wraps(f)
    def dataframe(*args, **kwargs) -> Optional[pd.DataFrame]:
        # Locate the data argument: position 0 for plain functions,
        # position 1 when the first positional argument is `self`.
        # NOTE(review): `_is_function` is not defined in the visible part of
        # this module; it must be provided elsewhere in the file - confirm.
        idx = 0 if _is_function(args[0]) else 1  # noqa: F821

        if isinstance(args[idx], pd.DataFrame):
            df = args[idx]
            reformat = False
        else:
            df = pd.DataFrame(index=list(args[idx]))
            reformat = True

        args_new = list(args)
        args_new[idx] = df
        return f(*args_new, _reformat=reformat, **kwargs)

    return dataframe


class Mygene:
    """Wrapper of MyGene.info.

    See: https://docs.mygene.info/en/latest/index.html
    """

    # NOTE(review): `_cleanup_mygene_returns`, `standardize` and
    # `_standardize_symbol` read `self.df` and `self._species`, neither of
    # which is set by `__init__` below. They presumably come from a subclass
    # or are assigned elsewhere - confirm against the callers.

    def __init__(self) -> None:
        self._server = get_client("gene")

    @property
    def server(self):
        """MyGene.info."""
        return self._server

    def query(
        self,
        genes: Iterable[str],
        scopes="symbol",
        fields="HGNC,symbol",
        species="human",
        as_dataframe=True,
        verbose=False,
        **kwargs,
    ):
        """Get HGNC IDs from Mygene.

        Args:
            genes: Input list
            scopes: ID types of the input
            fields: ID type of the output
            species: species
            as_dataframe: Whether to return a data frame
            verbose: Whether to print logging
            **kwargs: see **kwargs of `biothings_client.MyGeneInfo().querymany()`

        Returns:
            a dataframe ('HGNC' column is reformatted to be 'hgnc_id'), or the
            raw `querymany` result when it is not a dataframe
            (e.g. `as_dataframe=False`)
        """
        # query via mygene
        res = self.server.querymany(
            qterms=genes,
            scopes=scopes,
            fields=fields,
            species=species,
            as_dataframe=as_dataframe,
            verbose=verbose,
            **kwargs,
        )

        # The column post-processing below only applies to dataframes; hand
        # back anything else unchanged instead of failing on `res.columns`.
        if not isinstance(res, pd.DataFrame):
            return res

        # format HGNC IDs to match `hgnc_id` format ('HGNC:int')
        if "HGNC" in res.columns:
            res["HGNC"] = [
                f"HGNC:{i}" if isinstance(i, str) else i for i in res["HGNC"]
            ]
        # Return value is ignored, so this presumably renames columns in place.
        NormalizeColumns.gene(res)

        return res

    def _cleanup_mygene_returns(self, res: pd.DataFrame, unique_col="hgnc_id"):
        """Clean up duplicates and NAs from the mygene returns.

        Args:
            res: Returned dataframe from `.mg.query`
            unique_col: Unique identifier column

        Returns:
            a dict mapping each query (index of `res`) to a single symbol.
            Queries whose ID is absent from the reference table map to NaN
            when they hit one ID, and are omitted when they hit several.
        """
        mapped_dict = {}

        # Nothing was mapped at all: the identifier column is missing.
        if unique_col not in res.columns:
            return mapped_dict

        # drop rows without mapped unique IDs (HGNC)
        df = res.dropna(subset=[unique_col])

        # Reference lookup HGNC ID -> symbol, built once for every returned ID.
        # Assumes `self.df` is indexed by `hgnc_symbol` and has an `hgnc_id`
        # column - TODO confirm.
        ref = self.df.reset_index().set_index("hgnc_id")
        id_to_symbol = ref.loc[
            ref.index.isin(df[unique_col]), "hgnc_symbol"
        ].to_dict()

        # for unique results, use returned HGNC IDs to get symbols from .hgnc
        is_dup_query = df.index.duplicated(keep=False)
        unique_rows = df[~is_dup_query]
        mapped_dict.update(unique_rows[unique_col].map(id_to_symbol).to_dict())

        # TODO: if the same HGNC ID is mapped to multiple inputs?

        # if a query is mapped to multiple HGNC IDs, do the reverse mapping from .hgnc
        # keep the shortest symbol as readthrough transcripts or pseudogenes are longer
        dup_rows = df[is_dup_query]
        for query in dup_rows.index.unique():
            hids = dup_rows.loc[dup_rows.index == query, unique_col].tolist()
            symbols = [
                id_to_symbol[hid]
                for hid in hids
                if isinstance(id_to_symbol.get(hid), str)
            ]
            if symbols:
                # Shortest first; ties are broken alphabetically so the
                # result is deterministic.
                mapped_dict[query] = min(symbols, key=lambda s: (len(s), s))

        return mapped_dict

    @format_into_dataframe
    def standardize(
        self,
        data,
        id_type: "Optional[_IDs]" = None,  # noqa: F821
        new_index: bool = True,
        _reformat: bool = False,
    ):
        """Index a dataframe with the official gene symbols from HGNC.

        Args:
            data: A list of gene symbols to be standardized
                If dataframe, will take the index
            id_type: Default is to consider input as gene symbols and alias
            new_index:
                If True, set the standardized symbols as the index
                    - unmapped will remain the original index
                    - original index stored in the `index_orig` column
                If False, only write to the `std_id` column
            _reformat: Set by `format_into_dataframe`; True when `data` was
                not passed as a dataframe.

        Returns:
            The standardized dataframe when the input was not a dataframe.
            When a dataframe is passed it is modified in place and None is
            returned:
                Replaces the DataFrame mappable index with the standardized symbols
                Adds a `std_id` column
                The original index is stored in the `index_orig` column

        Raises:
            NotImplementedError: If `id_type` is not None.
        """
        # The annotation is a string because `_IDs` is not defined in the
        # visible part of this module; evaluating it eagerly would fail.
        if id_type is not None:
            raise NotImplementedError

        mapped_dict = self._standardize_symbol(df=data)
        data["std_id"] = data.index.map(mapped_dict)

        if new_index:
            data["index_orig"] = data.index
            data.index = data["std_id"].fillna(data["index_orig"])
            data.index.name = None

        if _reformat:
            return data
        return None

    def _standardize_symbol(
        self,
        df: pd.DataFrame,
    ):
        """Standardize gene symbols/aliases to symbol from `.reference` table.

        Args:
            df: A dataframe with index being the column to be standardized

        Returns:
            a dict with the standardized symbols
        """
        # 1. Symbols already present in the reference index map to themselves.
        ref_index = self.df.index
        mapped_dict = {symbol: symbol for symbol in ref_index[ref_index.isin(df.index)]}

        # 2. For not mapped symbols, map through alias
        notmapped = df[~df.index.isin(mapped_dict.keys())]
        if not notmapped.empty:
            mg = Mygene()
            res = mg.query(
                notmapped.index, scopes="symbol,alias", species=self._species
            )
            mapped_dict.update(self._cleanup_mygene_returns(res))

        return mapped_dict


class Biomart:
    """Wrapper of Biomart python APIs, good for accessing Ensembl data.

    See: https://github.com/sebriois/biomart
    """

    def __init__(self) -> None:
        # Only the import is guarded, so that errors raised while connecting
        # are not misreported as a missing package.
        try:
            import biomart
        except ModuleNotFoundError as err:
            raise ModuleNotFoundError("Run `pip install biomart`") from err

        self._server = biomart.BiomartServer("http://uswest.ensembl.org/biomart")
        self._dataset = None

    @property
    def server(self):
        """biomart.BiomartServer."""
        return self._server

    @property
    def databases(self):
        """Listing all databases."""
        return self._server.databases

    @property
    def datasets(self):
        """Listing all datasets."""
        return self._server.datasets

    @property
    def dataset(self):
        """A biomart.BiomartDataset."""
        return self._dataset

    def get_gene_ensembl(
        self,
        species="human",
        attributes=None,
        filters=None,
        **kwargs,
    ):
        """Fetch the reference table of gene ensembl from biomart.

        Args:
            species: common name of species
            attributes: gene attributes from gene_ensembl datasets
                (defaults to the fields of `bionty.gene.Gene`)
            filters: see biomart.search() (defaults to no filters)
            **kwargs: see biomart.search()

        Returns:
            a dataframe with one column per requested attribute
        """
        # A fresh dict per call; a `{}` default would be shared across calls.
        filters = {} if filters is None else filters

        # database name
        from bionty.gene import Gene

        gn = Gene(species=species)
        sname = gn.species.search("short_name")
        self._dataset = self.datasets[f"{sname}_gene_ensembl"]

        # default is to get all the attributes
        attributes = gn.fields if attributes is None else attributes

        # Get the mapping between the attributes
        response = self.dataset.search(
            {"filters": filters, "attributes": attributes},
            **kwargs,
        )
        data = response.raw.data.decode("utf-8")

        # An empty response would make `read_csv` raise; return an empty table.
        if not data.strip():
            return pd.DataFrame(columns=list(attributes))

        # returns a dataframe
        df = pd.read_csv(io.StringIO(data), sep="\t", header=None)
        df.columns = attributes

        return df