Skip to content

Commit a8ae90a

Browse files
committed
add bulk import test
1 parent dd86d42 commit a8ae90a

1 file changed

Lines changed: 83 additions & 0 deletions

File tree

src/intTest/java/V1ClientTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@
77
import java.util.Iterator;
88
import java.util.Random;
99
import java.util.Set;
10+
import java.util.concurrent.CountDownLatch;
1011

1112
import com.authzed.api.v1.*;
1213
import com.authzed.grpcutil.BearerToken;
1314

15+
import io.grpc.stub.StreamObserver;
1416
import org.junit.Test;
1517

1618
import io.grpc.ManagedChannel;
@@ -174,6 +176,87 @@ private static String writeRelationship(PermissionsServiceGrpc.PermissionsServic
174176
return relResponse.getWrittenAt().getToken();
175177
}
176178

179+
180+
class BulkImportObserver implements StreamObserver<BulkImportRelationshipsResponse> {
181+
final CountDownLatch done = new CountDownLatch(1);
182+
private long loaded;
183+
184+
@Override
185+
public void onNext(BulkImportRelationshipsResponse resp) {
186+
loaded += resp.getNumLoaded();
187+
}
188+
189+
@Override
190+
public void onError(Throwable throwable) {
191+
// TODO need to capture error so that blocking callsite is able to access it
192+
System.out.println("onError");
193+
done.countDown();
194+
}
195+
196+
@Override
197+
public void onCompleted() {
198+
System.out.println("onCompleted");
199+
done.countDown();
200+
}
201+
202+
public void await() throws InterruptedException {
203+
done.await();
204+
}
205+
206+
public long loaded() {
207+
return loaded;
208+
}
209+
};
210+
211+
@Test
212+
public void testBulkImport() {
213+
214+
ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
215+
String token = generateToken();
216+
ExperimentalServiceGrpc.ExperimentalServiceStub experimentalService = ExperimentalServiceGrpc.
217+
newStub(channel)
218+
.withCallCredentials(new BearerToken(token));
219+
220+
BulkImportObserver responseObserver = new BulkImportObserver();
221+
writeTestSchema(token, channel);
222+
io.grpc.stub.StreamObserver<com.authzed.api.v1.BulkImportRelationshipsRequest>
223+
observer = experimentalService.bulkImportRelationships(responseObserver);
224+
225+
for (int i = 0; i < 10; i++) {
226+
BulkImportRelationshipsRequest req = BulkImportRelationshipsRequest.newBuilder()
227+
.addRelationships(relationship("test/article", "java_test_" + i, "author", "test/user", "george")).build();
228+
observer.onNext(req);
229+
}
230+
observer.onCompleted();
231+
232+
try {
233+
responseObserver.await();
234+
} catch (InterruptedException e) {
235+
throw new RuntimeException(e);
236+
}
237+
238+
assertEquals(10, responseObserver.loaded());
239+
}
240+
241+
private static Relationship relationship(String resourceType, String resourceID, String relation, String subjectType, String subjectID) {
242+
return Relationship.newBuilder()
243+
.setResource(
244+
ObjectReference.newBuilder()
245+
.setObjectType(resourceType)
246+
.setObjectId(resourceID)
247+
.build())
248+
.setRelation(relation)
249+
.setSubject(
250+
SubjectReference.newBuilder()
251+
.setObject(
252+
ObjectReference.newBuilder()
253+
.setObjectType(subjectType)
254+
.setObjectId(subjectID)
255+
.build())
256+
.build())
257+
.build();
258+
}
259+
177260
private static SchemaServiceGrpc.SchemaServiceBlockingStub writeTestSchema(String token, ManagedChannel channel) {
178261
SchemaServiceGrpc.SchemaServiceBlockingStub schemaService = SchemaServiceGrpc.newBlockingStub(channel)
179262
.withCallCredentials(new BearerToken(token));

0 commit comments

Comments
 (0)