comparison eutils.py @ 0:1b4ac594d02a draft

planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/ncbi_entrez_eutils commit 780c9984a9c44d046aadf1e316a668d1e53aa1f0
author iuc
date Sat, 31 Oct 2015 12:44:54 -0400
parents
children aa88712a7536
comparison
equal deleted inserted replaced
-1:000000000000 0:1b4ac594d02a
1 import os
2 import json
3 import StringIO
4 from Bio import Entrez
# Name under which this client identifies itself through Bio.Entrez.
Entrez.tool = "GalaxyEutils_1_0"
# Page size for batched downloads: Client.fetch(whole=True) uses it both as
# `retmax` and as the step between successive `retstart` offsets.
BATCH_SIZE = 200
7
8
class Client(object):
    """Thin wrapper around the Bio.Entrez E-utilities functions.

    The constructor configures the contact email on the ``Entrez`` module
    and, optionally, loads a previously saved history (``QueryKey`` /
    ``WebEnv``) from a JSON file so later calls can refer to it.
    """

    def __init__(self, history_file=None, user_email=None, admin_email=None):
        """Configure the contact email and optionally load a history file.

        :param history_file: path to a JSON file holding ``QueryKey`` and
            ``WebEnv`` entries, or None when no history is used.
        :param user_email: email of the user running the tool, or None.
        :param admin_email: email of the administrator, or None.
        :raises Exception: if no email was passed and the
            ``NCBI_EUTILS_CONTACT`` environment variable is unset.
        """
        self.using_history = False
        # Define the history attributes up front so they always exist, even
        # when no history file is supplied.
        self.query_key = None
        self.webenv = None

        if user_email is not None and admin_email is not None:
            Entrez.email = ';'.join((admin_email, user_email))
        elif user_email is not None:
            Entrez.email = user_email
        elif admin_email is not None:
            Entrez.email = admin_email
        else:
            # Last resort: contact address configured in the environment.
            Entrez.email = os.environ.get('NCBI_EUTILS_CONTACT', None)

        if Entrez.email is None:
            raise Exception("Cannot continue without an email; please set "
                            "administrator email in NCBI_EUTILS_CONTACT")

        if history_file is not None:
            with open(history_file, 'r') as handle:
                data = json.load(handle)
            self.query_key = data['QueryKey']
            self.webenv = data['WebEnv']
            self.using_history = True

    def get_history(self):
        """Return the loaded history as keyword arguments for Entrez calls.

        :returns: ``{'query_key': ..., 'WebEnv': ...}`` when a history file
            was loaded, otherwise an empty dict.
        """
        if not self.using_history:
            return {}
        return {
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }

    def post(self, database, **payload):
        """Call ``Entrez.epost`` and return the parsed reply as a JSON string."""
        return json.dumps(Entrez.read(Entrez.epost(database, **payload)), indent=4)

    def fetch(self, db, whole=False, **payload):
        """Fetch records with ``Entrez.efetch`` and print them to stdout.

        :param db: database name passed to ``Entrez.efetch``.
        :param whole: when true, download the complete result set in pages
            of ``BATCH_SIZE``. The first page is printed; each later page is
            written to ``<retstart>.out`` in the current working directory.
        :param payload: remaining keyword arguments forwarded to
            ``Entrez.efetch`` (an ``id`` entry selects ID-based counting,
            otherwise the loaded history is used).
        :raises Exception: if ``whole`` is set without an ``id`` and no
            history is loaded (raised by ``history_summary``).
        """
        # NOTE: single-argument print(...) behaves identically whether it is
        # parsed as the Python 2 print statement or the Python 3 function.
        if not whole:
            print(Entrez.efetch(db, **payload).read())
            return

        # The summary is only used to learn how many records exist.
        if 'id' in payload:
            summary = self.id_summary(db, payload['id'])
        else:
            summary = self.history_summary(db)
        count = len(summary)

        payload['retmax'] = BATCH_SIZE

        # Print the first one
        print(Entrez.efetch(db, **payload).read())
        # Then write subsequent to files for <discover datasets>
        for i in range(BATCH_SIZE, count, BATCH_SIZE):
            payload['retstart'] = i
            # TODO: output multiple files??? Collection?
            with open('%s.out' % i, 'w') as handle:
                handle.write(Entrez.efetch(db, **payload).read())

    def id_summary(self, db, id_list):
        """Return the parsed ``Entrez.esummary`` result for explicit IDs."""
        payload = {
            'db': db,
            'id': id_list,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def history_summary(self, db):
        """Return the parsed ``Entrez.esummary`` result for the loaded history.

        :raises Exception: if no history file was loaded.
        """
        if not self.using_history:
            raise Exception("History must be available for this method")

        payload = {
            'db': db,
            'query_key': self.query_key,
            'WebEnv': self.webenv,
        }
        return Entrez.read(Entrez.esummary(**payload))

    def summary(self, **payload):
        """Call ``Entrez.esummary`` and return the raw response body."""
        return Entrez.esummary(**payload).read()

    def link(self, **payload):
        """Call ``Entrez.elink`` and return the raw response body."""
        return Entrez.elink(**payload).read()

    def extract_history(self, xml_data):
        """Extract the history keys from an XML response string.

        :param xml_data: XML text parseable by ``Entrez.read``.
        :returns: dict containing whichever of ``QueryKey`` and ``WebEnv``
            are present in the parsed data.
        """
        parsed_data = Entrez.read(StringIO.StringIO(xml_data))
        history = {}
        for key in ('QueryKey', 'WebEnv'):
            if key in parsed_data:
                history[key] = parsed_data[key]

        return history

    def search(self, **payload):
        """Call ``Entrez.esearch`` and return the raw response body."""
        return Entrez.esearch(**payload).read()

    def info(self, **kwargs):
        """Call ``Entrez.einfo`` and return the raw response body."""
        return Entrez.einfo(**kwargs).read()

    def gquery(self, **kwargs):
        """Call ``Entrez.egquery`` and return the raw response body."""
        return Entrez.egquery(**kwargs).read()

    def citmatch(self, **kwargs):
        """Call ``Entrez.ecitmatch`` and return the raw response body."""
        # Bio.Entrez exposes this endpoint as lowercase ``ecitmatch``; the
        # previously used ``Entrez.ECitMatch`` attribute does not exist.
        return Entrez.ecitmatch(**kwargs).read()

    @classmethod
    def parse_ids(cls, id_list, id, history_file):
        """Parse IDs passed on --cli or in a file passed to the cli

        :param id_list: path to a file with one ID per line, or None.
        :param id: string of IDs separated by commas, newlines or the
            ``__cn__`` token (presumably an escaped newline -- confirm
            against the caller), or None.
        :param history_file: history file path; only checked for presence.
        :returns: list of non-empty, whitespace-stripped IDs; inline IDs
            come first, followed by IDs read from the file.
        :raises Exception: if no IDs were found and no history file given.
        """
        merged_ids = []
        if id is not None:
            candidates = id.replace('__cn__', ',').replace('\n', ',').split(',')
            # Strip each entry and drop empty ones (e.g. from ",," or ", ").
            merged_ids.extend(pid.strip() for pid in candidates if pid.strip())

        if id_list is not None:
            with open(id_list, 'r') as handle:
                # Skip blank lines so no empty IDs are sent to NCBI.
                merged_ids.extend(
                    line.strip() for line in handle if line.strip())

        # Exception handled here for uniformity
        if len(merged_ids) == 0 and history_file is None:
            raise Exception("Must provide history file or IDs")

        return merged_ids