root/galaxy-central/lib/galaxy/tools/parameters/sanitize.py @ 3

リビジョン 2, 6.9 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1"""
2Tool Parameter specific sanitizing.
3"""
4
5import logging
6import string
7import galaxy.util
8
9log = logging.getLogger( __name__ )
10
11class ToolParameterSanitizer( object ):
12    """
13    Handles tool parameter specific sanitizing.
14   
15    >>> from elementtree.ElementTree import XML
16    >>> sanitizer = ToolParameterSanitizer.from_element( XML(
17    ... '''
18    ... <sanitizer invalid_char="">
19    ...   <valid initial="string.letters"/>
20    ... </sanitizer>
21    ... ''' ) )
22    >>> sanitizer.sanitize_param( string.printable ) == string.letters
23    True
24    >>> slash = chr( 92 )
25    >>> sanitizer = ToolParameterSanitizer.from_element( XML(
26    ... '''
27    ... <sanitizer>
28    ...   <valid initial="none">
29    ...    <add preset="string.printable"/>
30    ...    <remove value="&quot;"/>
31    ...    <remove value="%s"/>
32    ...   </valid>
33    ...   <mapping initial="none">
34    ...     <add source="&quot;" target="%s&quot;"/>
35    ...     <add source="%s" target="%s%s"/>
36    ...   </mapping>
37    ... </sanitizer>
38    ... ''' % ( slash, slash, slash, slash, slash ) ) )
39    >>> text = '%s"$rm&#!' % slash
40    >>> [ c for c in sanitizer.sanitize_param( text ) ] == [ slash, slash, slash, '"', '$', 'r', 'm', '&', '#', '!' ]
41    True
42    """
43   
44    VALID_PRESET = { 'default':( string.letters + string.digits +" -=_.()/+*^,:?!" ), 'none':'' }
45    MAPPING_PRESET = { 'default':galaxy.util.mapped_chars, 'none':{} }
46    DEFAULT_INVALID_CHAR = 'X'
47   
48    #class methods
49    @classmethod
50    def from_element( cls, elem ):
51        """Loads the proper filter by the type attribute of elem"""
52        #TODO: Add ability to generically specify a method to use for sanitizing input via specification in tool XML
53        rval = ToolParameterSanitizer()
54        rval._invalid_char = elem.get( 'invalid_char', cls.DEFAULT_INVALID_CHAR )
55        rval.sanitize = galaxy.util.string_as_bool( elem.get( 'sanitize', 'True' ) )
56        for valid_elem in elem.findall( 'valid' ):
57            rval._valid_chars = rval.get_valid_by_name( valid_elem.get( 'initial', 'default' ) )
58            for action_elem in valid_elem:
59                preset = rval.get_valid_by_name( action_elem.get( 'preset', 'none' ) )
60                valid_value = [ val for val in action_elem.get( 'value', [] ) ]
61                if action_elem.tag.lower() == 'add':
62                    for val in ( preset + valid_value ):
63                        if val not in rval._valid_chars:
64                            rval._valid_chars.append( val )
65                elif action_elem.tag.lower() == 'remove':
66                    for val in ( preset + valid_value ):
67                        while val in rval._valid_chars:
68                            rval._valid_chars.remove( val )
69                else:
70                    log.debug( 'Invalid action tag in valid: %s' % action_elem.tag )
71        for mapping_elem in elem.findall( 'mapping' ):
72            rval._mapped_chars = rval.get_mapping_by_name( mapping_elem.get( 'initial', 'default' ) )
73            for action_elem in mapping_elem:
74                map_source = action_elem.get( 'source', None )
75                map_target = action_elem.get( 'target', None )
76                preset = rval.get_mapping_by_name( action_elem.get( 'preset', 'none' ) )
77                if action_elem.tag.lower() == 'add':
78                    rval._mapped_chars.update( preset )
79                    if None not in [ map_source, map_target ]:
80                        rval._mapped_chars[ map_source ] = map_target
81                elif action_elem.tag.lower() == 'remove':
82                    for map_key in preset.keys():
83                        if map_key in rval._mapped_chars:
84                            del rval._mapped_chars[ map_key ]
85                    if map_source is not None and map_key in rval._mapped_chars:
86                        del rval._mapped_chars[ map_key ]
87                else:
88                    log.debug( 'Invalid action tag in mapping: %s' % action_elem.tag )       
89        return rval
90   
91    @classmethod
92    def get_valid_by_name( cls, name ):
93        rval = []
94        for split_name in name.split( ',' ):
95            split_name = split_name.strip()
96            value = []
97            if split_name.startswith( 'string.' ):
98                try:
99                    value = eval( split_name )
100                except NameError, e:
101                    log.debug( 'Invalid string preset specified: %s' % e )
102            elif split_name in cls.VALID_PRESET:
103                value = cls.VALID_PRESET[ split_name ]
104            else:
105                log.debug( 'Invalid preset name specified: %s' % split_name )
106            rval.extend( [ val for val in value if val not in rval ] )
107        return rval
108   
109    @classmethod
110    def get_mapping_by_name( cls, name ):
111        rval = {}
112        for split_name in name.split( ',' ):
113            split_name = split_name.strip()
114            if split_name in cls.MAPPING_PRESET:
115                rval.update( cls.MAPPING_PRESET[ split_name ] )
116            else:
117                log.debug( 'Invalid preset name specified: %s' % split_name )
118        return rval
119    #end class methods
120   
121    def __init__( self ):
122        self._valid_chars = [] #List of valid characters
123        self._mapped_chars = {} #Replace a char with a any number of characters
124        self._invalid_char = self.DEFAULT_INVALID_CHAR #Replace invalid characters with this character
125        self.sanitize = True #Simply pass back the passed in value
126   
127    def restore_text( self, text ):
128        """Restores sanitized text"""
129        if self.sanitize:
130            for key, value in self._mapped_chars.iteritems():
131                text = text.replace( value, key )
132        return text
133   
134    def restore_param( self, value ):
135        if self.sanitize:
136            if isinstance( value, basestring ):
137                return self.restore_text( value )
138            elif isinstance( value, list ):
139                return map( self.restore_text, value )
140            else:
141                raise Exception, 'Unknown parameter type (%s:%s)' % ( type( value ), value )
142        return value
143   
144    def sanitize_text( self, text ):
145        """Restricts the characters that are allowed in a text"""
146        if not self.sanitize:
147            return text
148        rval = []
149        for c in text:
150            if c in self._valid_chars:
151                rval.append( c )
152            elif c in self._mapped_chars:
153                rval.append( self._mapped_chars[ c ] )
154            else:
155                rval.append( self._invalid_char )
156        return ''.join( rval )
157   
158    def sanitize_param( self, value ):
159        """Clean incoming parameters (strings or lists)"""
160        if not self.sanitize:
161            return value
162        if isinstance( value, basestring ):
163            return self.sanitize_text( value )
164        elif isinstance( value, list ):
165            return map( self.sanitize_text, value )
166        else:
167            raise Exception, 'Unknown parameter type (%s:%s)' % ( type( value ), value )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。