root/projects/sAsync/trunk/sasync/parray.py

Revision 3, 9.0 kB (checked in by edsuom, 2 years ago)

Import of trunk from old repo

Line 
1 # sAsync:
2 # An enhancement to the SQLAlchemy package that provides persistent
3 # dictionaries, text indexing and searching, and an access broker for
4 # conveniently managing database access, table setup, and
5 # transactions. Everything can be run in an asynchronous fashion using the
6 # Twisted framework and its deferred processing capabilities.
7 #
8 # Copyright (C) 2006-2007 by Edwin A. Suominen, http://www.eepatents.com
9 #
10 # This program is free software; you can redistribute it and/or modify it under
11 # the terms of the GNU General Public License as published by the Free Software
12 # Foundation; either version 2 of the License, or (at your option) any later
13 # version.
14 #
15 # This program is distributed in the hope that it will be useful, but WITHOUT
16 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
17 # FOR A PARTICULAR PURPOSE.  See the file COPYING for more details.
18 #
19 # You should have received a copy of the GNU General Public License along with
20 # this program; if not, write to the Free Software Foundation, Inc., 51
21 # Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
22
23 """
24 Persistent Three-dimensional array objects
25 """
26
27 # Imports
28 from twisted.internet import defer
29 import sqlalchemy as SA
30
31 from database import transact, AccessBroker
32 import search
33
34
35 NICENESS_WRITE = 6
36
37
38 class Transactor(AccessBroker):
39     """
40     I do the hands-on work of (potentially) non-blocking database access for
41     the persistence of array elements within a uniquely-identified group.
42
43     My methods return Twisted deferred instances to the results of their
44     database accesses rather than forcing the client code to block while the
45     database access is being completed.
46     
47     """
48     def __init__(self, ID, *url, **kw):
49         """
50         Instantiates me for a three-dimensional array of elements within a
51         particular group uniquely identified by the supplied integer I{ID},
52         using a database connection to I{url}.
53         """
54         if not isinstance(ID, int):
55             raise TypeError("Item IDs must be integers")
56         self.groupID = ID
57         if url:
58             super(Transactor, self).__init__(url[0], **kw)
59         else:
60             super(Transactor, self).__init__()
61
62     def startup(self):
63         """
64         You can run my transaction methods when the deferred returned from
65         this method fires, and not before.
66         """
67         d = self.table(
68             'sasync_array',
69             SA.Column('group_id', SA.Integer),
70             SA.Column('x', SA.Integer),
71             SA.Column('y', SA.Integer),
72             SA.Column('z', SA.Integer),
73             SA.Column('value', SA.PickleType, nullable=False),
74             unique_elements=['group_id', 'x', 'y', 'z']
75             )
76         return d
77    
78     @transact
79     def load(self, x, y, z):
80         """
81         Element load transaction
82         """
83         array = self.sasync_array
84         if not self.s('load'):
85             self.s(
86                 [array.c.value],
87                 SA.and_(array.c.group_id == self.groupID,
88                         array.c.x == SA.bindparam('x'),
89                         array.c.y == SA.bindparam('y'),
90                         array.c.z == SA.bindparam('z'))
91                 )
92         rows = self.s().execute(x=hash(x), y=hash(y), z=hash(z)).fetchone()
93         if not rows:
94             return None
95         else:
96             return rows['value']
97
98     @transact
99     def update(self, x, y, z, value):
100         """
101         Element overwrite (entry update) transaction
102         """
103         elements = self.sasync_array
104         u = elements.update(
105             SA.and_(elements.c.group_id == self.groupID,
106                     elements.c.x == hash(x),
107                     elements.c.y == hash(y),
108                     elements.c.z == hash(z))
109             )
110         u.execute(value=value)
111
112     @transact
113     def insert(self, x, y, z, value):
114         """
115         Element add (entry insert) transaction
116         """
117         self.sasync_array.insert().execute(
118             group_id=self.groupID,
119             x=hash(x), y=hash(y), z=hash(z), value=value)
120
121     @transact
122     def delete(self, x, y, z):
123         """
124         Element delete transaction
125         """
126         elements = self.sasync_array
127         self.sasync_array.delete(
128             SA.and_(elements.c.group_id == self.groupID,
129                     elements.c.x == hash(x),
130                     elements.c.y == hash(y),
131                     elements.c.z == hash(z))
132             ).execute()
133
134     @transact
135     def clear(self):
136         """
137         Transaction to clear all elements (B{Use with care!})
138         """
139         elements = self.sasync_array
140         self.sasync_array.delete(
141             elements.c.group_id == self.groupID).execute()
142
143
144 class PersistentArray(object):
145     """
146     I am a three-dimensional array of Python objects, addressable by any
147     three-way combination of hashable Python objects. You can use me as a
148     two-dimensional array by simply using some constant, e.g., C{None} when
149     supplying an address for my third dimension.
150
151     B{IMPORTANT}: Make sure you call my L{shutdown} method for an instance of
152     me that you're done with before allowing that instance to be deleted.
153     """
154     search = None
155
156     def __init__(self, ID, *url, **kw):
157         """
158         Constructor, with a URL and any engine-specifying keywords supplied if
159         a particular engine is to be used for this instance. The following
160         additional keyword is particular to this constructor:
161         
162         @keyword search: Set C{True} if text indexing is to be performed on items
163             as they are written.
164
165         """
166         try:
167             self.ID = hash(ID)
168         except:
169             raise TypeError("Item IDs must be hashable")
170         if kw.pop('search', False):
171             # No search object, worry about searching later
172             self.search = None
173         if url:
174             self.t = Transactor(self.ID, url[0], **kw)
175         else:
176             self.t = Transactor(self.ID)
177    
178     def shutdown(self, *null):
179         """
180         Shuts down my database L{Transactor} and its synchronous task queue.
181         """
182         return self.t.shutdown()
183
184     def write(self, funcName, *args, **kw):
185         """
186         Performs a database write transaction, returning a deferred to its
187         completion.
188
189         If we are updating the search index, there's a nuance to the
190         deferred processing. In that case, when the write is done, the
191         deferred is fired and processing separately proceeds with indexing
192         of the written value. Here's how it works:
193
194             1. Create a clean deferred B{d1} to return to the caller, whose
195                callback(s) will be fired from the callback to the transaction's
196                own deferred B{d2}.
197
198             2. Start the write transaction and assign the C{writeDone} function
199                as the callback to its deferred B{d2}. Note that the
200                defer-to-queue transaction keeps a reference to the deferred
201                object it instantiates, so we don't have to do so for either
202                B{d2} or B{d3}. Those references are merely defined in the
203                method for code readability.
204
205         """
206         def writeDone(noneResult, d1):
207             x, y, z = [hash(arg) for arg in args[0:3]]
208             document = "%d-%d" % (self.groupID, x)
209             section = "%d-%d" % (y, z)
210             d3 = self.search.index(
211                 value, document=document, section=section)
212             d3.addCallback(self.search.ready)
213             d1.callback(None)
214        
215         func = getattr(self.t, funcName)
216         kwNew = {'niceness':kw['niceness']}
217         if self.search is None:
218             return func(*args, **kwNew)
219         else:       
220             d1 = defer.Deferred()
221             self.search.busy()
222             d2 = func(*args, **kwNew)
223             d2.addCallback(writeDone, d1)
224             return d1
225
226     def get(self, x, y, z):
227         """
228         Retrieves an element (x,y,z) from the database.
229         """
230         d = self.t.dt.deferToAll()
231         d.addCallback(lambda _: self.t.load(x, y, z))
232         return d
233
234     def set(self, x, y, z, value):
235         """
236         Persists the supplied I{value} of element (x,y,z) to the database,
237         inserting or updating a row as appropriate.
238         """
239         def loaded(loadedValue):
240             if loadedValue is None:
241                 return self.write(
242                     "insert", x, y, z, value, niceness=NICENESS_WRITE)
243             else:
244                 return self.write(
245                     "update", x, y, z, value, niceness=NICENESS_WRITE)
246        
247         d = self.t.load(x, y, z)
248         d.addCallback(loaded)
249         self.t.dt.put(d)
250         return d
251
252     def delete(self, x, y, z):
253         """
254         Deletes the database row for element (x,y,z).
255         """
256         d = self.write("delete", x, y, z, niceness=NICENESS_WRITE)
257         self.t.dt.put(d)
258         return d
259
260     def clear(self):
261         """
262         Deletes the entire group of database rows for U{all} of my elements
263         (B{Use with care!})
264         """
265         d =self.write("clear", niceness=0)
266         self.t.dt.put(d)
267         return d
268
269
270 __all__ = ['PersistentArray']
Note: See TracBrowser for help on using the browser.