root/galaxy-central/lib/galaxy/tags/tag_handler.py @ 2

リビジョン 2, 12.1 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1import re, logging
2from sqlalchemy.sql.expression import func, and_
3from sqlalchemy.sql import select
4
5log = logging.getLogger( __name__ )
6
7# Item-specific information needed to perform tagging.
8class ItemTagAssocInfo( object ):
9    def __init__( self, item_class, tag_assoc_class, item_id_col ):
10        self.item_class = item_class
11        self.tag_assoc_class = tag_assoc_class
12        self.item_id_col = item_id_col
13
14class TagHandler( object ):
15    def __init__( self ):
16        # Minimum tag length.
17        self.min_tag_len = 2
18        # Maximum tag length.
19        self.max_tag_len = 255
20        # Tag separator.
21        self.tag_separators = ',;'
22        # Hierarchy separator.
23        self.hierarchy_separator = '.'
24        # Key-value separator.
25        self.key_value_separators = "=:"
26        # Initialize with known classes - add to this in subclasses.
27        self.item_tag_assoc_info = {}
28    def get_tag_assoc_class( self, item_class ):
29        """Returns tag association class for item class."""
30        return self.item_tag_assoc_info[item_class.__name__].tag_assoc_class
31    def get_id_col_in_item_tag_assoc_table( self, item_class ):
32        """Returns item id column in class' item-tag association table."""
33        return self.item_tag_assoc_info[item_class.__name__].item_id_col
34    def get_community_tags( self, trans, item=None, limit=None ):
35        """Returns community tags for an item."""
36        # Get item-tag association class.
37        item_class = item.__class__
38        item_tag_assoc_class = self.get_tag_assoc_class( item_class )
39        if not item_tag_assoc_class:
40            return []
41        # Build select statement.   
42        cols_to_select = [ item_tag_assoc_class.table.c.tag_id, func.count( '*' ) ]
43        from_obj = item_tag_assoc_class.table.join( item_class.table ).join( trans.app.model.Tag.table )
44        where_clause = ( self.get_id_col_in_item_tag_assoc_table( item_class ) == item.id )
45        order_by = [ func.count( "*" ).desc() ]
46        group_by = item_tag_assoc_class.table.c.tag_id
47        # Do query and get result set.
48        query = select( columns=cols_to_select,
49                        from_obj=from_obj,
50                        whereclause=where_clause,
51                        group_by=group_by,
52                        order_by=order_by,
53                        limit=limit )
54        result_set = trans.sa_session.execute( query )
55        # Return community tags.
56        community_tags = []
57        for row in result_set:
58            tag_id = row[0]
59            community_tags.append( self.get_tag_by_id( trans, tag_id ) )       
60        return community_tags
61    def remove_item_tag( self, trans, user, item, tag_name ):
62        """Remove a tag from an item."""
63        # Get item tag association.
64        item_tag_assoc = self._get_item_tag_assoc( user, item, tag_name )
65        # Remove association.
66        if item_tag_assoc:
67            # Delete association.
68            trans.sa_session.delete( item_tag_assoc )
69            item.tags.remove( item_tag_assoc )
70            return True
71        return False
72    def delete_item_tags( self, trans, user, item ):
73        """Delete tags from an item."""
74        # Delete item-tag associations.
75        for tag in item.tags:
76            trans.sa_session.delete( tag )
77        # Delete tags from item.
78        del item.tags[:]
79    def item_has_tag( self, trans, user, item, tag ):
80        """Returns true if item is has a given tag."""
81        # Get tag name.
82        if isinstance( tag, basestring ):
83            tag_name = tag
84        elif isinstance( tag, trans.app.model.Tag ):
85            tag_name = tag.name
86        # Check for an item-tag association to see if item has a given tag.
87        item_tag_assoc = self._get_item_tag_assoc( user, item, tag_name )
88        if item_tag_assoc:
89            return True
90        return False
91    def apply_item_tags( self, trans, user, item, tags_str ):
92        """Apply tags to an item."""
93        # Parse tags.
94        parsed_tags = self.parse_tags( tags_str )
95        # Apply each tag.
96        for name, value in parsed_tags.items():
97            # Use lowercase name for searching/creating tag.
98            lc_name = name.lower()
99            # Get or create item-tag association.
100            item_tag_assoc = self._get_item_tag_assoc( user, item, lc_name )
101            if not item_tag_assoc:
102                # Create item-tag association.
103                # Create tag; if None, skip the tag (and log error).
104                tag = self._get_or_create_tag( trans, lc_name )
105                if not tag:
106                    # Log error?
107                    continue
108                # Create tag association based on item class.
109                item_tag_assoc_class = self.get_tag_assoc_class( item.__class__ )
110                item_tag_assoc = item_tag_assoc_class()
111                # Add tag to association.
112                item.tags.append( item_tag_assoc )
113                item_tag_assoc.tag = tag
114                item_tag_assoc.user = user   
115            # Apply attributes to item-tag association. Strip whitespace from user name and tag.
116            lc_value = None
117            if value:
118                lc_value = value.lower()
119            item_tag_assoc.user_tname = name
120            item_tag_assoc.user_value = value
121            item_tag_assoc.value = lc_value
122    def get_tags_str( self, tags ):
123        """Build a string from an item's tags."""
124        # Return empty string if there are no tags.
125        if not tags:
126            return ""
127        # Create string of tags.
128        tags_str_list = list()
129        for tag in tags:
130            tag_str = tag.user_tname
131            if tag.value is not None:
132                tag_str += ":" + tag.user_value
133            tags_str_list.append( tag_str )
134        return ", ".join( tags_str_list )
135    def get_tag_by_id( self, trans, tag_id ):
136        """Get a Tag object from a tag id."""
137        return trans.sa_session.query( trans.app.model.Tag ).filter_by( id=tag_id ).first()   
138    def get_tag_by_name( self, trans, tag_name ):
139        """Get a Tag object from a tag name (string)."""
140        if tag_name:
141            return trans.sa_session.query( trans.app.model.Tag ).filter_by( name=tag_name.lower() ).first()
142        return None
143    def _create_tag( self, trans, tag_str ):
144        """Create a Tag object from a tag string."""
145        tag_hierarchy = tag_str.split( self.hierarchy_separator )
146        tag_prefix = ""
147        parent_tag = None
148        for sub_tag in tag_hierarchy:
149            # Get or create subtag.
150            tag_name = tag_prefix + self._scrub_tag_name( sub_tag )
151            tag = trans.sa_session.query( trans.app.model.Tag ).filter_by( name=tag_name).first()
152            if not tag:
153                tag = trans.app.model.Tag( type=0, name=tag_name )
154            # Set tag parent.
155            tag.parent = parent_tag
156            # Update parent and tag prefix.
157            parent_tag = tag
158            tag_prefix = tag.name + self.hierarchy_separator
159        return tag
160    def _get_or_create_tag( self, trans, tag_str ):
161        """Get or create a Tag object from a tag string."""
162        # Scrub tag; if tag is None after being scrubbed, return None.
163        scrubbed_tag_str = self._scrub_tag_name( tag_str )
164        if not scrubbed_tag_str:
165            return None
166        # Get item tag.
167        tag = self.get_tag_by_name( trans, scrubbed_tag_str )
168        # Create tag if necessary.
169        if tag is None:
170            tag = self._create_tag( trans, scrubbed_tag_str )
171        return tag
172    def _get_item_tag_assoc( self, user, item, tag_name ):
173        """
174        Return ItemTagAssociation object for a user, item, and tag string; returns None if there is
175        no such association.
176        """
177        scrubbed_tag_name = self._scrub_tag_name( tag_name )
178        for item_tag_assoc in item.tags:
179            if ( item_tag_assoc.user == user ) and ( item_tag_assoc.user_tname == scrubbed_tag_name ):
180                return item_tag_assoc
181        return None
182    def parse_tags( self, tag_str ):
183        """
184        Returns a list of raw (tag-name, value) pairs derived from a string; method scrubs tag names and values as well.
185        Return value is a dictionary where tag-names are keys.
186        """
187        # Gracefully handle None.
188        if not tag_str:
189            return dict()
190        # Split tags based on separators.
191        reg_exp = re.compile( '[' + self.tag_separators + ']' )
192        raw_tags = reg_exp.split( tag_str )
193        # Extract name-value pairs.
194        name_value_pairs = dict()
195        for raw_tag in raw_tags:
196            nv_pair = self._get_name_value_pair( raw_tag )
197            scrubbed_name = self._scrub_tag_name( nv_pair[0] )
198            scrubbed_value = self._scrub_tag_value( nv_pair[1] )
199            name_value_pairs[scrubbed_name] = scrubbed_value
200        return name_value_pairs
201    def _scrub_tag_value( self, value ):
202        """Scrub a tag value."""
203        # Gracefully handle None:
204        if not value:
205            return None
206        # Remove whitespace from value.
207        reg_exp = re.compile( '\s' )
208        scrubbed_value = re.sub( reg_exp, "", value )
209        return scrubbed_value
210    def _scrub_tag_name( self, name ):
211        """Scrub a tag name."""
212        # Gracefully handle None:
213        if not name:
214            return None
215        # Remove whitespace from name.
216        reg_exp = re.compile( '\s' )
217        scrubbed_name = re.sub( reg_exp, "", name )
218        # Ignore starting ':' char.
219        if scrubbed_name.startswith( self.hierarchy_separator ):
220            scrubbed_name = scrubbed_name[1:]
221        # If name is too short or too long, return None.
222        if len( scrubbed_name ) < self.min_tag_len or len( scrubbed_name ) > self.max_tag_len:
223            return None
224        return scrubbed_name
225    def _scrub_tag_name_list( self, tag_name_list ):
226        """Scrub a tag name list."""
227        scrubbed_tag_list = list()
228        for tag in tag_name_list:
229            scrubbed_tag_list.append( self._scrub_tag_name( tag ) )
230        return scrubbed_tag_list
231    def _get_name_value_pair( self, tag_str ):
232        """Get name, value pair from a tag string."""
233        # Use regular expression to parse name, value.
234        reg_exp = re.compile( "[" + self.key_value_separators + "]" )
235        name_value_pair = reg_exp.split( tag_str )
236        # Add empty slot if tag does not have value.
237        if len( name_value_pair ) < 2:
238            name_value_pair.append( None )
239        return name_value_pair
240
241class GalaxyTagHandler( TagHandler ):
242    def __init__( self ):
243        from galaxy import model
244        TagHandler.__init__( self )
245        self.item_tag_assoc_info["History"] = ItemTagAssocInfo( model.History,
246                                                                model.HistoryTagAssociation,
247                                                                model.HistoryTagAssociation.table.c.history_id )
248        self.item_tag_assoc_info["HistoryDatasetAssociation"] = \
249            ItemTagAssocInfo( model.HistoryDatasetAssociation,
250                              model.HistoryDatasetAssociationTagAssociation,
251                              model.HistoryDatasetAssociationTagAssociation.table.c.history_dataset_association_id )
252        self.item_tag_assoc_info["Page"] = ItemTagAssocInfo( model.Page,
253                                                             model.PageTagAssociation,
254                                                             model.PageTagAssociation.table.c.page_id )
255        self.item_tag_assoc_info["StoredWorkflow"] = ItemTagAssocInfo( model.StoredWorkflow,
256                                                                       model.StoredWorkflowTagAssociation,
257                                                                       model.StoredWorkflowTagAssociation.table.c.stored_workflow_id )
258        self.item_tag_assoc_info["Visualization"] = ItemTagAssocInfo( model.Visualization,
259                                                                      model.VisualizationTagAssociation,
260                                                                      model.VisualizationTagAssociation.table.c.visualization_id )
261
262class CommunityTagHandler( TagHandler ):
263    def __init__( self ):
264        from galaxy.webapps.community import model
265        TagHandler.__init__( self )
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。