root/galaxy-central/scripts/galaxy_messaging/server/amqp_consumer.py

リビジョン 2, 5.7 KB (コミッタ: hatakeyama, 14 年 前)

import galaxy-central

行番号 
1'''
2Galaxy Messaging with AMQP (RabbitMQ)
3Galaxy uses AMQ protocol to receive messages from external sources like
4bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation.
5For Galaxy to receive messages from a message queue the RabbitMQ server has
6to be set up with a user account and other parameters listed in the [galaxy:amq]
7section in the universe_wsgi.ini config file
8Once the RabbitMQ server has been setup and started with the given parameters,
9this script can be run to receive messages and update the Galaxy database accordingly
10'''
11
12import ConfigParser
13import sys, os
14import optparse
15import xml.dom.minidom
16import subprocess
17from galaxydb_interface import GalaxyDbInterface
18
19new_path = [ os.path.join( os.getcwd(), "scripts/galaxy_messaging/server" ) ]
20new_path.extend( sys.path[1:] ) # remove scripts/ from the path
21sys.path = new_path
22from galaxyweb_interface import GalaxyWebInterface
23
24assert sys.version_info[:2] >= ( 2, 4 )
25new_path = [ os.path.join( os.getcwd(), "lib" ) ]
26new_path.extend( sys.path[1:] ) # remove scripts/ from the path
27sys.path = new_path
28
29
30
31from galaxy import eggs
32import pkg_resources
33pkg_resources.require( "amqplib" )
34
35from amqplib import client_0_8 as amqp
36
37import logging
38log = logging.getLogger("GalaxyAMQP")
39log.setLevel(logging.DEBUG)
40fh = logging.FileHandler("galaxy_listener.log")
41fh.setLevel(logging.DEBUG)
42formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s")
43fh.setFormatter(formatter)
44log.addHandler(fh)
45
46global dbconnstr
47global config
48
49def get_value(dom, tag_name):
50    '''
51    This method extracts the tag value from the xml message
52    '''
53    nodelist = dom.getElementsByTagName(tag_name)[0].childNodes
54    rc = ""
55    for node in nodelist:
56        if node.nodeType == node.TEXT_NODE:
57            rc = rc + node.data
58    return rc
59
60def get_value_index(dom, tag_name, index):
61    '''
62    This method extracts the tag value from the xml message
63    '''
64    try:
65        nodelist = dom.getElementsByTagName(tag_name)[index].childNodes
66    except:
67        return None
68    rc = ""
69    for node in nodelist:
70        if node.nodeType == node.TEXT_NODE:
71            rc = rc + node.data
72    return rc
73
74def recv_callback(msg):
75    global config
76    global webconfig
77    # check the meesage type.
78    msg_type = msg.properties['application_headers'].get('msg_type')
79    log.debug('\nMESSAGE RECVD: '+str(msg_type))
80    if msg_type == 'data_transfer':
81        log.debug('DATA TRANSFER')
82        # fork a new process to transfer datasets
83        transfer_script = os.path.join(os.getcwd(),
84                                       "scripts/galaxy_messaging/server/data_transfer.py")
85        cmd = '%s "%s" "%s" "%s"' % ("python",
86                                     transfer_script,
87                                     msg.body,
88                                     config.get("app:main", "id_secret") )
89        pid = subprocess.Popen(cmd, shell=True).pid
90        log.debug('Started process (%i): %s' % (pid, str(cmd)))
91    elif msg_type == 'sample_state_update':
92        log.debug('SAMPLE STATE UPDATE')
93        dom = xml.dom.minidom.parseString(msg.body)
94        barcode = get_value(dom, 'barcode')
95        state = get_value(dom, 'state')
96        log.debug('Barcode: '+barcode)
97        log.debug('State: '+state)
98        # update the galaxy db
99        galaxydb = GalaxyDbInterface(dbconnstr)
100        sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode)
101        if sample_id == -1:
102           log.debug('Invalid barcode.')
103           return
104        galaxydb.change_state(sample_id, state)
105        # update the request state
106        galaxyweb = GalaxyWebInterface(webconfig.get("universe_wsgi_config", "host"),
107                                       webconfig.get("universe_wsgi_config", "port"),
108                                       webconfig.get("data_transfer_user_login_info", "email"),
109                                       webconfig.get("data_transfer_user_login_info", "password"),
110                                       config.get("app:main", "id_secret"))
111        galaxyweb.update_request_state(galaxydb.get_request_id(sample_id))
112        galaxyweb.logout()
113
114def main():
115    if len(sys.argv) < 2:
116        print 'Usage: python amqp_consumer.py <Galaxy config file>'
117        return
118    global config
119    config = ConfigParser.ConfigParser()
120    config.read(sys.argv[1])
121    global dbconnstr
122    dbconnstr = config.get("app:main", "database_connection")
123    amqp_config = {}
124    for option in config.options("galaxy_amqp"):
125        amqp_config[option] = config.get("galaxy_amqp", option)
126    log.debug(str(amqp_config))
127    # web server config
128    global webconfig
129    webconfig = ConfigParser.ConfigParser()
130    webconfig.read('transfer_datasets.ini')
131
132
133
134   
135   
136    conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'],
137                           userid=amqp_config['userid'],
138                           password=amqp_config['password'],
139                           virtual_host=amqp_config['virtual_host'],
140                           insist=False)
141    chan = conn.channel()
142    chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False)
143    chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,)   
144    chan.queue_bind(queue=amqp_config['queue'],
145                    exchange=amqp_config['exchange'],
146                    routing_key=amqp_config['routing_key'])
147
148    chan.basic_consume(queue=amqp_config['queue'],
149                       no_ack=True,
150                       callback=recv_callback,
151                       consumer_tag="testtag")
152    while True:
153        chan.wait()
154    chan.basic_cancel("testtag")   
155    chan.close()
156    conn.close()
157
158if __name__ == '__main__':
159    main()
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。