1 | # (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org) |
---|
2 | # Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php |
---|
3 | """ |
---|
4 | Module to find differences over time in a filesystem |
---|
5 | |
---|
6 | Basically this takes a snapshot of a directory, then sees what changes |
---|
7 | were made. The contents of the files are not checked, so you can |
---|
8 | detect that the content was changed, but not what the old version of |
---|
9 | the file was. |
---|
10 | """ |
---|
11 | |
---|
12 | import os |
---|
13 | from fnmatch import fnmatch |
---|
14 | from datetime import datetime |
---|
15 | from paste.util.UserDict24 import IterableUserDict |
---|
16 | import operator |
---|
17 | import re |
---|
18 | |
---|
# Public names exported by ``from <this module> import *``.
__all__ = ['Diff', 'Snapshot', 'File', 'Dir', 'report_expected_diffs',
           'show_diff']
---|
21 | |
---|
class Diff(object):

    """
    Represents the difference between two snapshots.

    ``before`` and ``after`` are snapshot objects exposing ``data`` (a
    dictionary mapping relative paths to entries that have ``mtime``
    and ``size`` attributes), ``base_path`` and ``calculated``.  After
    construction three dictionaries keyed by relative path exist:

    ``created``:
        Entries found only in ``after``.

    ``deleted``:
        Entries found only in ``before``.

    ``updated``:
        Entries found in both whose ``mtime`` increased; the values
        are the entries taken from ``after``.
    """

    def __init__(self, before, after):
        self.before = before
        self.after = after
        self._calculate()

    def _calculate(self):
        """
        Fill in ``self.created``, ``self.deleted`` and ``self.updated``
        by comparing the ``data`` of the two snapshots.
        """
        before = self.before.data
        after = self.after.data
        self.deleted = {}
        self.updated = {}
        # Start by assuming everything is new, then remove whatever
        # already existed in the earlier snapshot.
        self.created = after.copy()
        for path, f in before.items():
            if path not in after:
                self.deleted[path] = f
                continue
            del self.created[path]
            if f.mtime < after[path].mtime:
                self.updated[path] = after[path]

    def __str__(self):
        return self.report()

    def report(self, header=True, dates=False):
        """
        Return a multi-line text description of the difference.

        ``header``:
            If true, the first line names the base path and the
            ``calculated`` value of both snapshots.

        ``dates``:
            If true, every listed path is followed by the mtimes found
            for it in the before/after snapshots.

        When nothing was created, deleted or updated a ``(no changes)``
        line is emitted, whether or not the header is shown.
        """
        s = []
        if header:
            s.append('Difference in %s from %s to %s:' %
                     (self.before.base_path,
                      self.before.calculated,
                      self.after.calculated))
        # Track changes explicitly; counting lines in ``s`` only works
        # when the header happens to be present.
        changed = False
        sections = [
            ('created', self.created),
            ('deleted', self.deleted),
            ('updated', self.updated)]
        for name, files in sections:
            if not files:
                continue
            changed = True
            s.append('-- %s: -------------------' % name)
            last = ''
            # Paths are unique dictionary keys, so sorting the pairs
            # never has to compare the entry objects themselves.
            for path, f in sorted(files.items()):
                t = ' %s' % _space_prefix(last, path, indent=4,
                                          include_sep=False)
                last = path
                # Directories report their size as 'N/A'
                if f.size != 'N/A':
                    t += ' (%s bytes)' % f.size
                if dates:
                    parts = []
                    if self.before.get(path):
                        parts.append(self.before[path].mtime)
                    if self.after.get(path):
                        parts.append(self.after[path].mtime)
                    t += ' (mtime: %s)' % ('->'.join(map(repr, parts)))
                s.append(t)
        if not changed:
            s.append(' (no changes)')
        return '\n'.join(s)
---|
83 | |
---|
class Snapshot(IterableUserDict):

    """
    Represents a snapshot of a set of files.  Has a dictionary-like
    interface, keyed relative to ``base_path``.

    ``files``:
        An existing ``path: entry`` dictionary to use as the snapshot
        data.  When it is ``None`` the filesystem under ``base_path``
        is scanned immediately.

    ``ignore_wildcards``:
        ``fnmatch`` patterns; matching relative paths are skipped.

    ``ignore_paths``:
        Relative paths that are skipped outright.

    ``ignore_hidden``:
        If true, entries whose basename starts with ``.`` are skipped.
    """

    def __init__(self, base_path, files=None, ignore_wildcards=(),
                 ignore_paths=(), ignore_hidden=True):
        self.base_path = base_path
        self.ignore_wildcards = ignore_wildcards
        self.ignore_hidden = ignore_hidden
        self.ignore_paths = ignore_paths
        # Set by find_files(); stays None when ``files`` was supplied
        self.calculated = None
        self.data = files or {}
        if files is None:
            self.find_files()

    ############################################################
    ## File finding
    ############################################################

    def find_files(self):
        """
        Find all the files under the base path, and put them in
        ``self.data``.  Records the time of the scan in
        ``self.calculated``.
        """
        self._find_traverse('', self.data)
        self.calculated = datetime.now()

    def _ignore_file(self, fn):
        """
        Return true if the relative path ``fn`` should be left out of
        the snapshot.

        A path is ignored when it is listed in ``ignore_paths``, when
        ``ignore_hidden`` is set and its basename starts with ``.``, or
        when it matches one of the ``ignore_wildcards`` patterns.
        """
        # This used to be defined twice; the later definition dropped
        # the wildcard check and silently replaced the complete one.
        if fn in self.ignore_paths:
            return True
        if self.ignore_hidden and os.path.basename(fn).startswith('.'):
            return True
        for pat in self.ignore_wildcards:
            if fnmatch(fn, pat):
                return True
        return False

    def _find_traverse(self, path, result):
        """
        Recursively record ``path`` (relative to ``base_path``) and
        everything beneath it in the dictionary ``result``.
        """
        full = os.path.join(self.base_path, path)
        if os.path.isdir(full):
            if path:
                # Don't actually include the base path
                result[path] = Dir(self.base_path, path)
            for fn in os.listdir(full):
                fn = os.path.join(path, fn)
                if self._ignore_file(fn):
                    continue
                self._find_traverse(fn, result)
        else:
            result[path] = File(self.base_path, path)

    def __repr__(self):
        return '<%s in %r from %r>' % (
            self.__class__.__name__, self.base_path,
            self.calculated or '(no calculation done)')

    def compare_expected(self, expected, comparison=operator.eq,
                         differ=None, not_found=None,
                         include_success=False):
        """
        Compares a dictionary of ``path: content`` to the
        found files.  Comparison is done by equality, or the
        ``comparison(actual_content, expected_content)`` function given.

        Returns dictionary of differences, keyed by path (with any
        leading/trailing ``/`` stripped).  Each difference is either
        noted, or the output of
        ``differ(actual_content, expected_content)`` is given.

        If a file does not exist and ``not_found`` is given, then
        ``not_found(path)`` is put in.

        If ``include_success`` is true, matching files are reported
        too, with the value ``'same!'``.
        """
        result = {}
        for orig_path in expected:
            # Snapshot keys never carry surrounding slashes
            path = orig_path.strip('/')
            if path not in self.data:
                if not_found:
                    msg = not_found(path)
                else:
                    msg = 'not found'
                result[path] = msg
                continue
            expected_content = expected[orig_path]
            actual_content = self.data[path].bytes
            if comparison(actual_content, expected_content):
                if include_success:
                    result[path] = 'same!'
                continue
            if differ:
                msg = differ(actual_content, expected_content)
            elif len(actual_content) < len(expected_content):
                msg = 'differ (%i bytes smaller)' % (
                    len(expected_content) - len(actual_content))
            elif len(actual_content) > len(expected_content):
                msg = 'differ (%i bytes larger)' % (
                    len(actual_content) - len(expected_content))
            else:
                msg = 'diff (same size)'
            result[path] = msg
        return result

    def diff_to_now(self):
        """
        Return a ``Diff`` between this snapshot and a fresh scan made
        with the same settings.
        """
        return Diff(self, self.clone())

    def clone(self):
        """
        Return a new snapshot of the same ``base_path`` with the same
        ignore settings; the filesystem is scanned again.
        """
        return self.__class__(base_path=self.base_path,
                              ignore_wildcards=self.ignore_wildcards,
                              ignore_paths=self.ignore_paths,
                              ignore_hidden=self.ignore_hidden)
---|
204 | |
---|
class File(object):

    """
    Represents a single file found as the result of a command.

    Has attributes:

    ``path``:
        The path of the file, relative to the ``base_path``

    ``full``:
        The full path

    ``stat``:
        The results of ``os.stat``.  Also ``mtime`` and ``size``
        contain the ``.st_mtime`` and ``st_size`` of the stat.

    ``bytes``:
        The contents of the file (read lazily, in binary mode, and
        cached after the first access).

    You may use the ``in`` operator with these objects (tested against
    the contents of the file), and the ``.mustcontain()`` method.
    """

    file = True
    dir = False

    def __init__(self, base_path, path):
        self.base_path = base_path
        self.path = path
        self.full = os.path.join(base_path, path)
        self.stat = os.stat(self.full)
        self.mtime = self.stat.st_mtime
        self.size = self.stat.st_size
        # Filled in by the ``bytes`` property on first access
        self._bytes = None

    def bytes__get(self):
        """Return the file contents, reading them on first use."""
        if self._bytes is None:
            f = open(self.full, 'rb')
            # try/finally so the handle is released even if read()
            # fails
            try:
                self._bytes = f.read()
            finally:
                f.close()
        return self._bytes
    bytes = property(bytes__get)

    def __contains__(self, s):
        return s in self.bytes

    def mustcontain(self, s):
        """
        Assert that ``s`` occurs in the file contents.  On failure the
        wanted value and the actual contents are printed before the
        ``AssertionError`` is raised.
        """
        __tracebackhide__ = True
        content = self.bytes
        if s not in content:
            # Single-argument print(...) produces the same output
            # whether print is a statement or a function.
            print('Could not find %r in:' % s)
            print(content)
            assert s in content

    def __repr__(self):
        return '<%s %s:%s>' % (
            self.__class__.__name__,
            self.base_path, self.path)
---|
264 | |
---|
class Dir(File):

    """
    Represents a directory created by a command.

    Directories carry the placeholder ``'N/A'`` for both ``size`` and
    ``mtime``, and asking for their ``bytes`` raises
    ``NotImplementedError``.
    """

    file = False
    dir = True

    def __init__(self, base_path, path):
        # File.__init__ is intentionally not called: no stat is taken
        # and no content cache is set up for a directory.
        self.base_path = base_path
        self.path = path
        self.full = os.path.join(base_path, path)
        self.size = self.mtime = 'N/A'

    def __repr__(self):
        kind = self.__class__.__name__
        return '<%s %s:%s>' % (kind, self.base_path, self.path)

    def bytes__get(self):
        """Always raises; a directory has no content to read."""
        message = "Directory %r doesn't have content" % self
        raise NotImplementedError(message)

    bytes = property(bytes__get)
---|
291 | |
---|
292 | |
---|
293 | def _space_prefix(pref, full, sep=None, indent=None, include_sep=True): |
---|
294 | """ |
---|
295 | Anything shared by pref and full will be replaced with spaces |
---|
296 | in full, and full returned. |
---|
297 | |
---|
298 | Example:: |
---|
299 | |
---|
300 | >>> _space_prefix('/foo/bar', '/foo') |
---|
301 | ' /bar' |
---|
302 | """ |
---|
303 | if sep is None: |
---|
304 | sep = os.path.sep |
---|
305 | pref = pref.split(sep) |
---|
306 | full = full.split(sep) |
---|
307 | padding = [] |
---|
308 | while pref and full and pref[0] == full[0]: |
---|
309 | if indent is None: |
---|
310 | padding.append(' ' * (len(full[0]) + len(sep))) |
---|
311 | else: |
---|
312 | padding.append(' ' * indent) |
---|
313 | full.pop(0) |
---|
314 | pref.pop(0) |
---|
315 | if padding: |
---|
316 | if include_sep: |
---|
317 | return ''.join(padding) + sep + sep.join(full) |
---|
318 | else: |
---|
319 | return ''.join(padding) + sep.join(full) |
---|
320 | else: |
---|
321 | return sep.join(full) |
---|
322 | |
---|
def report_expected_diffs(diffs, colorize=False):
    """
    Takes the output of compare_expected, and returns a string
    description of the differences.

    Paths are listed in sorted order, each followed by its
    description; multi-line descriptions go on their own indented
    lines.  With ``colorize`` set, terminal colour codes are added.
    The last line gives the number of entries.  An empty ``diffs``
    yields just ``'No differences'``.
    """
    if not diffs:
        return 'No differences'
    entries = sorted(diffs.items())
    lines = []
    previous = ''
    for path, desc in entries:
        label = _space_prefix(previous, path, indent=4, include_sep=False)
        if colorize:
            label = color_line(label, 11)
        previous = path
        if len(desc.splitlines()) <= 1:
            # Short description fits on the same line as the path
            label += ' ' + desc
        else:
            # Indent the description two columns past the path
            cur_indent = len(re.search(r'^[ ]*', label).group(0))
            desc = indent(cur_indent + 2, desc)
            if not colorize:
                label += '\n' + desc
            else:
                label += '\n'
                for line in desc.splitlines():
                    marker = line.strip()
                    if marker.startswith('+'):
                        shade = 10
                    elif marker.startswith('-'):
                        shade = 9
                    else:
                        shade = 14
                    label += color_line(line, shade) + '\n'
        lines.append(label)
    lines.append('Files with differences: %s' % len(entries))
    return '\n'.join(lines)
---|
359 | |
---|
def color_code(foreground=None, background=None):
    """
    Return the terminal escape sequence selecting the given colours.

    0  black
    1  red
    2  green
    3  yellow
    4  blue
    5  magenta (purple)
    6  cyan
    7  white (gray)

    Add 8 to get high-intensity

    With neither colour given the reset sequence is returned.  When
    only one of the two is given, the other is set to the terminal
    default.  High-intensity foregrounds are produced as bold plus the
    base colour; high-intensity backgrounds use the bright background
    codes 100-107.
    """
    if foreground is None and background is None:
        # Reset
        return '\x1b[0m'
    codes = []
    if foreground is None:
        codes.append('[39m')
    elif foreground > 7:
        codes.append('[1m')
        # 22 + foreground == 30 + (foreground - 8)
        codes.append('[%im' % (22+foreground))
    else:
        codes.append('[%im' % (30+foreground))
    if background is None:
        codes.append('[49m')
    elif background > 7:
        # 40 + background would land on 48-55, which are not
        # background colours; 92 + background == 100 + (background - 8)
        codes.append('[%im' % (92+background))
    else:
        codes.append('[%im' % (40+background))
    return '\x1b' + '\x1b'.join(codes)
---|
389 | |
---|
def color_line(line, foreground=None, background=None):
    """
    Wrap ``line`` in the colour codes for ``foreground`` and
    ``background``, followed by a reset code.  Leading whitespace is
    kept in front of the colour code, so it stays uncoloured.
    """
    leading = re.search(r'^(\s*)', line)
    start = color_code(foreground, background)
    reset = color_code()
    return leading.group(1) + start + line[leading.end():] + reset
---|
394 | |
---|
def indent(indent, text):
    """
    Return ``text`` with every line prefixed by ``indent`` spaces.
    Lines are rejoined with ``\\n``, so a trailing newline is dropped.
    """
    prefix = ' ' * indent
    shifted = []
    for line in text.splitlines():
        shifted.append(prefix + line)
    return '\n'.join(shifted)
---|
398 | |
---|
def show_diff(actual_content, expected_content):
    """
    Describe how ``actual_content`` differs from ``expected_content``.

    Both are compared line by line, with surrounding whitespace
    stripped and blank lines dropped.  Two single-line contents give
    ``'<actual> not <expected>'`` (as reprs); empty actual content
    gives a note followed by the expected content; anything else gives
    a ``difflib.ndiff`` listing.
    """
    def _significant(text):
        # Stripped lines, leaving out the ones that end up empty
        stripped = [l.strip() for l in text.splitlines()]
        return [l for l in stripped if l]

    actual_lines = _significant(actual_content)
    expected_lines = _significant(expected_content)
    if len(actual_lines) == 1 and len(expected_lines) == 1:
        return '%r not %r' % (actual_lines[0], expected_lines[0])
    if not actual_lines:
        return 'Empty; should have:\n'+expected_content
    import difflib
    listing = difflib.ndiff(actual_lines, expected_lines)
    return '\n'.join(listing)
---|