[2] | 1 | ''' |
---|
| 2 | Galaxy Messaging with AMQP (RabbitMQ) |
---|
| 3 | Galaxy uses AMQ protocol to receive messages from external sources like |
---|
| 4 | bar code scanners. Galaxy has been tested against RabbitMQ AMQP implementation. |
---|
| 5 | For Galaxy to receive messages from a message queue the RabbitMQ server has |
---|
| 6 | to be set up with a user account and other parameters listed in the [galaxy_amqp] |
---|
| 7 | section in the universe_wsgi.ini config file. |
---|
| 8 | Once the RabbitMQ server has been setup and started with the given parameters, |
---|
| 9 | this script can be run to receive messages and update the Galaxy database accordingly |
---|
| 10 | ''' |
---|
| 11 | |
---|
| 12 | import ConfigParser |
---|
| 13 | import sys, os |
---|
| 14 | import optparse |
---|
| 15 | import xml.dom.minidom |
---|
| 16 | import subprocess |
---|
| 17 | from galaxydb_interface import GalaxyDbInterface |
---|
| 18 | |
---|
# Module search path setup. All directories are resolved against the current
# working directory, so the script has to be started from the directory that
# contains "scripts/" and "lib/" (presumably the Galaxy root - confirm).

# Replace the first sys.path entry with <cwd>/scripts/galaxy_messaging/server
# before importing galaxyweb_interface (presumably that module lives there).
new_path = [ os.path.join( os.getcwd(), "scripts/galaxy_messaging/server" ) ]
new_path.extend( sys.path[1:] ) # drop the first entry (remove scripts/ from the path)
sys.path = new_path
from galaxyweb_interface import GalaxyWebInterface

# Python 2.4 or newer is required.
assert sys.version_info[:2] >= ( 2, 4 )
# Now replace the first sys.path entry (the server directory added above)
# with <cwd>/lib, so that the "galaxy" package import below resolves from it.
new_path = [ os.path.join( os.getcwd(), "lib" ) ]
new_path.extend( sys.path[1:] ) # drop the first entry (remove scripts/ from the path)
sys.path = new_path



# NOTE(review): galaxy.eggs is imported before pkg_resources; presumably the
# import prepares Galaxy's egg handling so require() can locate amqplib - confirm.
from galaxy import eggs
import pkg_resources
# Make sure the amqplib distribution is available before importing it.
pkg_resources.require( "amqplib" )

# AMQP 0-8 client, referred to as "amqp" in the rest of this file.
from amqplib import client_0_8 as amqp
---|
| 36 | |
---|
import logging
# Logger used throughout this script; everything from DEBUG level upwards is
# written to the file "galaxy_listener.log" (path relative to the current
# working directory).
log = logging.getLogger("GalaxyAMQP")
log.setLevel(logging.DEBUG)
fh = logging.FileHandler("galaxy_listener.log")
fh.setLevel(logging.DEBUG)
# Each record is written as "<timestamp> - <logger name> - <message>".
formatter = logging.Formatter("%(asctime)s - %(name)s - %(message)s")
fh.setFormatter(formatter)
log.addHandler(fh)

# These module-level "global" statements do not create the names; dbconnstr
# and config are actually assigned in main() and read by recv_callback().
global dbconnstr
global config
---|
| 48 | |
---|
def get_value(dom, tag_name):
    '''
    Return the text held by the first <tag_name> element of the parsed
    xml message ``dom``.

    Only the element's direct text-node children are joined together; text
    inside nested elements is ignored. An IndexError is raised when the
    message contains no element with that tag name.
    '''
    first_match = dom.getElementsByTagName(tag_name)[0]
    text_parts = [child.data
                  for child in first_match.childNodes
                  if child.nodeType == child.TEXT_NODE]
    return "".join(text_parts)
---|
| 59 | |
---|
def get_value_index(dom, tag_name, index):
    '''
    Return the text held by the ``index``-th <tag_name> element of the
    parsed xml message ``dom``.

    dom      -- parsed xml document (or element) to search
    tag_name -- name of the elements to look for
    index    -- position of the wanted element among all matches

    Only the element's direct text-node children are joined together; text
    inside nested elements is ignored. Returns None when there is no
    matching element at that position.
    '''
    try:
        element = dom.getElementsByTagName(tag_name)[index]
    except IndexError:
        # Only a missing element is an expected condition; anything else
        # (including KeyboardInterrupt) must not be silently swallowed.
        return None
    return "".join([node.data
                    for node in element.childNodes
                    if node.nodeType == node.TEXT_NODE])
---|
| 73 | |
---|
def recv_callback(msg):
    '''
    Handle one message delivered by the AMQP channel.

    The message type is taken from the 'msg_type' entry of the message's
    application headers:

    'data_transfer'       -- start scripts/galaxy_messaging/server/data_transfer.py
                             in a child process, passing it the message body
                             and the id_secret from the Galaxy config.
    'sample_state_update' -- read <barcode> and <state> from the xml body,
                             change the state of the matching sample in the
                             Galaxy database and ask the Galaxy web interface
                             to update the state of the sample's request.

    Messages of any other type (or without application headers) are only
    logged.
    '''
    global config
    global webconfig
    # check the message type; a message without application headers is
    # treated as an unknown type instead of raising KeyError
    try:
        headers = msg.properties['application_headers']
    except KeyError:
        headers = {}
    msg_type = headers.get('msg_type')
    log.debug('\nMESSAGE RECVD: '+str(msg_type))
    if msg_type == 'data_transfer':
        log.debug('DATA TRANSFER')
        # start a new process to transfer datasets
        transfer_script = os.path.join(os.getcwd(),
                                       "scripts/galaxy_messaging/server/data_transfer.py")
        # The body comes from an external queue, so it is handed over as a
        # plain argument without going through a shell: quotes, '$' or
        # backticks inside the xml can neither break the command line nor
        # run arbitrary commands.
        cmd = ["python",
               transfer_script,
               msg.body,
               config.get("app:main", "id_secret")]
        pid = subprocess.Popen(cmd).pid
        log.debug('Started process (%i): %s' % (pid, str(cmd)))
    elif msg_type == 'sample_state_update':
        log.debug('SAMPLE STATE UPDATE')
        dom = xml.dom.minidom.parseString(msg.body)
        barcode = get_value(dom, 'barcode')
        state = get_value(dom, 'state')
        log.debug('Barcode: '+barcode)
        log.debug('State: '+state)
        # update the galaxy db
        galaxydb = GalaxyDbInterface(dbconnstr)
        sample_id = galaxydb.get_sample_id(field_name='bar_code', value=barcode)
        # -1 means no sample was found for this barcode
        if sample_id == -1:
            log.debug('Invalid barcode.')
            return
        galaxydb.change_state(sample_id, state)
        # update the request state
        galaxyweb = GalaxyWebInterface(webconfig.get("universe_wsgi_config", "host"),
                                       webconfig.get("universe_wsgi_config", "port"),
                                       webconfig.get("data_transfer_user_login_info", "email"),
                                       webconfig.get("data_transfer_user_login_info", "password"),
                                       config.get("app:main", "id_secret"))
        galaxyweb.update_request_state(galaxydb.get_request_id(sample_id))
        galaxyweb.logout()
---|
| 113 | |
---|
def main():
    '''
    Entry point: read the configuration, connect to the AMQP server and
    consume messages until the process is interrupted.

    The Galaxy config file is given as the first command line argument; its
    [app:main] section supplies database_connection and its [galaxy_amqp]
    section supplies the connection parameters (host, port, userid,
    password, virtual_host, queue, exchange, routing_key). The web server
    settings are read from 'transfer_datasets.ini' in the current working
    directory. Every received message is passed to recv_callback().
    '''
    global config
    global dbconnstr
    global webconfig
    if len(sys.argv) < 2:
        print('Usage: python amqp_consumer.py <Galaxy config file>')
        return
    config = ConfigParser.ConfigParser()
    config.read(sys.argv[1])
    dbconnstr = config.get("app:main", "database_connection")
    amqp_config = {}
    for option in config.options("galaxy_amqp"):
        amqp_config[option] = config.get("galaxy_amqp", option)
    log.debug(str(amqp_config))
    # web server config
    webconfig = ConfigParser.ConfigParser()
    webconfig.read('transfer_datasets.ini')

    conn = amqp.Connection(host=amqp_config['host']+":"+amqp_config['port'],
                           userid=amqp_config['userid'],
                           password=amqp_config['password'],
                           virtual_host=amqp_config['virtual_host'],
                           insist=False)
    try:
        chan = conn.channel()
        chan.queue_declare(queue=amqp_config['queue'], durable=True, exclusive=True, auto_delete=False)
        chan.exchange_declare(exchange=amqp_config['exchange'], type="direct", durable=True, auto_delete=False,)
        chan.queue_bind(queue=amqp_config['queue'],
                        exchange=amqp_config['exchange'],
                        routing_key=amqp_config['routing_key'])

        chan.basic_consume(queue=amqp_config['queue'],
                           no_ack=True,
                           callback=recv_callback,
                           consumer_tag="testtag")
        try:
            # Block forever, dispatching each incoming message to
            # recv_callback; the loop only ends through an exception
            # (e.g. Ctrl-C or a connection error).
            while True:
                chan.wait()
        finally:
            # Cleanup placed after the endless loop would never run, so it
            # is done here on the way out.
            chan.basic_cancel("testtag")
            chan.close()
    finally:
        conn.close()
---|
| 157 | |
---|
# Start the consumer only when this file is run as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()
---|