root/galaxy-central/eggs/Cheetah-2.2.2-py2.6-macosx-10.6-universal-ucs2.egg/Cheetah/Utils/memcache.py @ 3

リビジョン 3, 18.4 KB (コミッタ: kohda, 14 年 前)

Install Unix tools  http://hannonlab.cshl.edu/galaxy_unix_tools/galaxy.html

行番号 
1
2"""
3client module for memcached (memory cache daemon)
4
5Overview
6========
7
8See U{the MemCached homepage<http://www.danga.com/memcached>} for more about memcached.
9
10Usage summary
11=============
12
13This should give you a feel for how this module operates::
14
15    import memcache
16    mc = memcache.Client(['127.0.0.1:11211'], debug=0)
17
18    mc.set("some_key", "Some value")
19    value = mc.get("some_key")
20
21    mc.set("another_key", 3)
22    mc.delete("another_key")
23   
24    mc.set("key", "1")   # note that the key used for incr/decr must be a string.
25    mc.incr("key")
26    mc.decr("key")
27
28The standard way to use memcache with a database is like this::
29
30    key = derive_key(obj)
31    obj = mc.get(key)
32    if not obj:
33        obj = backend_api.get(...)
34        mc.set(key, obj)
35
36    # we now have obj, and future passes through this code
37    # will use the object from the cache.
38
39Detailed Documentation
40======================
41
42More detailed documentation is available in the L{Client} class.
43"""
44
45import sys
46import socket
47import time
48import types
49try:
50    import cPickle as pickle
51except ImportError:
52    import pickle
53
54__author__    = "Evan Martin <martine@danga.com>"
55__version__   = "1.2_tummy5"
56__copyright__ = "Copyright (C) 2003 Danga Interactive"
57__license__   = "Python"
58
59class _Error(Exception):
60    pass
61
62class Client:
63    """
64    Object representing a pool of memcache servers.
65   
66    See L{memcache} for an overview.
67
68    In all cases where a key is used, the key can be either:
69        1. A simple hashable type (string, integer, etc.).
70        2. A tuple of C{(hashvalue, key)}.  This is useful if you want to avoid
71        making this module calculate a hash value.  You may prefer, for
72        example, to keep all of a given user's objects on the same memcache
73        server, so you could use the user's unique id as the hash value.
74
75    @group Setup: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog
76    @group Insertion: set, add, replace
77    @group Retrieval: get, get_multi
78    @group Integers: incr, decr
79    @group Removal: delete
80    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all, debuglog,\
81           set, add, replace, get, get_multi, incr, decr, delete
82    """
83
84    _usePickle = False
85    _FLAG_PICKLE  = 1<<0
86    _FLAG_INTEGER = 1<<1
87    _FLAG_LONG    = 1<<2
88
89    _SERVER_RETRIES = 10  # how many times to try finding a free server.
90
91    def __init__(self, servers, debug=0):
92        """
93        Create a new Client object with the given list of servers.
94
95        @param servers: C{servers} is passed to L{set_servers}.
96        @param debug: whether to display error messages when a server can't be
97        contacted.
98        """
99        self.set_servers(servers)
100        self.debug = debug
101        self.stats = {}
102   
103    def set_servers(self, servers):
104        """
105        Set the pool of servers used by this client.
106
107        @param servers: an array of servers.
108        Servers can be passed in two forms:
109            1. Strings of the form C{"host:port"}, which implies a default weight of 1.
110            2. Tuples of the form C{("host:port", weight)}, where C{weight} is
111            an integer weight value.
112        """
113        self.servers = [_Host(s, self.debuglog) for s in servers]
114        self._init_buckets()
115
116    def get_stats(self):
117        '''Get statistics from each of the servers. 
118
119        @return: A list of tuples ( server_identifier, stats_dictionary ).
120            The dictionary contains a number of name/value pairs specifying
121            the name of the status field and the string value associated with
122            it.  The values are not converted from strings.
123        '''
124        data = []
125        for s in self.servers:
126            if not s.connect(): continue
127            name = '%s:%s (%s)' % ( s.ip, s.port, s.weight )
128            s.send_cmd('stats')
129            serverData = {}
130            data.append(( name, serverData ))
131            readline = s.readline
132            while 1:
133                line = readline()
134                if not line or line.strip() == 'END': break
135                stats = line.split(' ', 2)
136                serverData[stats[1]] = stats[2]
137
138        return(data)
139
140    def flush_all(self):
141        'Expire all data currently in the memcache servers.'
142        for s in self.servers:
143            if not s.connect(): continue
144            s.send_cmd('flush_all')
145            s.expect("OK")
146
147    def debuglog(self, str):
148        if self.debug:
149            sys.stderr.write("MemCached: %s\n" % str)
150
151    def _statlog(self, func):
152        if not self.stats.has_key(func):
153            self.stats[func] = 1
154        else:
155            self.stats[func] += 1
156
157    def forget_dead_hosts(self):
158        """
159        Reset every host in the pool to an "alive" state.
160        """
161        for s in self.servers:
162            s.dead_until = 0
163
164    def _init_buckets(self):
165        self.buckets = []
166        for server in self.servers:
167            for i in range(server.weight):
168                self.buckets.append(server)
169
170    def _get_server(self, key):
171        if type(key) == types.TupleType:
172            serverhash = key[0]
173            key = key[1]
174        else:
175            serverhash = hash(key)
176
177        for i in range(Client._SERVER_RETRIES):
178            server = self.buckets[serverhash % len(self.buckets)]
179            if server.connect():
180                #print "(using server %s)" % server,
181                return server, key
182            serverhash = hash(str(serverhash) + str(i))
183        return None, None
184
185    def disconnect_all(self):
186        for s in self.servers:
187            s.close_socket()
188   
189    def delete(self, key, time=0):
190        '''Deletes a key from the memcache.
191       
192        @return: Nonzero on success.
193        @rtype: int
194        '''
195        server, key = self._get_server(key)
196        if not server:
197            return 0
198        self._statlog('delete')
199        if time != None:
200            cmd = "delete %s %d" % (key, time)
201        else:
202            cmd = "delete %s" % key
203
204        try:
205            server.send_cmd(cmd)
206            server.expect("DELETED")
207        except socket.error, msg:
208            server.mark_dead(msg[1])
209            return 0
210        return 1
211
212    def incr(self, key, delta=1):
213        """
214        Sends a command to the server to atomically increment the value for C{key} by
215        C{delta}, or by 1 if C{delta} is unspecified.  Returns None if C{key} doesn't
216        exist on server, otherwise it returns the new value after incrementing.
217
218        Note that the value for C{key} must already exist in the memcache, and it
219        must be the string representation of an integer.
220
221        >>> mc.set("counter", "20")  # returns 1, indicating success
222        1
223        >>> mc.incr("counter")
224        21
225        >>> mc.incr("counter")
226        22
227
228        Overflow on server is not checked.  Be aware of values approaching
229        2**32.  See L{decr}.
230
231        @param delta: Integer amount to increment by (should be zero or greater).
232        @return: New value after incrementing.
233        @rtype: int
234        """
235        return self._incrdecr("incr", key, delta)
236
237    def decr(self, key, delta=1):
238        """
239        Like L{incr}, but decrements.  Unlike L{incr}, underflow is checked and
240        new values are capped at 0.  If server value is 1, a decrement of 2
241        returns 0, not -1.
242
243        @param delta: Integer amount to decrement by (should be zero or greater).
244        @return: New value after decrementing.
245        @rtype: int
246        """
247        return self._incrdecr("decr", key, delta)
248
249    def _incrdecr(self, cmd, key, delta):
250        server, key = self._get_server(key)
251        if not server:
252            return 0
253        self._statlog(cmd)
254        cmd = "%s %s %d" % (cmd, key, delta)
255        try:
256            server.send_cmd(cmd)
257            line = server.readline()
258            return int(line)
259        except socket.error, msg:
260            server.mark_dead(msg[1])
261            return None
262
263    def add(self, key, val, time=0):
264        '''
265        Add new key with value.
266       
267        Like L{set}, but only stores in memcache if the key doesn\'t already exist.
268
269        @return: Nonzero on success.
270        @rtype: int
271        '''
272        return self._set("add", key, val, time)
273    def replace(self, key, val, time=0):
274        '''Replace existing key with value.
275       
276        Like L{set}, but only stores in memcache if the key already exists. 
277        The opposite of L{add}.
278
279        @return: Nonzero on success.
280        @rtype: int
281        '''
282        return self._set("replace", key, val, time)
283    def set(self, key, val, time=0):
284        '''Unconditionally sets a key to a given value in the memcache.
285
286        The C{key} can optionally be an tuple, with the first element being the
287        hash value, if you want to avoid making this module calculate a hash value.
288        You may prefer, for example, to keep all of a given user's objects on the
289        same memcache server, so you could use the user's unique id as the hash
290        value.
291
292        @return: Nonzero on success.
293        @rtype: int
294        '''
295        return self._set("set", key, val, time)
296   
297    def _set(self, cmd, key, val, time):
298        server, key = self._get_server(key)
299        if not server:
300            return 0
301
302        self._statlog(cmd)
303
304        flags = 0
305        if isinstance(val, types.StringTypes):
306            pass
307        elif isinstance(val, int):
308            flags |= Client._FLAG_INTEGER
309            val = "%d" % val
310        elif isinstance(val, long):
311            flags |= Client._FLAG_LONG
312            val = "%d" % val
313        elif self._usePickle:
314            flags |= Client._FLAG_PICKLE
315            val = pickle.dumps(val, 2)
316        else:
317            pass
318       
319        fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, flags, time, len(val), val)
320        try:
321            server.send_cmd(fullcmd)
322            server.expect("STORED")
323        except socket.error, msg:
324            server.mark_dead(msg[1])
325            return 0
326        return 1
327
328    def get(self, key):
329        '''Retrieves a key from the memcache.
330       
331        @return: The value or None.
332        '''
333        server, key = self._get_server(key)
334        if not server:
335            return None
336
337        self._statlog('get')
338
339        try:
340            server.send_cmd("get %s" % key)
341            rkey, flags, rlen, = self._expectvalue(server)
342            if not rkey:
343                return None
344            value = self._recv_value(server, flags, rlen)
345            server.expect("END")
346        except (_Error, socket.error), msg:
347            if type(msg) is types.TupleType:
348                msg = msg[1]
349            server.mark_dead(msg)
350            return None
351        return value
352
353    def get_multi(self, keys):
354        '''
355        Retrieves multiple keys from the memcache doing just one query.
356       
357        >>> success = mc.set("foo", "bar")
358        >>> success = mc.set("baz", 42)
359        >>> mc.get_multi(["foo", "baz", "foobar"]) == {"foo": "bar", "baz": 42}
360        1
361
362        This method is recommended over regular L{get} as it lowers the number of
363        total packets flying around your network, reducing total latency, since
364        your app doesn\'t have to wait for each round-trip of L{get} before sending
365        the next one.
366
367        @param keys: An array of keys.
368        @return:  A dictionary of key/value pairs that were available.
369
370        '''
371
372        self._statlog('get_multi')
373
374        server_keys = {}
375
376        # build up a list for each server of all the keys we want.
377        for key in keys:
378            server, key = self._get_server(key)
379            if not server:
380                continue
381            if not server_keys.has_key(server):
382                server_keys[server] = []
383            server_keys[server].append(key)
384
385        # send out all requests on each server before reading anything
386        dead_servers = []
387        for server in server_keys.keys():
388            try:
389                server.send_cmd("get %s" % " ".join(server_keys[server]))
390            except socket.error, msg:
391                server.mark_dead(msg[1])
392                dead_servers.append(server)
393
394        # if any servers died on the way, don't expect them to respond.
395        for server in dead_servers:
396            del server_keys[server]
397
398        retvals = {}
399        for server in server_keys.keys():
400            try:
401                line = server.readline()
402                while line and line != 'END':
403                    rkey, flags, rlen = self._expectvalue(server, line)
404                    #  Bo Yang reports that this can sometimes be None
405                    if rkey is not None:
406                        val = self._recv_value(server, flags, rlen)
407                        retvals[rkey] = val
408                    line = server.readline()
409            except (_Error, socket.error), msg:
410                server.mark_dead(msg)
411        return retvals
412
413    def _expectvalue(self, server, line=None):
414        if not line:
415            line = server.readline()
416
417        if line[:5] == 'VALUE':
418            resp, rkey, flags, len = line.split()
419            flags = int(flags)
420            rlen = int(len)
421            return (rkey, flags, rlen)
422        else:
423            return (None, None, None)
424
425    def _recv_value(self, server, flags, rlen):
426        rlen += 2 # include \r\n
427        buf = server.recv(rlen)
428        if len(buf) != rlen:
429            raise _Error("received %d bytes when expecting %d" % (len(buf), rlen))
430
431        if len(buf) == rlen:
432            buf = buf[:-2]  # strip \r\n
433
434        if flags == 0:
435            val = buf
436        elif flags & Client._FLAG_INTEGER:
437            val = int(buf)
438        elif flags & Client._FLAG_LONG:
439            val = long(buf)
440        elif self._usePickle and flags & Client._FLAG_PICKLE:
441            try:
442                val = pickle.loads(buf)
443            except:
444                self.debuglog('Pickle error...\n')
445                val = None
446        else:
447            self.debuglog("unknown flags on get: %x\n" % flags)
448
449        return val
450
451class _Host:
452    _DEAD_RETRY = 30  # number of seconds before retrying a dead server.
453
454    def __init__(self, host, debugfunc=None):
455        if isinstance(host, types.TupleType):
456            host = host[0]
457            self.weight = host[1]
458        else:
459            self.weight = 1
460
461        if host.find(":") > 0:
462            self.ip, self.port = host.split(":")
463            self.port = int(self.port)
464        else:
465            self.ip, self.port = host, 11211
466
467        if not debugfunc:
468            debugfunc = lambda x: x
469        self.debuglog = debugfunc
470
471        self.deaduntil = 0
472        self.socket = None
473   
474    def _check_dead(self):
475        if self.deaduntil and self.deaduntil > time.time():
476            return 1
477        self.deaduntil = 0
478        return 0
479
480    def connect(self):
481        if self._get_socket():
482            return 1
483        return 0
484
485    def mark_dead(self, reason):
486        self.debuglog("MemCache: %s: %s.  Marking dead." % (self, reason))
487        self.deaduntil = time.time() + _Host._DEAD_RETRY
488        self.close_socket()
489       
490    def _get_socket(self):
491        if self._check_dead():
492            return None
493        if self.socket:
494            return self.socket
495        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
496        # Python 2.3-ism:  s.settimeout(1)
497        try:
498            s.connect((self.ip, self.port))
499        except socket.error, msg:
500            self.mark_dead("connect: %s" % msg[1])
501            return None
502        self.socket = s
503        return s
504   
505    def close_socket(self):
506        if self.socket:
507            self.socket.close()
508            self.socket = None
509
510    def send_cmd(self, cmd):
511        if len(cmd) > 100:
512            self.socket.sendall(cmd)
513            self.socket.sendall('\r\n')
514        else:
515            self.socket.sendall(cmd + '\r\n')
516
517    def readline(self):
518        buffers = ''
519        recv = self.socket.recv
520        while 1:
521            data = recv(1)
522            if not data:
523                self.mark_dead('Connection closed while reading from %s'
524                        % repr(self))
525                break
526            if data == '\n' and buffers and buffers[-1] == '\r':
527                return(buffers[:-1])
528            buffers = buffers + data
529        return(buffers)
530
531    def expect(self, text):
532        line = self.readline()
533        if line != text:
534            self.debuglog("while expecting '%s', got unexpected response '%s'" % (text, line))
535        return line
536   
537    def recv(self, rlen):
538        buf = ''
539        recv = self.socket.recv
540        while len(buf) < rlen:
541            buf = buf + recv(rlen - len(buf))
542        return buf
543
544    def __str__(self):
545        d = ''
546        if self.deaduntil:
547            d = " (dead until %d)" % self.deaduntil
548        return "%s:%d%s" % (self.ip, self.port, d)
549
550def _doctest():
551    import doctest, memcache
552    servers = ["127.0.0.1:11211"]
553    mc = Client(servers, debug=1)
554    globs = {"mc": mc}
555    return doctest.testmod(memcache, globs=globs)
556
557if __name__ == "__main__":
558    print "Testing docstrings..."
559    _doctest()
560    print "Running tests:"
561    print
562    #servers = ["127.0.0.1:11211", "127.0.0.1:11212"]
563    servers = ["127.0.0.1:11211"]
564    mc = Client(servers, debug=1)
565
566    def to_s(val):
567        if not isinstance(val, types.StringTypes):
568            return "%s (%s)" % (val, type(val))
569        return "%s" % val
570    def test_setget(key, val):
571        print "Testing set/get {'%s': %s} ..." % (to_s(key), to_s(val)),
572        mc.set(key, val)
573        newval = mc.get(key)
574        if newval == val:
575            print "OK"
576            return 1
577        else:
578            print "FAIL"
579            return 0
580
581    class FooStruct:
582        def __init__(self):
583            self.bar = "baz"
584        def __str__(self):
585            return "A FooStruct"
586        def __eq__(self, other):
587            if isinstance(other, FooStruct):
588                return self.bar == other.bar
589            return 0
590       
591    test_setget("a_string", "some random string")
592    test_setget("an_integer", 42)
593    if test_setget("long", long(1<<30)):
594        print "Testing delete ...",
595        if mc.delete("long"):
596            print "OK"
597        else:
598            print "FAIL"
599    print "Testing get_multi ...",
600    print mc.get_multi(["a_string", "an_integer"])
601
602    print "Testing get(unknown value) ...",
603    print to_s(mc.get("unknown_value"))
604
605    f = FooStruct()
606    test_setget("foostruct", f)
607
608    print "Testing incr ...",
609    x = mc.incr("an_integer", 1)
610    if x == 43:
611        print "OK"
612    else:
613        print "FAIL"
614
615    print "Testing decr ...",
616    x = mc.decr("an_integer", 1)
617    if x == 42:
618        print "OK"
619    else:
620        print "FAIL"
621
622
623
624# vim: ts=4 sw=4 et :
Note: リポジトリブラウザについてのヘルプは TracBrowser を参照してください。