33from threading import Thread , Lock , Event
44from multiprocessing import cpu_count
55from six import PY2
6-
76if PY2 :
87 from Queue import Queue
98else :
109 from queue import Queue
1110
# Public API of this module. Names must match the module attributes exactly
# (no surrounding whitespace), otherwise ``from <module> import *`` fails.
__all__ = ['multiget', 'multiput', 'MultiGetPool', 'MultiPutPool']
1312
1413
1514try :
2120 POOL_SIZE = 6
2221
#: A :class:`namedtuple` for tasks that are fed to workers in the
#: multi pool.
# ``object`` carries the object to store for put tasks; get tasks pass
# ``None`` in that slot.
Task = namedtuple(
    'Task',
    ['client', 'outq',
     'bucket_type', 'bucket', 'key',
     'object', 'options'])
2729
2830
29- class MultiGetPool (object ):
31+ class MultiPool (object ):
3032 """
31- Encapsulates a pool of fetcher threads. These threads can be used
32- across many multi-get requests.
33+ Encapsulates a pool of threads. These threads can be used
34+ across many multi requests.
3335 """
3436
35- def __init__ (self , size = POOL_SIZE ):
37+ def __init__ (self , size = POOL_SIZE , name = 'unknown' ):
3638 """
3739 :param size: the desired size of the worker pool
3840 :type size: int
3941 """
4042
4143 self ._inq = Queue ()
4244 self ._size = size
45+ self ._name = name
4346 self ._started = Event ()
4447 self ._stop = Event ()
4548 self ._lock = Lock ()
@@ -57,14 +60,14 @@ def enq(self, task):
5760 if not self ._stop .is_set ():
5861 self ._inq .put (task )
5962 else :
60- raise RuntimeError ("Attempted to enqueue a fetch operation while "
61- "multi-get pool was shutdown!" )
63+ raise RuntimeError ("Attempted to enqueue an operation while "
64+ "multi pool was shutdown!" )
6265
6366 def start (self ):
6467 """
6568 Starts the worker threads if they are not already started.
6669 This method is thread-safe and will be called automatically
67- when executing a MultiGet operation.
70+ when executing an operation.
6871 """
6972 # Check whether we are already started, skip if we are.
7073 if not self ._started .is_set ():
@@ -73,8 +76,9 @@ def start(self):
7376 # If we got the lock, go ahead and start the worker
7477 # threads, set the started flag, and release the lock.
7578 for i in range (self ._size ):
76- name = "riak.client.multiget-worker-{0}" .format (i )
77- worker = Thread (target = self ._fetcher , name = name )
79+ name = "riak.client.multi-worker-{0}-{1}" .format (
80+ self ._name , i )
81+ worker = Thread (target = self ._worker_method , name = name )
7882 worker .daemon = True
7983 worker .start ()
8084 self ._workers .append (worker )
@@ -105,7 +109,26 @@ def __del__(self):
105109 # shutting down.
106110 self .stop ()
107111
108- def _fetcher (self ):
112+ def _worker_method (self ):
113+ raise NotImplementedError
114+
115+ def _should_quit (self ):
116+ """
117+ Worker threads should exit when the stop flag is set and the
118+ input queue is empty. Once the stop flag is set, new enqueues
119+ are disallowed, meaning that the workers can safely drain the
120+ queue before exiting.
121+
122+ :rtype: bool
123+ """
124+ return self .stopped () and self ._inq .empty ()
125+
126+
127+ class MultiGetPool (MultiPool ):
128+ def __init__ (self , size = POOL_SIZE ):
129+ super (MultiGetPool , self ).__init__ (size = size , name = 'get' )
130+
131+ def _worker_method (self ):
109132 """
110133 The body of the multi-get worker. Loops until
111134 :meth:`_should_quit` returns ``True``, taking tasks off the
@@ -121,24 +144,40 @@ def _fetcher(self):
121144 except KeyboardInterrupt :
122145 raise
123146 except Exception as err :
124- task .outq .put ((task .bucket_type , task .bucket , task .key , err ), )
147+ errdata = (task .bucket_type , task .bucket , task .key , err )
148+ task .outq .put (errdata )
125149 finally :
126150 self ._inq .task_done ()
127151
128- def _should_quit (self ):
129- """
130- Worker threads should exit when the stop flag is set and the
131- input queue is empty. Once the stop flag is set, new enqueues
132- are disallowed, meaning that the workers can safely drain the
133- queue before exiting.
134152
135- :rtype: bool
class MultiPutPool(MultiPool):
    """
    A pool of worker threads that store objects in parallel.
    """

    def __init__(self, size=POOL_SIZE):
        """
        :param size: the desired size of the worker pool
        :type size: int
        """
        # The name is used by the base pool when naming worker threads.
        super(MultiPutPool, self).__init__(size=size, name='put')

    def _worker_method(self):
        """
        The body of the multi-put worker. Loops until
        :meth:`_should_quit` returns ``True``, taking tasks off the
        input queue, storing the object, and putting the result on
        the output queue.
        """
        while not self._should_quit():
            task = self._inq.get()
            try:
                robj = task.object
                rv = task.client.put(robj, **task.options)
                task.outq.put(rv)
            except KeyboardInterrupt:
                raise
            except Exception as err:
                # Report the failure on the output queue so the caller
                # still receives exactly one result per task.
                errdata = (task.object, err)
                task.outq.put(errdata)
            finally:
                # Always mark the task done, whether it succeeded or not.
                self._inq.task_done()
138177
139178
#: The default multi-get pool is automatically created and stored in
#: this constant.
RIAK_MULTIGET_POOL = MultiGetPool()

#: The default multi-put pool is automatically created and stored in
#: this constant.
RIAK_MULTIPUT_POOL = MultiPutPool()
142181
143182
144183def multiget (client , keys , ** options ):
@@ -160,8 +199,8 @@ def multiget(client, keys, **options):
160199 :meth:`RiakBucket.get <riak.bucket.RiakBucket.get>`
161200 :type options: dict
162201 :rtype: list
163- """
164202
203+ """
165204 outq = Queue ()
166205
167206 if 'pool' in options :
@@ -172,7 +211,7 @@ def multiget(client, keys, **options):
172211
173212 pool .start ()
174213 for bucket_type , bucket , key in keys :
175- task = Task (client , outq , bucket_type , bucket , key , options )
214+ task = Task (client , outq , bucket_type , bucket , key , None , options )
176215 pool .enq (task )
177216
178217 results = []
@@ -184,3 +223,48 @@ def multiget(client, keys, **options):
184223 outq .task_done ()
185224
186225 return results
226+
227+
def multiput(client, objs, **options):
    """Executes a parallel-store across multiple threads. Returns a list
    containing booleans or :class:`~riak.riak_object.RiakObject`

    If a ``pool`` option is included, the request will use the given worker
    pool and not the default :data:`RIAK_MULTIPUT_POOL`. This option will
    be passed by the client if the ``multiput_pool_size`` option was set on
    client initialization.

    Results are collected from the output queue as workers finish, so
    their order is not guaranteed to match the order of ``objs``. With
    the default pool's workers, a store that raises an exception is
    reported in the result list as an ``(object, exception)`` tuple.

    :param client: the client to use
    :type client: :class:`RiakClient <riak.client.RiakClient>`
    :param objs: the Riak Objects to store in parallel; any iterable is
        accepted and it is consumed exactly once
    :type objs: list of `RiakObject <riak.riak_object.RiakObject>`
    :param options: request options to
        :meth:`RiakClient.put <riak.client.RiakClient.put>`
    :type options: dict
    :rtype: list
    :raises RuntimeError: if the pool is stopped before all results
        have been collected
    """
    outq = Queue()

    # The pool is a control option for this function only; remove it so
    # it is not forwarded to the client's put() call by the workers.
    pool = options.pop('pool', None)
    if pool is None:
        pool = RIAK_MULTIPUT_POOL

    pool.start()

    # Count tasks while enqueueing rather than calling len(objs), so that
    # generators and other unsized iterables work too.
    pending = 0
    for robj in objs:
        bucket_type = robj.bucket.bucket_type
        bucket = robj.bucket.name
        key = robj.key
        task = Task(client, outq, bucket_type, bucket, key, robj, options)
        pool.enq(task)
        pending += 1

    results = []
    for _ in range(pending):
        if pool.stopped():
            raise RuntimeError("Multi-put operation interrupted by pool "
                               "stopping!")
        results.append(outq.get())
        outq.task_done()

    return results
0 commit comments