Skip to content
This repository was archived by the owner on Jan 27, 2023. It is now read-only.

Commit 5185471

Browse files
authored
Merge pull request #25 from cipherstash/put-stream-bi-direction
Update put stream to handle bi directional stream
2 parents e13b018 + 9f98496 commit 5185471

2 files changed

Lines changed: 17 additions & 8 deletions

File tree

cipherstash-client.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ Gem::Specification.new do |s|
3232
s.add_runtime_dependency "aws-sdk-core", "~> 3.0"
3333
s.add_runtime_dependency "aws-sdk-kms", "~> 1.0"
3434
s.add_runtime_dependency 'cbor', '~> 0.5.9.6'
35-
s.add_runtime_dependency "cipherstash-grpc", "= 0.20220801.1"
35+
s.add_runtime_dependency "cipherstash-grpc", "= 0.20220928.0"
3636
s.add_runtime_dependency "enveloperb", "~> 0.0"
3737
s.add_runtime_dependency "launchy", "~> 2.5"
3838
s.add_runtime_dependency "ore-rs", "~> 0.0"

lib/cipherstash/client/rpc.rb

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -270,17 +270,26 @@ def put_stream(collection, records)
270270
# isn't the most idiomatic code on earth...
271271
requests = [[begin_request], records].lazy.flat_map { |x| x }
272272

273-
res = @metrics.measure_rpc_call("putStream") do
274-
stub.put_stream(requests, metadata: rpc_headers)
275-
end
273+
num_inserted = 0
274+
@logger.debug("Cipherstash::Client::RPC#put_stream") { "Start streaming upsert of documents..." }
275+
276+
@metrics.measure_rpc_call("putStream") do
277+
stub.put_stream_bi_direction(requests, metadata: rpc_headers).each do |res|
278+
unless res.is_a?(Documents::StreamingPutReply)
279+
raise Error::StreamingPutFailure, "expected Documents::StreamingPutReply response, got #{res.class} instead"
280+
end
281+
282+
raise_if_error(res)
276283

277-
unless res.is_a?(Documents::StreamingPutReply)
278-
raise Error::StreamingPutFailure, "expected Documents::StreamingPutReply response, got #{res.class} instead"
284+
@logger.debug("Cipherstash::Client::RPC#put_stream") { "#{res.numInserted} records inserted." }
285+
286+
num_inserted = res.numInserted
287+
end
279288
end
280289

281-
raise_if_error(res)
290+
@logger.debug("Cipherstash::Client::RPC#put_stream") { "Streaming upsert complete. Number of records inserted: #{res.numInserted}" }
282291

283-
res.numInserted
292+
num_inserted
284293
rescue ::GRPC::NotFound
285294
raise Error::RecordPutFailure, "Collection '#{collection.name}' not found"
286295
rescue ::GRPC::InvalidArgument => ex

0 commit comments

Comments
 (0)