Mercurial > repos > petrn > repeatexplorer
comparison lib/utils.py @ 0:f6ebec6e235e draft
Uploaded
author | petrn |
---|---|
date | Thu, 19 Dec 2019 13:46:43 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f6ebec6e235e |
---|---|
1 #!/usr/bin/env python3 | |
2 import os | |
3 import hashlib | |
4 | |
5 from itertools import chain | |
6 | |
7 | |
8 | |
def md5checksum(filename, fail_if_missing=True):
    '''
    Return the hexadecimal MD5 digest of the content of file filename.

    The file is opened in binary mode and hashed in 4096-byte chunks.
    When the file does not exist, FileNotFoundError is re-raised if
    fail_if_missing is true; otherwise a placeholder message naming the
    missing file is returned instead of a digest.
    '''
    digest = hashlib.md5()
    try:
        with open(filename, "rb") as handle:
            while True:
                chunk = handle.read(4096)
                if chunk == b"":
                    break
                digest.update(chunk)
    except FileNotFoundError:
        if fail_if_missing:
            raise
        return "Not calculated!!!! File {} is missing".format(filename)
    return digest.hexdigest()
22 | |
23 | |
class FilePath(str):
    '''
    String subclass marking its value as a path to a file.

    Instances compare and behave like ordinary strings; each one
    additionally carries the attribute filepath, which is set to True.
    '''

    def __new__(cls, string):
        instance = super().__new__(cls, string)
        instance.filepath = True
        return instance

    def relative(self, start):
        ''' return this path expressed relative to the directory start '''
        return os.path.relpath(self, start=start)
37 | |
38 | |
def save_as_table(d, path, header=None, relative=True):
    ''' takes list of dictionaries and saves it as tab-separated table

    d        - iterable of dictionaries, one dictionary per output row
    path     - file to write; it is overwritten if it exists
    header   - iterable of keys defining the columns and their order;
               when omitted, the columns are all keys found in d, in the
               order in which they are first seen
    relative - if true, FilePath values are written relative to the
               directory of path, otherwise they are written unchanged

    String values (including FilePath) are wrapped in double quotes, all
    other values are written as str(value). A key which is missing in
    a row is written as an empty field.
    '''
    pathdir = os.path.dirname(path)
    # materialize both inputs - they are iterated more than once
    rows = list(d)
    if header:
        header = list(header)
    else:
        # dict.fromkeys keeps first-seen order, which makes the column
        # order reproducible (a set would not)
        header = list(dict.fromkeys(chain.from_iterable(rows)))
    with open(path, 'w') as f:
        f.write("\t".join(header))
        f.write("\n")
        for row in rows:
            cells = []
            for key in header:
                if key not in row:
                    # rows are allowed to lack some of the columns
                    cells.append("")
                    continue
                value = row[key]
                if not isinstance(value, str):
                    cells.append(str(value))
                elif relative and isinstance(value, FilePath):
                    cells.append('"' + str(value.relative(pathdir)) + '"')
                else:
                    cells.append('"' + str(value) + '"')
            f.write("\t".join(cells))
            f.write("\n")
68 | |
69 | |
def export_tandem_consensus(clusters_info, path, rank=1, n=1):
    ''' export tandem repeat consensus sequences to fasta file

    clusters_info - iterable of objects with attributes index,
                    TR_consensus, tandem_rank and TR_monomer_length
    path          - output file; it is always created or truncated, even
                    if no record is written
    rank          - only clusters whose tandem_rank equals rank are used
    n             - number of copies of the consensus written per record

    A record is written for every cluster with non-empty TR_consensus and
    matching rank; any '<pre>' substring is removed from the consensus.
    Returns path if at least one record was written, otherwise None.
    '''
    print("exporting fasta files")
    written = False
    with open(path, 'w') as f:
        for cl in clusters_info:
            # TR_consensus is tested first, tandem_rank is read only for
            # clusters which have a consensus
            if cl.TR_consensus and rank == cl.tandem_rank:
                f.write(">CL{index}_TR_{n}_x_{L}nt\n{sequence}\n".format(
                    index=cl.index,
                    n=n,
                    L=cl.TR_monomer_length,
                    sequence=n * cl.TR_consensus.replace('<pre>', '')))
                written = True
    return path if written else None
90 | |
91 | |
def file_len(filename):
    '''count number of lines in file

    The file is opened in text mode and read line by line. A last line
    without trailing newline is counted too; an empty file gives 0.
    '''
    with open(filename) as f:
        # the counter must be separate from the loop variable, otherwise
        # it is overwritten by the text of each line
        return sum(1 for _ in f)
99 | |
def go2line(f, L):
    ''' find line L in file object f

    Rewinds f and positions it at the beginning of line L (lines are
    numbered from 1), so that the next read returns line L. For L equal
    to 0 or 1 (or less) the file is left at its beginning. If the file
    has fewer than L lines, it is left at the end of file.

    f must be seekable; both text and binary file objects are supported.
    '''
    f.seek(0)
    # Skip L - 1 lines with readline(), which leaves the file position
    # exactly at the start of line L. Computing the offset from len(line)
    # is not reliable for text files - characters are not bytes and
    # newlines may be translated.
    for _ in range(L - 1):
        if not f.readline():
            break
114 | |
def format_query(x):
    '''
    build query list in format ("x","y","x",...) from items of x;
    every item is converted with str() and wrapped in double quotes
    '''
    quoted_items = '","'.join(str(item) for item in x)
    return '("{}")'.format(quoted_items)