Skip to content

Commit 174b34d

Browse files
committed
Merge pull request #276 from basho/feature/er/yokozuna
Add protobuf yokozuna admin support
2 parents 6d4af99 + 0528f33 commit 174b34d

10 files changed

Lines changed: 388 additions & 11 deletions

File tree

riak/client/operations.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,85 @@ def stream_mapred(self, inputs, query, timeout):
390390
finally:
391391
stream.close()
392392

393+
@retryable
394+
def create_search_index(self, transport, index, schema=None):
395+
"""
396+
create_search_index(index, schema)
397+
398+
Create a search index of the given name, and optionally set
399+
a schema. If no schema is set, the default will be used.
400+
401+
:param index: the name of the index to create
402+
:type index: string
403+
:param schema: the schema that this index will follow
404+
:type schema: string, None
405+
"""
406+
return transport.create_search_index(index, schema)
407+
408+
@retryable
409+
def get_search_index(self, transport, index):
410+
"""
411+
get_search_index(index)
412+
413+
Gets a search index of the given name if it exists,
414+
which will also return the schema. Raises a RiakError
415+
if no such schema exists.
416+
417+
:param index: the name of the index to create
418+
:type index: string
419+
"""
420+
return transport.get_search_index(index)
421+
422+
@retryable
423+
def list_search_indexes(self, transport):
424+
"""
425+
list_search_indexes(bucket)
426+
427+
Gets all search indexes and their schemas. Returns
428+
a blank list if none exist
429+
"""
430+
return transport.list_search_indexes()
431+
432+
@retryable
433+
def delete_search_index(self, transport, index):
434+
"""
435+
delete_search_index(index)
436+
437+
Delete the search index that matches the given name.
438+
439+
:param index: the name of the index to delete
440+
:type index: string
441+
"""
442+
return transport.delete_search_index(index)
443+
444+
@retryable
445+
def create_search_schema(self, transport, schema, content):
446+
"""
447+
create_search_schema(schema, content)
448+
449+
Creates a solr schema of the given name and content.
450+
Content must be valid solr schema xml.
451+
452+
:param schema: the name of the schema to create
453+
:type schema: string
454+
:param schema: the solr schema xml content
455+
:type schema: string
456+
"""
457+
return transport.create_search_schema(schema, content)
458+
459+
@retryable
460+
def get_search_schema(self, transport, schema):
461+
"""
462+
get_search_schema(schema)
463+
464+
Gets a search schema of the given name if it exists.
465+
Raises a RiakError if no such schema exists.
466+
467+
:param schema: the name of the schema to get
468+
:type schema: string
469+
"""
470+
return transport.get_search_schema(schema)
471+
393472
@retryable
394473
def fulltext_search(self, transport, index, query, **params):
395474
"""

riak/tests/test_all.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from riak.test_server import TestServer
1919

20+
from riak.tests.test_yokozuna import YZSearchTests
2021
from riak.tests.test_search import SearchTests, \
2122
EnableSearchTests, SolrSearchTests
2223
from riak.tests.test_mapreduce import MapReduceAliasTests, \
@@ -64,15 +65,14 @@ def setUpModule():
6465
testrun_sibs_bucket = 'sibsbucket'
6566
c.bucket(testrun_sibs_bucket).allow_mult = True
6667

67-
if not int(os.environ.get('SKIP_SEARCH', '0')):
68+
if not int(os.environ.get('SKIP_SEARCH', '0')) and not int(os.environ.get('RUN_YZ', '1')):
6869
testrun_search_bucket = 'searchbucket'
6970
b = c.bucket(testrun_search_bucket)
7071
b.enable_search()
7172

72-
7373
def tearDownModule():
7474
c = RiakClient(protocol='http', host=HTTP_HOST, http_port=HTTP_PORT)
75-
if not int(os.environ.get('SKIP_SEARCH', '0')):
75+
if not int(os.environ.get('SKIP_SEARCH', '0')) and not int(os.environ.get('RUN_YZ', '1')):
7676
b = c.bucket(testrun_search_bucket)
7777
b.clear_properties()
7878
b = c.bucket(testrun_sibs_bucket)
@@ -216,6 +216,7 @@ class RiakPbcTransportTestCase(BasicKVTests,
216216
MapReduceStreamTests,
217217
EnableSearchTests,
218218
SearchTests,
219+
YZSearchTests,
219220
ClientTests,
220221
CounterTests,
221222
BaseTestCase,
@@ -229,6 +230,13 @@ def setUp(self):
229230
self.protocol = 'pbc'
230231
self.http_client = self.create_client(HTTP_HOST,
231232
http_port=HTTP_PORT)
233+
# Only supporting yokozuna via PBC
234+
if int(os.environ.get('RUN_YZ', '0')):
235+
testrun_yz_bucket = 'yztest'
236+
self.http_client.create_search_index(testrun_yz_bucket)
237+
b = self.http_client.bucket(testrun_yz_bucket)
238+
b.set_property('yz_index', testrun_yz_bucket)
239+
232240
super(RiakPbcTransportTestCase, self).setUp()
233241

234242
def test_uses_client_id_if_given(self):

riak/tests/test_search.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
else:
77
import unittest
88

9-
SKIP_SEARCH = int(os.environ.get('SKIP_SEARCH', '0'))
9+
SKIP_SEARCH = int(os.environ.get('SKIP_SEARCH', '0')) or int(os.environ.get('RUN_YZ', '1'))
1010

1111

1212
class EnableSearchTests(object):

riak/tests/test_yokozuna.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
# -*- coding: utf-8 -*-
2+
import os
3+
import platform
4+
import time
5+
if platform.python_version() < '2.7':
6+
unittest = __import__('unittest2')
7+
else:
8+
import unittest
9+
10+
RUN_YZ = int(os.environ.get('RUN_YZ', '0'))
11+
12+
13+
class YZSearchTests(object):
14+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
15+
def test_yz_search_from_bucket(self):
16+
bucket = self.client.bucket('yztest')
17+
bucket.new("user", {"user_s": "Z"}).store()
18+
time.sleep(1)
19+
results = bucket.search("user_s:Z")
20+
self.assertEquals(1, len(results['docs']))
21+
# TODO: check that docs return useful info
22+
result = results['docs'][0]
23+
self.assertEquals(True, result.has_key('_yz_rk'))
24+
self.assertEquals(u'user', result['_yz_rk'])
25+
self.assertEquals(True, result.has_key('_yz_rb'))
26+
self.assertEquals(u'yztest', result['_yz_rb'])
27+
self.assertEquals(True, result.has_key('score'))
28+
self.assertEquals(True, result.has_key('user_s'))
29+
self.assertEquals(u'Z', result['user_s'])
30+
31+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
32+
def test_yz_get_search_index(self):
33+
index = self.client.get_search_index('yztest')
34+
self.assertEquals('yztest', index['name'])
35+
self.assertEquals('_yz_default', index['schema'])
36+
self.assertRaises(Exception, self.client.get_search_index, 'NOTyztest')
37+
38+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
39+
def test_yz_delete_search_index(self):
40+
testrun_yz_bucket = 'yztest'
41+
# expected to fail, since there's an attached bucket
42+
self.assertRaises(Exception,
43+
self.client.delete_search_index, testrun_yz_bucket)
44+
# detatch bucket from index then delete
45+
b = self.client.bucket(testrun_yz_bucket)
46+
b.set_property('yz_index', '')
47+
resp = self.client.delete_search_index(testrun_yz_bucket)
48+
self.assertEquals(True, resp)
49+
# create it again
50+
self.client.create_search_index(testrun_yz_bucket)
51+
b = self.client.bucket(testrun_yz_bucket)
52+
b.set_property('yz_index', testrun_yz_bucket)
53+
time.sleep(1) # wait for index to apply
54+
55+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
56+
def test_yz_list_search_indexes(self):
57+
indexes = self.client.list_search_indexes()
58+
self.assertEquals(1, len(indexes))
59+
60+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
61+
def test_yz_create_schema(self):
62+
content = """<?xml version="1.0" encoding="UTF-8" ?>
63+
<schema name="test" version="1.5">
64+
<fields>
65+
<field name="_yz_id" type="_yz_str" indexed="true" stored="true" required="true" />
66+
<field name="_yz_ed" type="_yz_str" indexed="true" stored="true"/>
67+
<field name="_yz_pn" type="_yz_str" indexed="true" stored="true"/>
68+
<field name="_yz_fpn" type="_yz_str" indexed="true" stored="true"/>
69+
<field name="_yz_vtag" type="_yz_str" indexed="true" stored="true"/>
70+
<field name="_yz_node" type="_yz_str" indexed="true" stored="true"/>
71+
<field name="_yz_rk" type="_yz_str" indexed="true" stored="true"/>
72+
<field name="_yz_rb" type="_yz_str" indexed="true" stored="true"/>
73+
</fields>
74+
<uniqueKey>_yz_id</uniqueKey>
75+
<types>
76+
<fieldType name="_yz_str" class="solr.StrField" sortMissingLast="true" />
77+
</types>
78+
</schema>"""
79+
schema_name = 'yzgoodschema'
80+
resp = self.client.create_search_schema(schema_name, content)
81+
self.assertEquals(True, resp)
82+
schema = self.client.get_search_schema(schema_name)
83+
self.assertEquals(schema_name, schema['name'])
84+
self.assertEquals(content, schema['content'])
85+
86+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
87+
def test_yz_create_bad_schema(self):
88+
bad_content = """
89+
<derp nope nope, how do i computer?
90+
"""
91+
self.assertRaises(Exception, self.client.create_search_schema,
92+
'yzbadschema', bad_content)
93+
94+
95+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
96+
def test_yz_search_queries(self):
97+
bucket = self.client.bucket('yztest')
98+
bucket.new("Z", {"username_s": "Z", "name_s": "ryan", "age_i":30}).store()
99+
bucket.new("R", {"username_s": "R", "name_s": "eric", "age_i":34}).store()
100+
bucket.new("F", {"username_s": "F", "name_s": "bryan fink", "age_i":32}).store()
101+
bucket.new("H", {"username_s": "H", "name_s": "brett", "age_i":14}).store()
102+
time.sleep(1)
103+
# multiterm
104+
results = bucket.search("username_s:(F OR H)")
105+
self.assertEquals(2, len(results['docs']))
106+
# boolean
107+
results = bucket.search("username_s:Z AND name_s:ryan")
108+
self.assertEquals(1, len(results['docs']))
109+
# range
110+
results = bucket.search("age_i:[30 TO 33]")
111+
self.assertEquals(2, len(results['docs']))
112+
# phrase
113+
results = bucket.search('name_s:"bryan fink"')
114+
self.assertEquals(1, len(results['docs']))
115+
# wildcard
116+
results = bucket.search('name_s:*ryan*')
117+
self.assertEquals(2, len(results['docs']))
118+
# regexp
119+
results = bucket.search('name_s:/br.*/')
120+
self.assertEquals(2, len(results['docs']))
121+
# Parameters:
122+
# limit
123+
results = bucket.search('username_s:*', rows=2)
124+
self.assertEquals(2, len(results['docs']))
125+
# sort
126+
results = bucket.search('username_s:*', sort="age_i asc")
127+
self.assertEquals(14, int(results['docs'][0]['age_i']))
128+
129+
@unittest.skipUnless(RUN_YZ, 'RUN_YZ is undefined')
130+
def test_yz_search_utf8(self):
131+
bucket = self.client.bucket('yztest')
132+
body = {"text_ja" : u"私はハイビスカスを食べるのが 大好き"}
133+
bucket.new("shift_jis", body).store()
134+
# TODO: fails due to lack of direct PB unicode support
135+
# results = bucket.search(u"text_ja:大好き")
136+
# self.assertEquals(1, len(results['docs']))

riak/transports/feature_detect.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
1: LooseVersion("1.0.0"),
2525
1.1: LooseVersion("1.1.0"),
2626
1.2: LooseVersion("1.2.0"),
27-
1.4: LooseVersion("1.4.0")
27+
1.4: LooseVersion("1.4.0"),
28+
2.0: LooseVersion("2.0.0")
2829
}
2930

3031

@@ -65,6 +66,14 @@ def pb_indexes(self):
6566
"""
6667
return self.server_version >= versions[1.2]
6768

69+
def pb_search_admin(self):
70+
"""
71+
Whether search administration is supported over Protocol Buffers
72+
73+
:rtype: bool
74+
"""
75+
return self.server_version >= versions[2.0]
76+
6877
def pb_search(self):
6978
"""
7079
Whether search queries are supported over Protocol Buffers

riak/transports/pbc/codec.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def _invert(d):
4949

5050
NORMAL_PROPS = ['n_val', 'allow_mult', 'last_write_wins', 'old_vclock',
5151
'young_vclock', 'big_vclock', 'small_vclock', 'basic_quorum',
52-
'notfound_ok', 'search', 'backend']
52+
'notfound_ok', 'search', 'backend', 'yz_index']
5353
COMMIT_HOOK_PROPS = ['precommit', 'postcommit']
5454
MODFUN_PROPS = ['chash_keyfun', 'linkfun']
5555
QUORUM_PROPS = ['r', 'pr', 'w', 'pw', 'dw', 'rw']
@@ -412,3 +412,17 @@ def _encode_index_req(self, bucket, index, startkey, endkey=None,
412412
if continuation:
413413
req.continuation = continuation
414414
return req
415+
416+
def _decode_yz_index(self, index):
417+
"""
418+
Fills an RpbYokozunaIndex message with the appropriate data.
419+
420+
:param index: a yz index message
421+
:type index: riak_pb.RpbYokozunaIndex
422+
:rtype dict
423+
"""
424+
result = {}
425+
result['name'] = index.name
426+
if index.HasField('schema'):
427+
result['schema'] = index.schema
428+
return result

riak/transports/pbc/messages.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@
5555
MSG_CODE_COUNTER_UPDATE_RESP = 51
5656
MSG_CODE_COUNTER_GET_REQ = 52
5757
MSG_CODE_COUNTER_GET_RESP = 53
58+
MSG_CODE_YOKOZUNA_INDEX_GET_REQ = 54
59+
MSG_CODE_YOKOZUNA_INDEX_GET_RESP = 55
60+
MSG_CODE_YOKOZUNA_INDEX_PUT_REQ = 56
61+
MSG_CODE_YOKOZUNA_INDEX_DELETE_REQ = 57
62+
MSG_CODE_YOKOZUNA_SCHEMA_GET_REQ = 58
63+
MSG_CODE_YOKOZUNA_SCHEMA_GET_RESP = 59
64+
MSG_CODE_YOKOZUNA_SCHEMA_PUT_REQ = 60
5865

5966
# These responses don't include messages
6067
EMPTY_RESPONSES = [
@@ -101,6 +108,12 @@
101108
MSG_CODE_COUNTER_UPDATE_REQ: riak_pb.RpbCounterUpdateReq,
102109
MSG_CODE_COUNTER_UPDATE_RESP: riak_pb.RpbCounterUpdateResp,
103110
MSG_CODE_COUNTER_GET_REQ: riak_pb.RpbCounterGetReq,
104-
MSG_CODE_COUNTER_GET_RESP: riak_pb.RpbCounterGetResp
105-
111+
MSG_CODE_COUNTER_GET_RESP: riak_pb.RpbCounterGetResp,
112+
MSG_CODE_YOKOZUNA_INDEX_GET_REQ: riak_pb.RpbYokozunaIndexGetReq,
113+
MSG_CODE_YOKOZUNA_INDEX_GET_RESP: riak_pb.RpbYokozunaIndexGetResp,
114+
MSG_CODE_YOKOZUNA_INDEX_PUT_REQ: riak_pb.RpbYokozunaIndexPutReq,
115+
MSG_CODE_YOKOZUNA_INDEX_DELETE_REQ: riak_pb.RpbYokozunaIndexDeleteReq,
116+
MSG_CODE_YOKOZUNA_SCHEMA_GET_REQ: riak_pb.RpbYokozunaSchemaGetReq,
117+
MSG_CODE_YOKOZUNA_SCHEMA_GET_RESP: riak_pb.RpbYokozunaSchemaGetResp,
118+
MSG_CODE_YOKOZUNA_SCHEMA_PUT_REQ: riak_pb.RpbYokozunaSchemaPutReq
106119
}

0 commit comments

Comments
 (0)