root/galaxy-central/lib/galaxy/tools/imp_exp/__init__.py

Revision 2, 10.3 KB (committer: hatakeyama, 14 years ago)

import galaxy-central

Line numbers
1import os, shutil, logging, tempfile, simplejson
2from galaxy import model
3from galaxy.web.framework.helpers import to_unicode
4from galaxy.model.item_attrs import UsesAnnotations
5from galaxy.util.json import to_json_string
6from galaxy.web.base.controller import UsesHistory
7
8log = logging.getLogger(__name__)
9
def load_history_imp_exp_tools( toolbox ):
    """ Adds tools for importing/exporting histories to archives. """
    # The tool description is generated on the fly and written to a temporary
    # file, mirroring the approach taken by load_external_metadata_tool; see
    # that method for the rationale.
    export_tool_xml = """
        <tool id="__EXPORT_HISTORY__" name="Export History" version="0.1" tool_type="export_history">
          <type class="ExportHistoryTool" module="galaxy.tools"/>
          <action module="galaxy.tools.actions.history_imp_exp" class="ExportHistoryToolAction"/>
          <command>$__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__ $output_file</command>
          <inputs>
            <param name="__HISTORY_TO_EXPORT__" type="hidden"/>
            <param name="compress" type="boolean"/>
            <param name="__EXPORT_HISTORY_COMMAND_INPUTS_OPTIONS__" type="hidden"/>
          </inputs>
          <outputs>
            <data format="gzip" name="output_file"/>
          </outputs>
        </tool>
        """
    # NamedTemporaryFile keeps the file alive for as long as the object is
    # referenced, which covers the load_tool() call below.
    tool_conf = tempfile.NamedTemporaryFile()
    tool_conf.write( export_tool_xml )
    tool_conf.flush()
    export_tool = toolbox.load_tool( tool_conf.name )
    toolbox.tools_by_id[ export_tool.id ] = export_tool
    log.debug( "Loaded history export tool: %s", export_tool.id )
35
36class JobExportHistoryArchiveWrapper( object, UsesHistory, UsesAnnotations ):
37    """ Class provides support for performing jobs that export a history to an archive. """
38    def __init__( self, job_id ):
39        self.job_id = job_id
40       
41    # TODO: should use db_session rather than trans in this method.
42    def setup_job( self, trans, jeha, include_hidden=False, include_deleted=False ):
43        # jeha = job_export_history_archive for the job.
44        """ Perform setup for job to export a history into an archive. Method generates
45            attribute files for export, sets the corresponding attributes in the jeha
46            object, and returns a command line for running the job. The command line
47            includes the command, inputs, and options; it does not include the output
48            file because it must be set at runtime. """
49           
50        #
51        # Helper methods/classes.
52        #
53
54        def get_item_tag_dict( item ):
55            """ Create dictionary of an item's tags. """
56            tags = {}
57            for tag in item.tags:
58                tag_user_tname = to_unicode( tag.user_tname )
59                tag_user_value = to_unicode( tag.user_value )
60                tags[ tag_user_tname ] = tag_user_value
61            return tags
62           
63        def prepare_metadata( metadata ):
64            """ Prepare metatdata for exporting. """
65            for name, value in metadata.items():
66                # Metadata files are not needed for export because they can be
67                # regenerated.
68                if isinstance( value, trans.app.model.MetadataFile ):
69                    del metadata[ name ]
70            return metadata
71           
72        class HistoryDatasetAssociationEncoder( simplejson.JSONEncoder ):
73            """ Custom JSONEncoder for a HistoryDatasetAssociation. """
74            def default( self, obj ):
75                """ Encode an HDA, default encoding for everything else. """
76                if isinstance( obj, trans.app.model.HistoryDatasetAssociation ):
77                    return {
78                        "__HistoryDatasetAssociation__" : True,
79                        "create_time" : obj.create_time.__str__(),
80                        "update_time" : obj.update_time.__str__(),
81                        "hid" : obj.hid,
82                        "name" : to_unicode( obj.name ),
83                        "info" : to_unicode( obj.info ),
84                        "blurb" : obj.blurb,
85                        "peek" : obj.peek,
86                        "extension" : obj.extension,
87                        "metadata" : prepare_metadata( dict( obj.metadata.items() ) ),
88                        "parent_id" : obj.parent_id,
89                        "designation" : obj.designation,
90                        "deleted" : obj.deleted,
91                        "visible" : obj.visible,
92                        "file_name" : obj.file_name,
93                        "annotation" : to_unicode( getattr( obj, 'annotation', '' ) ),
94                        "tags" : get_item_tag_dict( obj ),
95                    }
96                return simplejson.JSONEncoder.default( self, obj )
97       
98        #
99        # Create attributes/metadata files for export.
100        #   
101        temp_output_dir = tempfile.mkdtemp()
102   
103        # Write history attributes to file.
104        history = jeha.history
105        history_attrs = {
106            "create_time" : history.create_time.__str__(),
107            "update_time" : history.update_time.__str__(),
108            "name" : to_unicode( history.name ),
109            "hid_counter" : history.hid_counter,
110            "genome_build" : history.genome_build,
111            "annotation" : to_unicode( self.get_item_annotation_str( trans.sa_session, history.user, history ) ),
112            "tags" : get_item_tag_dict( history ),
113            "includes_hidden_datasets" : include_hidden,
114            "includes_deleted_datasets" : include_deleted
115        }
116        history_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
117        history_attrs_out = open( history_attrs_filename, 'w' )
118        history_attrs_out.write( to_json_string( history_attrs ) )
119        history_attrs_out.close()
120        jeha.history_attrs_filename = history_attrs_filename
121                           
122        # Write datasets' attributes to file.
123        datasets = self.get_history_datasets( trans, history )
124        included_datasets = []
125        datasets_attrs = []
126        for dataset in datasets:
127            if not dataset.visible and not include_hidden:
128                continue
129            if dataset.deleted and not include_deleted:
130                continue
131            dataset.annotation = self.get_item_annotation_str( trans.sa_session, history.user, dataset )
132            datasets_attrs.append( dataset )
133            included_datasets.append( dataset )
134        datasets_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
135        datasets_attrs_out = open( datasets_attrs_filename, 'w' )
136        datasets_attrs_out.write( to_json_string( datasets_attrs, cls=HistoryDatasetAssociationEncoder ) )
137        datasets_attrs_out.close()
138        jeha.datasets_attrs_filename = datasets_attrs_filename
139
140        #
141        # Write jobs attributes file.
142        #
143
144        # Get all jobs associated with included HDAs.
145        jobs_dict = {}
146        for hda in included_datasets:
147            # Get the associated job, if any. If this hda was copied from another,
148            # we need to find the job that created the origial hda
149            job_hda = hda
150            while job_hda.copied_from_history_dataset_association: #should this check library datasets as well?
151                job_hda = job_hda.copied_from_history_dataset_association
152            if not job_hda.creating_job_associations:
153                # No viable HDA found.
154                continue
155   
156            # Get the job object.
157            job = None
158            for assoc in job_hda.creating_job_associations:
159                job = assoc.job
160                break
161            if not job:
162                # No viable job.
163                continue
164       
165            jobs_dict[ job.id ] = job
166       
167        # Get jobs' attributes.
168        jobs_attrs = []
169        for id, job in jobs_dict.items():
170            job_attrs = {}
171            job_attrs[ 'tool_id' ] = job.tool_id
172            job_attrs[ 'tool_version' ] = job.tool_version
173            job_attrs[ 'state' ] = job.state
174                           
175            # Get the job's parameters
176            try:
177                params_objects = job.get_param_values( trans.app )
178            except:
179                # Could not get job params.
180                continue
181
182            params_dict = {}
183            for name, value in params_objects.items():
184                params_dict[ name ] = value
185            job_attrs[ 'params' ] = params_dict
186   
187            # Get input, output datasets.
188            input_datasets = [ assoc.dataset.hid for assoc in job.input_datasets ]
189            job_attrs[ 'input_datasets' ] = input_datasets
190            output_datasets = [ assoc.dataset.hid for assoc in job.output_datasets ]
191            job_attrs[ 'output_datasets' ] = output_datasets
192   
193            jobs_attrs.append( job_attrs )
194   
195        jobs_attrs_filename = tempfile.NamedTemporaryFile( dir=temp_output_dir ).name
196        jobs_attrs_out = open( jobs_attrs_filename, 'w' )
197        jobs_attrs_out.write( to_json_string( jobs_attrs, cls=HistoryDatasetAssociationEncoder ) )
198        jobs_attrs_out.close()
199        jeha.jobs_attrs_filename = jobs_attrs_filename
200       
201        #
202        # Create and return command line for running tool.
203        #
204        options = ""
205        if jeha.compressed:
206            options = "-G"
207        return "python %s %s %s %s %s" % (
208            os.path.join( os.path.abspath( os.getcwd() ), "lib/galaxy/tools/imp_exp/export_history.py" ), \
209            options, history_attrs_filename, datasets_attrs_filename, jobs_attrs_filename )
210                                   
211    def cleanup_after_job( self, db_session ):
212        """ Remove temporary directory and attribute files generated during setup for this job. """
213        # Get jeha for job.
214        jeha = db_session.query( model.JobExportHistoryArchive ).filter_by( job_id=self.job_id ).first()
215        if jeha:
216            for filename in [ jeha.history_attrs_filename, jeha.datasets_attrs_filename, jeha.jobs_attrs_filename ]:
217                try:
218                    os.remove( filename )
219                except Exception, e:
220                    log.debug( 'Failed to cleanup attributes file (%s): %s' % ( filename, e ) )
221            temp_dir = os.path.split( jeha.history_attrs_filename )[0]
222            try:
223                shutil.rmtree( temp_dir )
224            except Exception, e:
225                log.debug( 'Error deleting directory containing attribute files (%s): %s' % ( temp_dir, e ) )
226               
Note: See TracBrowser for help on using the repository browser.