# # randpool.py : Cryptographically strong random number generation # # Part of the Python Cryptography Toolkit # # Distribute and use freely; there are no restrictions on further # dissemination and usage except those imposed by the laws of your # country of residence. This software is provided "as is" without # warranty of fitness for use or suitability for any purpose, express # or implied. Use at your own risk or not at all. # __revision__ = "$Id: randpool.py,v 1.14 2004/05/06 12:56:54 akuchling Exp $" import time, array, types, warnings, os.path from Crypto.Util.number import long_to_bytes try: import Crypto.Util.winrandom as winrandom except: winrandom = None STIRNUM = 3 class RandomPool: """randpool.py : Cryptographically strong random number generation. The implementation here is similar to the one in PGP. To be cryptographically strong, it must be difficult to determine the RNG's output, whether in the future or the past. This is done by using a cryptographic hash function to "stir" the random data. Entropy is gathered in the same fashion as PGP; the highest-resolution clock around is read and the data is added to the random number pool. A conservative estimate of the entropy is then kept. If a cryptographically secure random source is available (/dev/urandom on many Unixes, Windows CryptGenRandom on most Windows), then use it. Instance Attributes: bits : int Maximum size of pool in bits bytes : int Maximum size of pool in bytes entropy : int Number of bits of entropy in this pool. Methods: add_event([s]) : add some entropy to the pool get_bytes(int) : get N bytes of random data randomize([N]) : get N bytes of randomness from external source """ def __init__(self, numbytes = 160, cipher=None, hash=None): if hash is None: from Crypto.Hash import SHA as hash # The cipher argument is vestigial; it was removed from # version 1.1 so RandomPool would work even in the limited # exportable subset of the code if cipher is not None: warnings.warn("'cipher' parameter is no longer used") if isinstance(hash, types.StringType): # ugly hack to force __import__ to give us the end-path module hash = __import__('Crypto.Hash.'+hash, None, None, ['new']) warnings.warn("'hash' parameter should now be a hashing module") self.bytes = numbytes self.bits = self.bytes*8 self.entropy = 0 self._hash = hash # Construct an array to hold the random pool, # initializing it to 0. self._randpool = array.array('B', [0]*self.bytes) self._event1 = self._event2 = 0 self._addPos = 0 self._getPos = hash.digest_size self._lastcounter=time.time() self.__counter = 0 self._measureTickSize() # Estimate timer resolution self._randomize() def _updateEntropyEstimate(self, nbits): self.entropy += nbits if self.entropy < 0: self.entropy = 0 elif self.entropy > self.bits: self.entropy = self.bits def _randomize(self, N = 0, devname = '/dev/urandom'): """_randomize(N, DEVNAME:device-filepath) collects N bits of randomness from some entropy source (e.g., /dev/urandom on Unixes that have it, Windows CryptoAPI CryptGenRandom, etc) DEVNAME is optional, defaults to /dev/urandom. You can change it to /dev/random if you want to block till you get enough entropy. """ data = '' if N <= 0: nbytes = int((self.bits - self.entropy)/8+0.5) else: nbytes = int(N/8+0.5) if winrandom: # Windows CryptGenRandom provides random data. data = winrandom.new().get_bytes(nbytes) elif os.path.exists(devname): # Many OSes support a /dev/urandom device try: f=open(devname) data=f.read(nbytes) f.close() except IOError, (num, msg): if num!=2: raise IOError, (num, msg) # If the file wasn't found, ignore the error if data: self._addBytes(data) # Entropy estimate: The number of bits of # data obtained from the random source. self._updateEntropyEstimate(8*len(data)) self.stir_n() # Wash the random pool def randomize(self, N=0): """randomize(N:int) use the class entropy source to get some entropy data. This is overridden by KeyboardRandomize(). """ return self._randomize(N) def stir_n(self, N = STIRNUM): """stir_n(N) stirs the random pool N times """ for i in xrange(N): self.stir() def stir (self, s = ''): """stir(s:string) Mix up the randomness pool. This will call add_event() twice, but out of paranoia the entropy attribute will not be increased. The optional 's' parameter is a string that will be hashed with the randomness pool. """ entropy=self.entropy # Save inital entropy value self.add_event() # Loop over the randomness pool: hash its contents # along with a counter, and add the resulting digest # back into the pool. for i in range(self.bytes / self._hash.digest_size): h = self._hash.new(self._randpool) h.update(str(self.__counter) + str(i) + str(self._addPos) + s) self._addBytes( h.digest() ) self.__counter = (self.__counter + 1) & 0xFFFFffffL self._addPos, self._getPos = 0, self._hash.digest_size self.add_event() # Restore the old value of the entropy. self.entropy=entropy def get_bytes (self, N): """get_bytes(N:int) : string Return N bytes of random data. """ s='' i, pool = self._getPos, self._randpool h=self._hash.new() dsize = self._hash.digest_size num = N while num > 0: h.update( self._randpool[i:i+dsize] ) s = s + h.digest() num = num - dsize i = (i + dsize) % self.bytes if i>1, bits+1 if bits>8: bits=8 self._event1, self._event2 = event, self._event1 self._updateEntropyEstimate(bits) return bits # Private functions def _noise(self): # Adds a bit of noise to the random pool, by adding in the # current time and CPU usage of this process. # The difference from the previous call to _noise() is taken # in an effort to estimate the entropy. t=time.time() delta = (t - self._lastcounter)/self._ticksize*1e6 self._lastcounter = t self._addBytes(long_to_bytes(long(1000*time.time()))) self._addBytes(long_to_bytes(long(1000*time.clock()))) self._addBytes(long_to_bytes(long(1000*time.time()))) self._addBytes(long_to_bytes(long(delta))) # Reduce delta to a maximum of 8 bits so we don't add too much # entropy as a result of this call. delta=delta % 0xff return int(delta) def _measureTickSize(self): # _measureTickSize() tries to estimate a rough average of the # resolution of time that you can see from Python. It does # this by measuring the time 100 times, computing the delay # between measurements, and taking the median of the resulting # list. (We also hash all the times and add them to the pool) interval = [None] * 100 h = self._hash.new(`(id(self),id(interval))`) # Compute 100 differences t=time.time() h.update(`t`) i = 0 j = 0 while i < 100: t2=time.time() h.update(`(i,j,t2)`) j += 1 delta=int((t2-t)*1e6) if delta: interval[i] = delta i += 1 t=t2 # Take the median of the array of intervals interval.sort() self._ticksize=interval[len(interval)/2] h.update(`(interval,self._ticksize)`) # mix in the measurement times and wash the random pool self.stir(h.digest()) def _addBytes(self, s): "XOR the contents of the string S into the random pool" i, pool = self._addPos, self._randpool for j in range(0, len(s)): pool[i]=pool[i] ^ ord(s[j]) i=(i+1) % self.bytes self._addPos = i # Deprecated method names: remove in PCT 2.1 or later. def getBytes(self, N): warnings.warn("getBytes() method replaced by get_bytes()", DeprecationWarning) return self.get_bytes(N) def addEvent (self, event, s=""): warnings.warn("addEvent() method replaced by add_event()", DeprecationWarning) return self.add_event(s + str(event)) class PersistentRandomPool (RandomPool): def __init__ (self, filename=None, *args, **kwargs): RandomPool.__init__(self, *args, **kwargs) self.filename = filename if filename: try: # the time taken to open and read the file might have # a little disk variability, modulo disk/kernel caching... f=open(filename, 'rb') self.add_event() data = f.read() self.add_event() # mix in the data from the file and wash the random pool self.stir(data) f.close() except IOError: # Oh, well; the file doesn't exist or is unreadable, so # we'll just ignore it. pass def save(self): if self.filename == "": raise ValueError, "No filename set for this object" # wash the random pool before save, provides some forward secrecy for # old values of the pool. self.stir_n() f=open(self.filename, 'wb') self.add_event() f.write(self._randpool.tostring()) f.close() self.add_event() # wash the pool again, provide some protection for future values self.stir() # non-echoing Windows keyboard entry _kb = 0 if not _kb: try: import msvcrt class KeyboardEntry: def getch(self): c = msvcrt.getch() if c in ('\000', '\xe0'): # function key c += msvcrt.getch() return c def close(self, delay = 0): if delay: time.sleep(delay) while msvcrt.kbhit(): msvcrt.getch() _kb = 1 except: pass # non-echoing Posix keyboard entry if not _kb: try: import termios class KeyboardEntry: def __init__(self, fd = 0): self._fd = fd self._old = termios.tcgetattr(fd) new = termios.tcgetattr(fd) new[3]=new[3] & ~termios.ICANON & ~termios.ECHO termios.tcsetattr(fd, termios.TCSANOW, new) def getch(self): termios.tcflush(0, termios.TCIFLUSH) # XXX Leave this in? return os.read(self._fd, 1) def close(self, delay = 0): if delay: time.sleep(delay) termios.tcflush(self._fd, termios.TCIFLUSH) termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old) _kb = 1 except: pass class KeyboardRandomPool (PersistentRandomPool): def __init__(self, *args, **kwargs): PersistentRandomPool.__init__(self, *args, **kwargs) def randomize(self, N = 0): "Adds N bits of entropy to random pool. If N is 0, fill up pool." import os, string, time if N <= 0: bits = self.bits - self.entropy else: bits = N*8 if bits == 0: return print bits,'bits of entropy are now required. Please type on the keyboard' print 'until enough randomness has been accumulated.' kb = KeyboardEntry() s='' # We'll save the characters typed and add them to the pool. hash = self._hash e = 0 try: while e < bits: temp=str(bits-e).rjust(6) os.write(1, temp) s=s+kb.getch() e += self.add_event(s) os.write(1, 6*chr(8)) self.add_event(s+hash.new(s).digest() ) finally: kb.close() print '\n\007 Enough. Please wait a moment.\n' self.stir_n() # wash the random pool. kb.close(4) if __name__ == '__main__': pool = RandomPool() print 'random pool entropy', pool.entropy, 'bits' pool.add_event('something') print `pool.get_bytes(100)` import tempfile, os fname = tempfile.mktemp() pool = KeyboardRandomPool(filename=fname) print 'keyboard random pool entropy', pool.entropy, 'bits' pool.randomize() print 'keyboard random pool entropy', pool.entropy, 'bits' pool.randomize(128) pool.save() saved = open(fname, 'rb').read() print 'saved', `saved` print 'pool ', `pool._randpool.tostring()` newpool = PersistentRandomPool(fname) print 'persistent random pool entropy', pool.entropy, 'bits' os.remove(fname)