Skip to content

Commit b5a03bb

Browse files
committed
CBQE-4553: P0s for XDCR compression - part I
Change-Id: I55b1b19d644e7ae2a990f47f82d85ab822ccad2f Reviewed-on: http://review.couchbase.org/89712 Tested-by: Arunkumar Senthilnathan <arun.couchbase@yahoo.com> Reviewed-by: Thuan Nguyen <soccon@gmail.com>
1 parent 716be0b commit b5a03bb

2 files changed

Lines changed: 109 additions & 2 deletions

File tree

conf/py-xdcr-compression.conf

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,10 @@ xdcr.compressionXDCR.compression:
22
test_compression_with_unixdcr_incr_load,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
33
test_compression_with_unixdcr_backfill_load,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
44
test_compression_with_bixdcr_incr_load,items=10000,rdirection=bidirection,ctopology=chain,update=C1-C2,delete=C1-C2,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
5-
test_compression_with_bixdcr_backfill_load,items=10000,rdirection=bidirection,ctopology=chain,update=C1-C2,delete=C1-C2,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
5+
test_compression_with_bixdcr_backfill_load,items=10000,rdirection=bidirection,ctopology=chain,update=C1-C2,delete=C1-C2,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
6+
test_compression_with_pause_resume,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
7+
test_compression_with_optimistic_threshold_change,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,optimistic_threshold=200,value_size=100,GROUP=P0
8+
test_compression_with_optimistic_threshold_change,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,optimistic_threshold=200,value_size=300,GROUP=P0
9+
test_compression_with_advanced_settings,batch_count=100,batch_size=4096,source_nozzle=5,target_nozzle=5,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,GROUP=P0
10+
test_compression_with_capi,items=1000,rdirection=unidirection,replication_type=capi,GROUP=P0
11+
test_compression_with_unixdcr_incr_load,items=10000,rdirection=unidirection,ctopology=chain,update=C1,delete=C1,default_bucket=False,standard_buckets=2,replication_type=xmem,standard_bucket_1@C1=filter_expression:abcd,standard_bucket_2@C1=filter_expression:abcd,GROUP=P0

pytests/xdcr/compressionXDCR.py

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from xdcrnewbasetests import XDCRNewBaseTest, NodeHelper
2-
from remote.remote_util import RemoteMachineShellConnection
2+
from remote.remote_util import RemoteMachineShellConnection, RestConnection
33
from couchbase_helper.documentgenerator import BlobGenerator
44
import json
55

@@ -26,6 +26,7 @@ def _set_compression_type(self, cluster, bucket_name, compression_type="None"):
2626
output, error = shell.execute_command(command)
2727
shell.log_command_output(output, error)
2828
shell.disconnect()
29+
return output, error
2930

3031
def _verify_compression(self, cluster, compr_bucket_name="", uncompr_bucket_name="", compression_type="None"):
3132
repls = cluster.get_remote_clusters()[0].get_replications()
@@ -162,3 +163,103 @@ def test_compression_with_bixdcr_backfill_load(self):
162163
uncompr_bucket_name="standard_bucket_2",
163164
compression_type=compression_type)
164165
self.verify_results()
166+
167+
def test_compression_with_pause_resume(self):
    """Verify XDCR compression survives repeated pause/resume cycles.

    Sets compression on standard_bucket_1, loads both clusters, kicks
    off async mutations, then pauses and resumes all replications a
    configurable number of times (param "repeat", default 5) before
    verifying compression stats and data integrity.
    """
    cycles = self._input.param("repeat", 5)
    self.setup_xdcr()
    self.sleep(60)
    compression_type = self._input.param("compression_type", "Snappy")
    self._set_compression_type(self.src_cluster, "standard_bucket_1", compression_type)

    # Seed both sides so bidirectional traffic exists during the cycles.
    src_gen = BlobGenerator('comprOne-', 'comprOne-', self._value_size, end=self._num_items)
    self.src_cluster.load_all_buckets_from_generator(kv_gen=src_gen)
    dest_gen = BlobGenerator('comprTwo-', 'comprTwo-', self._value_size, end=self._num_items)
    self.dest_cluster.load_all_buckets_from_generator(kv_gen=dest_gen)

    self.async_perform_update_delete()

    # Bounce replication while mutations are still in flight.
    for _ in range(cycles):
        self.src_cluster.pause_all_replications()
        self.sleep(30)
        self.src_cluster.resume_all_replications()

    self._verify_compression(cluster=self.src_cluster,
                             compr_bucket_name="standard_bucket_1",
                             uncompr_bucket_name="standard_bucket_2",
                             compression_type=compression_type)
    self.verify_results()
191+
192+
def test_compression_with_optimistic_threshold_change(self):
    """Verify XDCR compression with a changed optimistic replication threshold.

    Applies compression to standard_bucket_1, sets
    optimisticReplicationThreshold on both replications, loads data
    while replication is paused, resumes, performs mutations, then
    verifies compression stats and data integrity.
    """
    self.setup_xdcr()
    self.sleep(60)
    compression_type = self._input.param("compression_type", "Snappy")
    self._set_compression_type(self.src_cluster, "standard_bucket_1", compression_type)

    # Apply the threshold to both same-named bucket replications.
    src_conn = RestConnection(self.src_cluster.get_master_node())
    for bucket in ('standard_bucket_1', 'standard_bucket_2'):
        src_conn.set_xdcr_param(bucket, bucket, 'optimisticReplicationThreshold',
                                self._optimistic_threshold)

    # Load with replication paused so the backlog drains under the new
    # threshold once resumed.
    self.src_cluster.pause_all_replications()

    src_gen = BlobGenerator('comprOne-', 'comprOne-', self._value_size, end=self._num_items)
    self.src_cluster.load_all_buckets_from_generator(kv_gen=src_gen)
    dest_gen = BlobGenerator('comprTwo-', 'comprTwo-', self._value_size, end=self._num_items)
    self.dest_cluster.load_all_buckets_from_generator(kv_gen=dest_gen)

    self.src_cluster.resume_all_replications()

    self.perform_update_delete()

    self._verify_compression(cluster=self.src_cluster,
                             compr_bucket_name="standard_bucket_1",
                             uncompr_bucket_name="standard_bucket_2",
                             compression_type=compression_type)
    self.verify_results()
220+
221+
def test_compression_with_advanced_settings(self):
    """Verify XDCR compression with tuned advanced replication settings.

    Reads workerBatchSize/docBatchSizeKb/nozzle counts from test input
    (params batch_count, batch_size, source_nozzle, target_nozzle),
    applies them to both replications, loads data while replication is
    paused, resumes, performs mutations, then verifies compression
    stats and data integrity.
    """
    batch_count = self._input.param("batch_count", 10)
    batch_size = self._input.param("batch_size", 2048)
    source_nozzle = self._input.param("source_nozzle", 2)
    target_nozzle = self._input.param("target_nozzle", 2)

    self.setup_xdcr()
    self.sleep(60)
    compression_type = self._input.param("compression_type", "Snappy")
    self._set_compression_type(self.src_cluster, "standard_bucket_1", compression_type)

    src_conn = RestConnection(self.src_cluster.get_master_node())
    # BUG FIX: the original passed 'standard_bucket_1' as the remote
    # bucket for the 'standard_bucket_2' replication (copy-paste slip);
    # each replication pairs a bucket with its same-named target, as in
    # test_compression_with_optimistic_threshold_change.
    advanced_settings = [
        ('workerBatchSize', batch_count),
        ('docBatchSizeKb', batch_size),
        ('sourceNozzlePerNode', source_nozzle),
        ('targetNozzlePerNode', target_nozzle),
    ]
    for bucket in ('standard_bucket_1', 'standard_bucket_2'):
        for param, value in advanced_settings:
            src_conn.set_xdcr_param(bucket, bucket, param, value)

    # Load with replication paused so the backlog drains under the new
    # settings once resumed.
    self.src_cluster.pause_all_replications()

    gen_create = BlobGenerator('comprOne-', 'comprOne-', self._value_size, end=self._num_items)
    self.src_cluster.load_all_buckets_from_generator(kv_gen=gen_create)
    gen_create = BlobGenerator('comprTwo-', 'comprTwo-', self._value_size, end=self._num_items)
    self.dest_cluster.load_all_buckets_from_generator(kv_gen=gen_create)

    self.src_cluster.resume_all_replications()

    self.perform_update_delete()

    self._verify_compression(cluster=self.src_cluster,
                             compr_bucket_name="standard_bucket_1",
                             uncompr_bucket_name="standard_bucket_2",
                             compression_type=compression_type)
    self.verify_results()
258+
259+
def test_compression_with_capi(self):
    """Verify that compression cannot be enabled on a CAPI replication.

    Attempts to set a compression type on the 'default' bucket's CAPI
    replication and asserts that the CLI rejects it with the expected
    error message.
    """
    self.setup_xdcr()
    self.sleep(60)
    compression_type = self._input.param("compression_type", "Snappy")
    output, error = self._set_compression_type(self.src_cluster, "default", compression_type)
    # Scan every output line instead of output[0]: indexing would raise
    # IndexError on empty output and would miss the error message if the
    # CLI does not emit it on the first line.
    self.assertTrue(any("The value can not be specified for CAPI replication" in line
                        for line in output),
                    "Compression enabled for CAPI")
    self.log.info("Compression not enabled for CAPI as expected")

0 commit comments

Comments
 (0)