'''
Galaxy Messaging with AMQP (RabbitMQ)
Galaxy uses AMQP to receive messages from external sources like
bar code scanners. Galaxy has been tested against the RabbitMQ AMQP implementation.
For Galaxy to receive messages from a message queue, the RabbitMQ server has
to be set up with a user account and the other parameters listed in the [galaxy_amqp]
section of the universe_wsgi.ini config file.
Once the RabbitMQ server has been set up and started with the given parameters,
this script can be run to receive messages and update the Galaxy database accordingly.
'''
11 | |
---|
import ConfigParser
import sys, os
import optparse
import xml.dom.minidom
import subprocess
# NOTE(review): this import runs before sys.path is rewritten below, so it
# presumably resolves through the original first sys.path entry -- confirm.
from galaxydb_interface import GalaxyDbInterface

# Replace the first sys.path entry with <cwd>/scripts/galaxy_messaging/server
# so that galaxyweb_interface can be imported from there. The path is built
# from os.getcwd(), so the script's working directory matters.
new_path = [ os.path.join( os.getcwd(), "scripts/galaxy_messaging/server" ) ]
new_path.extend( sys.path[1:] ) # remove scripts/ from the path
sys.path = new_path
from galaxyweb_interface import GalaxyWebInterface

# Refuse to run on interpreters older than Python 2.4.
assert sys.version_info[:2] >= ( 2, 4 )
# Replace the first sys.path entry once more, this time with <cwd>/lib; this
# drops the scripts/galaxy_messaging/server entry that was added above.
new_path = [ os.path.join( os.getcwd(), "lib" ) ]
new_path.extend( sys.path[1:] ) # remove scripts/ from the path
sys.path = new_path



# NOTE(review): `eggs` is not referenced anywhere in the visible code; it is
# presumably imported for its side effects so that pkg_resources can locate
# amqplib -- confirm before removing.
from galaxy import eggs
import pkg_resources
pkg_resources.require( "amqplib" )

from amqplib import client_0_8 as amqp
36 | |
---|
import logging
# Module-wide logger. Records at DEBUG level and above are appended to
# galaxy_listener.log; the path is relative, so the file is created in the
# current working directory.
log = logging.getLogger("GalaxyAMQP")
log.setLevel(logging.DEBUG)
fh = logging.FileHandler("galaxy_listener.log")
fh.setLevel(logging.DEBUG)
# Each line is written as "<timestamp> - <logger name> - <message>".
formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s")
fh.setFormatter(formatter)
log.addHandler(fh)

# NOTE: a `global` statement at module level has no effect. These two lines
# only signal that main() assigns `dbconnstr` and `config` as module globals;
# `webconfig` is assigned the same way in main() but is not listed here.
global dbconnstr
global config
48 | |
---|
def get_value(dom, tag_name):
    '''
    Return the text of the first <tag_name> element in the xml message.

    dom      -- parsed xml document (or element) that provides
                getElementsByTagName()
    tag_name -- name of the element whose text is wanted

    Only the direct text-node children of the element are concatenated; text
    nested inside child elements is not included. An element without any
    text yields an empty string.

    Raises IndexError when the message contains no <tag_name> element.
    '''
    nodelist = dom.getElementsByTagName(tag_name)[0].childNodes
    # join() assembles the text in one pass instead of repeated concatenation
    return "".join([node.data for node in nodelist
                    if node.nodeType == node.TEXT_NODE])
59 | |
---|
def get_value_index(dom, tag_name, index):
    '''
    Return the text of the index-th <tag_name> element in the xml message.

    dom      -- parsed xml document (or element) that provides
                getElementsByTagName()
    tag_name -- name of the element whose text is wanted
    index    -- position of the wanted element among all <tag_name> elements

    Only the direct text-node children of the element are concatenated; text
    nested inside child elements is not included. An element without any
    text yields an empty string.

    Returns None when there is no <tag_name> element at that position.
    '''
    try:
        nodelist = dom.getElementsByTagName(tag_name)[index].childNodes
    except (IndexError, TypeError):
        # Nothing at that position (or the index cannot be used as a list
        # index). Only these lookup failures are mapped to None, so that
        # unrelated errors and KeyboardInterrupt are no longer swallowed.
        return None
    # join() assembles the text in one pass instead of repeated concatenation
    return "".join([node.data for node in nodelist
                    if node.nodeType == node.TEXT_NODE])
73 | |
---|
def recv_callback(msg):
    '''
    Handle one message delivered by the AMQP consumer.

    msg -- the received message; its 'application_headers' property carries
           a 'msg_type' entry and its body carries the payload

    Message types:
      'data_transfer'       -- starts scripts/galaxy_messaging/server/data_transfer.py
                               (relative to the current working directory) in a
                               separate process, passing it the message body and
                               the id_secret from the Galaxy config.
      'sample_state_update' -- the body is xml with <barcode> and <state>
                               elements; the sample with that bar code gets the
                               new state in the Galaxy database, then the request
                               state is updated through the web interface.
    Any other type is logged and otherwise ignored.

    Relies on the module globals config, webconfig and dbconnstr, which main()
    sets before consuming starts.
    '''
    global config
    global webconfig
    # check the message type; a message that carries no application headers
    # is treated as having no type instead of raising KeyError
    headers = msg.properties.get('application_headers') or {}
    msg_type = headers.get('msg_type')
    log.debug('\nMESSAGE RECVD: '+str(msg_type))
    if msg_type == 'data_transfer':
        log.debug('DATA TRANSFER')
        # start a separate process to transfer datasets
        transfer_script = os.path.join(os.getcwd(),
                                       "scripts/galaxy_messaging/server/data_transfer.py")
        # The message body comes from outside Galaxy, so the arguments are
        # passed as a list without a shell: quotes, '$' or backticks in the
        # body can neither break the command line nor be executed.
        args = ["python",
                transfer_script,
                msg.body,
                config.get("app:main", "id_secret")]
        pid = subprocess.Popen(args).pid
        # the id_secret (last argument) is deliberately kept out of the log
        log.debug('Started process (%i): %s' % (pid, ' '.join(args[:3])))
    elif msg_type == 'sample_state_update':
        log.debug('SAMPLE STATE UPDATE')
        dom = xml.dom.minidom.parseString(msg.body)
        barcode = get_value(dom, 'barcode')
        state = get_value(dom, 'state')
        log.debug('Barcode: '+barcode)
        log.debug('State: '+state)
        # update the galaxy db
        galaxydb = GalaxyDbInterface(dbconnstr)
        sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode)
        if sample_id == -1:
            log.debug('Invalid barcode.')
            return
        galaxydb.change_state(sample_id, state)
        # update the request state
        galaxyweb = GalaxyWebInterface(webconfig.get("universe_wsgi_config", "host"),
                                       webconfig.get("universe_wsgi_config", "port"),
                                       webconfig.get("data_transfer_user_login_info", "email"),
                                       webconfig.get("data_transfer_user_login_info", "password"),
                                       config.get("app:main", "id_secret"))
        try:
            galaxyweb.update_request_state(galaxydb.get_request_id(sample_id))
        finally:
            # always log out, even when the request state update fails
            galaxyweb.logout()
113 | |
---|
def main():
    '''
    Entry point: read the configuration, connect to the AMQP server and
    consume messages until the process is stopped.

    The Galaxy config file is given as the first command line argument. It
    supplies database_connection from [app:main] and every option of the
    [galaxy_amqp] section (host, port, userid, password, virtual_host, queue,
    exchange, routing_key). The web server settings are read from
    transfer_datasets.ini in the current working directory.

    Sets the module globals config, dbconnstr and webconfig, which
    recv_callback() reads for every message.

    Without a command line argument a usage line is printed and the function
    returns. Otherwise it blocks in the consume loop; when the loop is left
    by an exception (including Ctrl-C) the consumer is cancelled and the
    channel and connection are closed before the exception propagates.
    '''
    global config, dbconnstr, webconfig
    if len(sys.argv) < 2:
        # single-argument call form: prints the same under Python 2 and 3
        print('Usage: python amqp_consumer.py <Galaxy config file>')
        return
    config = ConfigParser.ConfigParser()
    config.read(sys.argv[1])
    dbconnstr = config.get("app:main", "database_connection")
    amqp_config = dict(config.items("galaxy_amqp"))
    # log the settings, but never write the password to the log file
    loggable = dict(amqp_config)
    if 'password' in loggable:
        loggable['password'] = '********'
    log.debug(str(loggable))
    # web server config
    webconfig = ConfigParser.ConfigParser()
    webconfig.read('transfer_datasets.ini')

    consumer_tag = "testtag"
    conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'],
                           userid=amqp_config['userid'],
                           password=amqp_config['password'],
                           virtual_host=amqp_config['virtual_host'],
                           insist=False)
    chan = conn.channel()
    chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False)
    chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False)
    chan.queue_bind(queue=amqp_config['queue'],
                    exchange=amqp_config['exchange'],
                    routing_key=amqp_config['routing_key'])

    chan.basic_consume(queue=amqp_config['queue'],
                       no_ack=True,
                       callback=recv_callback,
                       consumer_tag=consumer_tag)
    # The loop never ends on its own, so the cleanup has to live in `finally`
    # to be reachable at all.
    try:
        while True:
            chan.wait()
    finally:
        try:
            chan.basic_cancel(consumer_tag)
            chan.close()
        finally:
            # close the connection even if the channel cleanup itself fails
            conn.close()
157 | |
---|
# Start the consumer only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()