summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/pool.py
blob: 7a88ac6f7487f2f199a03005f4ec4bf40e9ab82e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# pool.py - Connection pooling for SQLAlchemy
# Copyright (C) 2005,2006 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php


"""provides a connection pool implementation, which optionally manages connections
on a thread local basis.  Also provides a DBAPI2 transparency layer so that pools can
be managed automatically, based on module type and connect arguments,
 simply by calling regular DBAPI connect() methods."""

import weakref, string, cPickle
from sqlalchemy import util, exceptions
import sqlalchemy.queue as Queue

try:
    import thread
except:
    import dummythread as thread

proxies = {}

def manage(module, **params):
    """given a DBAPI2 module and pool management parameters, returns a proxy for the module
    that will automatically pool connections.  Options are delivered to an underlying DBProxy
    object.

    Arguments:
    module : a DBAPI2 database module.
    
    Options:
    echo=False : if set to True, connections being pulled and retrieved from/to the pool will
    be logged to the standard output, as well as pool sizing information.

    use_threadlocal=True : if set to True, repeated calls to connect() within the same
    application thread will be guaranteed to return the same connection object, if one has
    already been retrieved from the pool and has not been returned yet. This allows code to
    retrieve a connection from the pool, and then while still holding on to that connection,
    to call other functions which also ask the pool for a connection of the same arguments;
    those functions will act upon the same connection that the calling method is using.

    poolclass=QueuePool : the default class used by the pool module to provide pooling.
    QueuePool uses the Python Queue.Queue class to maintain a list of available connections.

    pool_size=5 : used by QueuePool - the size of the pool to be maintained. This is the
    largest number of connections that will be kept persistently in the pool. Note that the
    pool begins with no connections; once this number of connections is requested, that
    number of connections will remain.

    max_overflow=10 : the maximum overflow size of the pool. When the number of checked-out
    connections reaches the size set in pool_size, additional connections will be returned up
    to this limit. When those additional connections are returned to the pool, they are
    disconnected and discarded. It follows then that the total number of simultaneous
    connections the pool will allow is pool_size + max_overflow, and the total number of
    "sleeping" connections the pool will allow is pool_size. max_overflow can be set to -1 to
    indicate no overflow limit; no limit will be placed on the total number of concurrent
    connections.
    
    """
    try:
        return proxies[module]
    except KeyError:
        return proxies.setdefault(module, DBProxy(module, **params))    

def clear_managers():
    """removes all current DBAPI2 managers.  all pools and connections are disposed."""
    for manager in proxies.values():
        manager.close()
    proxies.clear()

    
class Pool(object):
    def __init__(self, echo = False, use_threadlocal = True, logger=None):
        self._threadconns = weakref.WeakValueDictionary()
        self._use_threadlocal = use_threadlocal
        self.echo = echo
        self._logger = logger or util.Logger(origin='pool')
    
    def unique_connection(self):
        return ConnectionFairy(self).checkout()
            
    def connect(self):
        if not self._use_threadlocal:
            return ConnectionFairy(self).checkout()
            
        try:
            return self._threadconns[thread.get_ident()].checkout()
        except KeyError:
            agent = ConnectionFairy(self).checkout()
            self._threadconns[thread.get_ident()] = agent
            return agent

    def _purge_for_threadlocal(self):
        if self._use_threadlocal:
            try:
                del self._threadconns[thread.get_ident()]
            except KeyError:
                pass

    def return_conn(self, agent):
        self._purge_for_threadlocal()
        self.do_return_conn(agent.connection)

    def return_invalid(self):
        self._purge_for_threadlocal()
        self.do_return_invalid()
        
    def get(self):
        return self.do_get()
    
    def do_get(self):
        raise NotImplementedError()
        
    def do_return_conn(self, conn):
        raise NotImplementedError()
        
    def do_return_invalid(self):
        raise NotImplementedError()
        
    def status(self):
        raise NotImplementedError()

    def log(self, msg):
        self._logger.write(msg)

    def dispose(self):
        raise NotImplementedError()
        
    def __del__(self):
        self.dispose()
        
class ConnectionFairy(object):
    def __init__(self, pool, connection=None):
        self.pool = pool
        self.__counter = 0
        if connection is not None:
            self.connection = connection
        else:
            try:
                self.connection = pool.get()
            except:
                self.connection = None
                self.pool.return_invalid()
                raise
        if self.pool.echo:
            self.pool.log("Connection %s checked out from pool" % repr(self.connection))
    def invalidate(self):
        if self.pool.echo:
            self.pool.log("Invalidate connection %s" % repr(self.connection))
        self.connection = None
        self.pool.return_invalid()
    def cursor(self, *args, **kwargs):
        return CursorFairy(self, self.connection.cursor(*args, **kwargs))
    def __getattr__(self, key):
        return getattr(self.connection, key)
    def checkout(self):
        if self.connection is None:
            raise "this connection is closed"
        self.__counter +=1
        return self    
    def close(self):
        self.__counter -=1
        if self.__counter == 0:
            self._close()
    def __del__(self):
        self._close()
    def _close(self):
        if self.connection is not None:
            if self.pool.echo:
                self.pool.log("Connection %s being returned to pool" % repr(self.connection))
            self.connection.rollback()
            self.pool.return_conn(self)
            self.pool = None
            self.connection = None
            
class CursorFairy(object):
    def __init__(self, parent, cursor):
        self.parent = parent
        self.cursor = cursor
    def __getattr__(self, key):
        return getattr(self.cursor, key)

class SingletonThreadPool(Pool):
    """Maintains one connection per each thread, never moving to another thread.  this is
    used for SQLite."""
    def __init__(self, creator, **params):
        Pool.__init__(self, **params)
        self._conns = {}
        self._creator = creator

    def dispose(self):
        for key, conn in self._conns.items():
            try:
                conn.close()
            except:
                # sqlite won't even let you close a conn from a thread that didn't create it
                pass
            del self._conns[key]
            
    def status(self):
        return "SingletonThreadPool id:%d thread:%d size: %d" % (id(self), thread.get_ident(), len(self._conns))

    def do_return_conn(self, conn):
        pass
        
    def do_return_invalid(self):
        try:
            del self._conns[thread.get_ident()]
        except KeyError:
            pass
            
    def do_get(self):
        try:
            return self._conns[thread.get_ident()]
        except KeyError:
            c = self._creator()
            self._conns[thread.get_ident()] = c
            return c
    
class QueuePool(Pool):
    """uses Queue.Queue to maintain a fixed-size list of connections."""
    def __init__(self, creator, pool_size = 5, max_overflow = 10, timeout=30, **params):
        Pool.__init__(self, **params)
        self._creator = creator
        self._pool = Queue.Queue(pool_size)

        self._overflow = 0 - pool_size
        self._max_overflow = max_overflow
        self._timeout = timeout
    
    def do_return_conn(self, conn):
        try:
            self._pool.put(conn, False)
        except Queue.Full:
            self._overflow -= 1

    def do_return_invalid(self):
        self._overflow -= 1
        
    def do_get(self):
        try:
            return self._pool.get(self._max_overflow > -1 and self._overflow >= self._max_overflow, self._timeout)
        except Queue.Empty:
            if self._max_overflow > -1 and self._overflow >= self._max_overflow:
                raise exceptions.TimeoutError("QueuePool limit of size %d overflow %d reached, connection timed out" % (self.size(), self.overflow()))
            self._overflow += 1
            return self._creator()

    def dispose(self):
        while True:
            try:
                conn = self._pool.get(False)
                conn.close()
            except Queue.Empty:
                break

    def status(self):
        tup = (self.size(), self.checkedin(), self.overflow(), self.checkedout())
        return "Pool size: %d  Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup
        
    def size(self):
        return self._pool.maxsize
    
    def checkedin(self):
        return self._pool.qsize()
    
    def overflow(self):
        return self._overflow
    
    def checkedout(self):
        return self._pool.maxsize - self._pool.qsize() + self._overflow
        

class DBProxy(object):
    """proxies a DBAPI2 connect() call to a pooled connection keyed to the specific connect
    parameters."""
    
    def __init__(self, module, poolclass = QueuePool, **params):
        """
        module is a DBAPI2 module
        poolclass is a Pool class, defaulting to QueuePool.
        other parameters are sent to the Pool object's constructor.
        """
        self.module = module
        self.params = params
        self.poolclass = poolclass
        self.pools = {}

    def close(self):
        for key in self.pools.keys():
            del self.pools[key]

    def __del__(self):
        self.close()
            
    def get_pool(self, *args, **params):
        key = self._serialize(*args, **params)
        try:
            return self.pools[key]
        except KeyError:
            pool = self.poolclass(lambda: self.module.connect(*args, **params), **self.params)
            self.pools[key] = pool
            return pool
        
    def connect(self, *args, **params):
        """connects to a database using this DBProxy's module and the given connect
        arguments.  if the arguments match an existing pool, the connection will be returned
        from the pool's current thread-local connection instance, or if there is no
        thread-local connection instance it will be checked out from the set of pooled
        connections.  If the pool has no available connections and allows new connections to
        be created, a new database connection will be made."""
        return self.get_pool(*args, **params).connect()
    
    def dispose(self, *args, **params):
        """disposes the connection pool referenced by the given connect arguments."""
        key = self._serialize(*args, **params)
        try:
            del self.pools[key]
        except KeyError:
            pass
        
    def _serialize(self, *args, **params):
        return cPickle.dumps([args, params])