root/galaxy-central/eggs/twill-0.9-py2.6.egg/twill/browser.py @ 3

リビジョン 3, 15.5 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1"""
2Implements TwillBrowser, a simple stateful wrapper for mechanize.Browser.
3
4See _browser.py for mechanize code.
5"""
6
7OUT=None
8
9# Python imports
10import re
11
12# wwwsearch imports
13import _mechanize_dist as mechanize
14from _mechanize_dist import BrowserStateError, LinkNotFoundError, ClientForm
15
16# twill package imports
17from _browser import PatchedMechanizeBrowser
18from utils import print_form, ConfigurableParsingFactory, \
19     ResultWrapper, unique_match, HistoryStack
20from errors import TwillException
21     
22
23#
24# TwillBrowser
25#
26
27class TwillBrowser(object):
28    """
29    Wrap mechanize behavior in a simple stateful way.
30
31    Public variables:
32
33      * result -- mechanize-style 'result' object.
34    """
35    def __init__(self):
36        #
37        # create special link/forms parsing code to run tidy on HTML first.
38        #
39       
40        factory = ConfigurableParsingFactory()
41
42        #
43        # Create the mechanize browser.
44        #
45       
46        b = PatchedMechanizeBrowser(history=HistoryStack(), factory=factory)
47
48        self._browser = b
49       
50        self.result = None
51        self.last_submit_button = None
52
53        #
54        # create & set a cookie jar.
55        #
56       
57        policy = mechanize.DefaultCookiePolicy(rfc2965=True)
58        cj = mechanize.LWPCookieJar(policy=policy)
59        self._browser.set_cookiejar(cj)
60        self.cj = cj
61
62        # Ask for MIME type 'text/html' by preference.
63        self._browser.addheaders = [("Accept", "text/html; */*")]
64
65        # ignore robots.txt
66        self._browser.set_handle_robots(None)
67
68        # create an HTTP auth handler
69        self.creds = mechanize.HTTPPasswordMgr()
70
71        # do handle HTTP-EQUIV properly.
72        self._browser.set_handle_equiv(True)
73
74        # callables to be called after each page load.
75        self._post_load_hooks = []
76
77    ### get/set HTTP authentication stuff.
78
79    def _set_creds(self, creds):
80        self._creds = creds
81        self._browser.set_password_manager(creds)
82
83    def _get_creds(self):
84        return self._creds
85
86    creds = property(_get_creds, _set_creds)
87       
88    def go(self, url):
89        """
90        Visit given URL.
91        """
92        try_urls = [ url, ]
93
94        # if this is an absolute URL that is just missing the 'http://' at
95        # the beginning, try fixing that.
96       
97        if url.find('://') == -1:
98            full_url = 'http://%s' % (url,)  # mimic browser behavior
99            try_urls.append(full_url)
100
101        # if this is a '?' URL, then assume that we want to tack it onto
102        # the end of the current URL.
103
104        if url.startswith('?'):
105            current_url = self.get_url()
106            current_url = current_url.split('?')[0]
107            try_urls = [ current_url + url, ]
108
109        success = False
110
111        for u in try_urls:
112            try:
113                self._journey('open', u)
114                success = True
115                break
116            except IOError:             # @CTB test this!
117                pass
118
119        if success:
120            print>>OUT, '==> at', self.get_url()
121        else:
122            raise BrowserStateError("cannot go to '%s'" % (url,))
123
124    def reload(self):
125        """
126        Tell the browser to reload the current page.
127        """
128        self._journey('reload')
129        print>>OUT, '==> reloaded'
130
131    def back(self):
132        """
133        Return to previous page, if possible.
134        """
135        try:
136            self._journey('back')
137            print>>OUT, '==> back to', self.get_url()
138        except BrowserStateError:
139            print>>OUT, '==> back at empty page.'
140
141    def get_code(self):
142        """
143        Get the HTTP status code received for the current page.
144        """
145        if self.result:
146            return self.result.get_http_code()
147        return None
148
149    def get_html(self):
150        """
151        Get the HTML for the current page.
152        """
153        if self.result:
154            return self.result.get_page()
155        return None
156
157    def get_title(self):
158        """
159        Get content of the HTML title element for the current page.
160        """
161        return self._browser.title()
162
163    def get_url(self):
164        """
165        Get the URL of the current page.
166        """
167        if self.result:
168            return self.result.get_url()
169        return None
170
171    def find_link(self, pattern):
172        """
173        Find the first link with a URL, link text, or name matching the
174        given pattern.
175        """
176
177        #
178        # first, try to find a link matching that regexp.
179        #
180       
181        try:
182            l = self._browser.find_link(url_regex=pattern)
183        except LinkNotFoundError:
184
185            #
186            # then, look for a text match.
187            #
188           
189            try:
190                l = self._browser.find_link(text_regex=pattern)
191            except LinkNotFoundError:
192                #
193                # finally, look for a name match.
194                #
195               
196                try:
197                    l = self._browser.find_link(name_regex=pattern)
198                except LinkNotFoundError:
199                    l = None
200
201        return l
202
203    def follow_link(self, link):
204        """
205        Follow the given link.
206        """
207        self._journey('follow_link', link)
208        print>>OUT, '==> at', self.get_url()
209
210    def set_agent_string(self, agent):
211        """
212        Set the agent string to the given value.
213        """
214        for i in xrange(len(self._browser.addheaders)):
215            if self._browser.addheaders[i][0] == "User-agent":
216                del self._browser.addheaders[i]
217                break
218        self._browser.addheaders += [("User-agent", agent)]
219
220    def showforms(self):
221        """
222        Pretty-print all of the forms.  Include the global form (form
223        elements outside of <form> pairs) as forms[0] iff present.
224        """
225        forms = self.get_all_forms()
226       
227        for n, f in enumerate(forms):
228            print_form(n, f, OUT)
229
230    def showlinks(self):
231        """
232        Pretty-print all of the links.
233        """
234        print>>OUT, 'Links:\n'
235        for n, link in enumerate(self._browser.links()):
236            print>>OUT, "%d. %s ==> %s" % (n, link.text, link.url,)
237        print>>OUT, ''
238
239    def showhistory(self):
240        """
241        Pretty-print the history of links visited.
242        """
243        print>>OUT, ''
244        print>>OUT, 'History: (%d pages total) ' % (len(self._browser._history))
245
246        n = 1
247        for (req, resp) in self._browser._history:
248            if req and resp:            # only print those that back() will go
249                print>>OUT, "\t%d. %s" % (n, resp.geturl())
250                n += 1
251           
252        print>>OUT, ''
253
254    def get_all_forms(self):
255        """
256        Return a list of all of the forms, with global_form at index 0
257        iff present.
258        """
259        global_form = self._browser.global_form()
260        forms = list(self._browser.forms())
261
262        if global_form.controls:
263            forms.insert(0, global_form)
264           
265        return forms
266
267    def get_form(self, formname):
268        """
269        Return the first form that matches 'formname'.
270        """
271        formname = str(formname)
272       
273        forms = self.get_all_forms()
274       
275        # first try ID
276        for f in forms:
277            id = f.attrs.get("id")
278            if id and str(id) == formname:
279                return f
280       
281        # next try regexps
282        regexp = re.compile(formname)
283        for f in forms:
284            if f.name and regexp.search(f.name):
285                return f
286
287        # ok, try number
288        try:
289            formnum = int(formname)
290            if formnum >= 1 and formnum <= len(forms):
291                return forms[formnum - 1]
292        except ValueError:              # int() failed
293            pass
294        except IndexError:              # formnum was incorrect
295            pass
296
297        return None
298
299    def get_form_field(self, form, fieldname):
300        """
301        Return the control that matches 'fieldname'.  Must be
302        a *unique* regexp/exact string match.
303        """
304        fieldname = str(fieldname)
305       
306        found = None
307        found_multiple = False
308
309        matches = [ c for c in form.controls if str(c.id) == fieldname ]
310
311        # test exact match.
312        if matches:
313            if unique_match(matches):
314                found = matches[0]
315            else:
316                found_multiple = True   # record for error reporting.
317       
318        matches = [ c for c in form.controls if str(c.name) == fieldname ]
319
320        # test exact match.
321        if matches:
322            if unique_match(matches):
323                found = matches[0]
324            else:
325                found_multiple = True   # record for error reporting.
326
327        # test index.
328        if found is None:
329            # try num
330            clickies = [c for c in form.controls]
331            try:
332                fieldnum = int(fieldname) - 1
333                found = clickies[fieldnum]
334            except ValueError:          # int() failed
335                pass
336            except IndexError:          # fieldnum was incorrect
337                pass
338
339        # test regexp match
340        if found is None:
341            regexp = re.compile(fieldname)
342
343            matches = [ ctl for ctl in form.controls \
344                        if regexp.search(str(ctl.name)) ]
345
346            if matches:
347                if unique_match(matches):
348                    found = matches[0]
349                else:
350                    found_multiple = True # record for error
351
352        if found is None:
353            # try value, for readonly controls like submit keys
354            clickies = [ c for c in form.controls if c.value == fieldname \
355                         and c.readonly ]
356            if clickies:
357                if len(clickies) == 1:
358                    found = clickies[0]
359                else:
360                    found_multiple = True   # record for error
361
362        # error out?
363        if found is None:
364            if not found_multiple:
365                raise TwillException('no field matches "%s"' % (fieldname,))
366            else:
367                raise TwillException('multiple matches to "%s"' % (fieldname,))
368
369        return found
370
371    def clicked(self, form, control):
372        """
373        Record a 'click' in a specific form.
374        """
375        if self._browser.form != form:
376            # construct a function to choose a particular form; select_form
377            # can use this to pick out a precise form.
378
379            def choose_this_form(test_form, this_form=form):
380                if test_form is this_form:
381                    return True
382
383                return False
384
385            self._browser.select_form(predicate=choose_this_form)
386            assert self._browser.form == form
387
388            self.last_submit_button = None
389
390        # record the last submit button clicked.
391        if isinstance(control, ClientForm.SubmitControl):
392            self.last_submit_button = control
393
394    def submit(self, fieldname=None):
395        """
396        Submit the currently clicked form using the given field.
397        """
398        if fieldname is not None:
399            fieldname = str(fieldname)
400       
401        if not self.get_all_forms():
402            raise TwillException("no forms on this page!")
403       
404        ctl = None
405       
406        form = self._browser.form
407        if form is None:
408            forms = [ i for i in self.get_all_forms() ]
409            if len(forms) == 1:
410                form = forms[0]
411            else:
412                raise TwillException("""\
413more than one form; you must select one (use 'fv') before submitting\
414""")
415
416        # no fieldname?  see if we can use the last submit button clicked...
417        if not fieldname:
418            if self.last_submit_button:
419                ctl = self.last_submit_button
420            else:
421                # get first submit button in form.
422                submits = [ c for c in form.controls \
423                            if isinstance(c, ClientForm.SubmitControl) ]
424
425                if len(submits):
426                    ctl = submits[0]
427               
428        else:
429            # fieldname given; find it.
430            ctl = self.get_form_field(form, fieldname)
431
432        #
433        # now set up the submission by building the request object that
434        # will be sent in the form submission.
435        #
436       
437        if ctl:
438            # submit w/button
439            print>>OUT, """\
440Note: submit is using submit button: name="%s", value="%s"
441""" % (ctl.name, ctl.value)
442           
443            if isinstance(ctl, ClientForm.ImageControl):
444                request = ctl._click(form, (1,1), "", mechanize.Request)
445            else:
446                request = ctl._click(form, True, "", mechanize.Request)
447               
448        else:
449            # submit w/o submit button.
450            request = form._click(None, None, None, None, 0, None,
451                                  "", mechanize.Request)
452
453        #
454        # add referer information.  this may require upgrading the
455        # request object to have an 'add_unredirected_header' function.
456        #
457
458        upgrade = self._browser._ua_handlers.get('_http_request_upgrade')
459        if upgrade:
460            request = upgrade.http_request(request)
461            request = self._browser._add_referer_header(request)
462
463        #
464        # now actually GO.
465        #
466       
467        self._journey('open', request)
468
469    def save_cookies(self, filename):
470        """
471        Save cookies into the given file.
472        """
473        self.cj.save(filename, ignore_discard=True, ignore_expires=True)
474
475    def load_cookies(self, filename):
476        """
477        Load cookies from the given file.
478        """
479        self.cj.load(filename, ignore_discard=True, ignore_expires=True)
480
481    def clear_cookies(self):
482        """
483        Delete all of the cookies.
484        """
485        self.cj.clear()
486
487    def show_cookies(self):
488        """
489        Pretty-print all of the cookies.
490        """
491        print>>OUT, '''
492There are %d cookie(s) in the cookiejar.
493''' % (len(self.cj,))
494       
495        if len(self.cj):
496            for cookie in self.cj:
497                print>>OUT, '\t', cookie
498
499            print>>OUT, ''
500
501    #### private functions.
502
503    def _journey(self, func_name, *args, **kwargs):
504        """
505        'func_name' should be the name of a mechanize method that either
506        returns a 'result' object or raises a HTTPError, e.g.
507        one of 'open', 'reload', 'back', or 'follow_link'.
508
509        journey then runs that function with the given arguments and turns
510        the results into a nice friendly standard ResultWrapper object, which
511        is stored as 'self.result'.
512
513        All exceptions other than HTTPError are unhandled.
514       
515        (Idea stolen straight from PBP.)
516        """
517        # reset
518        self.last_submit_button = None
519        self.result = None
520
521        func = getattr(self._browser, func_name)
522        try:
523            r = func(*args, **kwargs)
524        except mechanize.HTTPError, e:
525            r = e
526
527        # seek back to 0 if a seek() function is present.
528        seek_fn = getattr(r, 'seek', None)
529        if seek_fn:
530            seek_fn(0)
531
532        # some URLs, like 'file:' URLs, don't have return codes.  In this
533        # case, assume success (code=200) if no such attribute.
534        code = getattr(r, 'code', 200)
535
536        ## special case refresh loops!?
537        if code == 'refresh':
538            raise TwillException("""\
539infinite refresh loop discovered; aborting.
540Try turning off acknowledge_equiv_refresh...""")
541
542        self.result = ResultWrapper(code, r.geturl(), r.read())
543
544        #
545        # Now call all of the post load hooks with the function name.
546        #
547       
548        for callable in self._post_load_hooks:
549            callable(func_name, *args, **kwargs)
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。