comparison CHAP/writer.py @ 0:cbbe42422d56 draft

planemo upload for repository https://github.com/CHESSComputing/ChessAnalysisPipeline/tree/galaxy commit 1401a7e1ae007a6bda260d147f9b879e789b73e0-dirty
author kls286
date Tue, 28 Mar 2023 15:07:30 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:cbbe42422d56
1 #!/usr/bin/env python
2 """
3 File : writer.py
4 Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
5 Description: generic Writer module
6 """
7
8 # system modules
9 import argparse
10 import json
11 import logging
12 import os
13 import sys
14 from time import time
15
16 # local modules
17 # from pipeline import PipelineObject
18
19 class Writer():
20 """
21 Writer represent generic file writer
22 """
23
24 def __init__(self):
25 """
26 Constructor of Writer class
27 """
28 self.__name__ = self.__class__.__name__
29 self.logger = logging.getLogger(self.__name__)
30 self.logger.propagate = False
31
32 def write(self, data, filename, **_write_kwargs):
33 """
34 write API
35
36 :param filename: Name of file to write to
37 :param data: data to write to file
38 :return: data written to file
39 """
40
41 t0 = time()
42 self.logger.info(f'Executing "write" with filename={filename}, type(data)={type(data)}, kwargs={_write_kwargs}')
43
44 data = self._write(data, filename, **_write_kwargs)
45
46 self.logger.info(f'Finished "write" in {time()-t0:.3f} seconds\n')
47
48 return(data)
49
50 def _write(self, data, filename):
51 with open(filename, 'a') as file:
52 file.write(data)
53 return(data)
54
55 class YAMLWriter(Writer):
56 def _write(self, data, filename, force_overwrite=False):
57 '''If `data` is a `dict`, write it to `filename`.
58
59 :param data: the dictionary to write to `filename`.
60 :type data: dict
61 :param filename: name of the file to write to.
62 :type filename: str
63 :param force_overwrite: flag to allow data in `filename` to be
64 overwritten if it already exists.
65 :type force_overwrite: bool
66 :raises TypeError: if `data` is not a `dict`
67 :raises RuntimeError: if `filename` already exists and
68 `force_overwrite` is `False`.
69 :return: the original input data
70 :rtype: dict
71 '''
72
73 import yaml
74
75 if not isinstance(data, (dict, list)):
76 raise(TypeError(f'{self.__name__}.write: input data must be a dict or list.'))
77
78 if not force_overwrite:
79 if os.path.isfile(filename):
80 raise(RuntimeError(f'{self.__name__}: {filename} already exists.'))
81
82 with open(filename, 'w') as outf:
83 yaml.dump(data, outf, sort_keys=False)
84
85 return(data)
86
87 class ExtractArchiveWriter(Writer):
88 def _write(self, data, filename):
89 '''Take a .tar archive represented as bytes in `data` and write the
90 extracted archive to files.
91
92 :param data: the archive data
93 :type data: bytes
94 :param filename: the name of a directory to which the archive files will
95 be written
96 :type filename: str
97 :return: the original `data`
98 :rtype: bytes
99 '''
100
101 from io import BytesIO
102 import tarfile
103
104 tar = tarfile.open(fileobj=BytesIO(data))
105 tar.extractall(path=filename)
106
107 return(data)
108
109
110 class NexusWriter(Writer):
111 def _write(self, data, filename, force_overwrite=False):
112 '''Write `data` to a NeXus file
113
114 :param data: the data to write to `filename`.
115 :param filename: name of the file to write to.
116 :param force_overwrite: flag to allow data in `filename` to be
117 overwritten, if it already exists.
118 :return: the original input data
119 '''
120
121 from nexusformat.nexus import NXobject
122 import xarray as xr
123
124 if isinstance(data, NXobject):
125 nxstructure = data
126
127 elif isinstance(data, xr.Dataset):
128 nxstructure = self.get_nxdata_from_dataset(data)
129
130 elif isinstance(data, xr.DataArray):
131 nxstructure = self.get_nxdata_from_dataarray(data)
132
133 else:
134 raise(TypeError(f'{self.__name__}.write: unknown data format: {type(data).__name__}'))
135
136 mode = 'w' if force_overwrite else 'w-'
137 nxstructure.save(filename, mode=mode)
138
139 return(data)
140
141
142 def get_nxdata_from_dataset(self, dset):
143 '''Return an instance of `nexusformat.nexus.NXdata` that represents the
144 data and metadata attributes contained in `dset`.
145
146 :param dset: the input dataset to represent
147 :type data: xarray.Dataset
148 :return: `dset` represented as an instance of `nexusformat.nexus.NXdata`
149 :rtype: nexusformat.nexus.NXdata
150 '''
151
152 from nexusformat.nexus import NXdata, NXfield
153
154 nxdata_args = {'signal':None, 'axes':()}
155
156 for var in dset.data_vars:
157 data_var = dset[var]
158 nxfield = NXfield(data_var.data,
159 name=data_var.name,
160 attrs=data_var.attrs)
161 if nxdata_args['signal'] is None:
162 nxdata_args['signal'] = nxfield
163 else:
164 nxdata_args[var] = nxfield
165
166 for coord in dset.coords:
167 coord_var = dset[coord]
168 nxfield = NXfield(coord_var.data,
169 name=coord_var.name,
170 attrs=coord_var.attrs)
171 nxdata_args['axes'] = (*nxdata_args['axes'], nxfield)
172
173 nxdata = NXdata(**nxdata_args)
174 nxdata.attrs['xarray_attrs'] = json.dumps(dset.attrs)
175
176 return(nxdata)
177
178 def get_nxdata_from_dataarray(self, darr):
179 '''Return an instance of `nexusformat.nexus.NXdata` that represents the
180 data and metadata attributes contained in `darr`.
181
182 :param darr: the input dataset to represent
183 :type darr: xarray.DataArray
184 :return: `darr` represented as an instance of `nexusformat.nexus.NXdata`
185 :rtype: nexusformat.nexus.NXdata
186 '''
187
188 from nexusformat.nexus import NXdata, NXfield
189
190 nxdata_args = {'signal':None, 'axes':()}
191
192 nxdata_args['signal'] = NXfield(darr.data,
193 name=darr.name,
194 attrs=darr.attrs)
195
196
197 for coord in darr.coords:
198 coord_var = darr[coord]
199 nxfield = NXfield(coord_var.data,
200 name=coord_var.name,
201 attrs=coord_var.attrs)
202 nxdata_args['axes'] = (*nxdata_args['axes'], nxfield)
203
204 nxdata = NXdata(**nxdata_args)
205 nxdata.attrs['xarray_attrs'] = json.dumps(darr.attrs)
206
207 return(nxdata)
208
209
210 class OptionParser():
211 '''User based option parser'''
212 def __init__(self):
213 self.parser = argparse.ArgumentParser(prog='PROG')
214 self.parser.add_argument("--data", action="store",
215 dest="data", default="", help="Input data")
216 self.parser.add_argument("--filename", action="store",
217 dest="filename", default="", help="Output file")
218 self.parser.add_argument("--writer", action="store",
219 dest="writer", default="Writer", help="Writer class name")
220 self.parser.add_argument('--log-level', choices=logging._nameToLevel.keys(),
221 dest='log_level', default='INFO', help='logging level')
222
223 def main():
224 '''Main function'''
225 optmgr = OptionParser()
226 opts = optmgr.parser.parse_args()
227 clsName = opts.writer
228 try:
229 writerCls = getattr(sys.modules[__name__],clsName)
230 except:
231 print(f'Unsupported writer {clsName}')
232 sys.exit(1)
233
234 writer = writerCls()
235 writer.logger.setLevel(getattr(logging, opts.log_level))
236 log_handler = logging.StreamHandler()
237 log_handler.setFormatter(logging.Formatter('{name:20}: {message}', style='{'))
238 writer.logger.addHandler(log_handler)
239 data = writer.write(opts.data, opts.filename)
240 print(f"Writer {writer} writes to {opts.filename}, data {data}")
241
242 if __name__ == '__main__':
243 main()