このドキュメントは、このリポジトリで使っている Quarkus Redis の非同期 API を題材に、ReactiveRedisDataSource と Uni の基本的な使い方をまとめたものです。
対象コード:
このリポジトリの非同期 API は 2 層に分かれています。
- Service 層
- Redis へのアクセスを担当
ReactiveRedisDataSourceを使う- 戻り値は基本的に
Uni<T>
- Resource 層
- HTTP レスポンスへの変換を担当
Uni<T>をCompletionStage<Response>へ変換する
この分離が重要です。
理由:
- Redis の reactive API は Mutiny (
Uni) と相性が良い - HTTP 層では JAX-RS の契約に合わせて
CompletionStage<Response>を返せる - サービス層で早く
CompletionStageに落とさない方が、処理の合成とエラー伝播が素直になる
補足:
UniとCompletionStageの使い分けルールはdocs/reactive-return-types.mdにまとめています
TodoAsyncService.java では、コンストラクタで ReactiveRedisDataSource を受け取り、用途ごとの command object を取り出しています。
public TodoAsyncService(ReactiveRedisDataSource ds, Logger logger) {
this.todoCommands = ds.value(TodoTask.class);
this.keyCommands = ds.key();
this.logger = logger;
}ポイント:
ds.value(TodoTask.class)- 値型が
TodoTaskの key-value 操作を行う
- 値型が
ds.key()- key の列挙や削除を行う
この時点ではまだ Redis にアクセスしていません。実際のアクセスは get / set / del / keys を呼んだときに Uni として表現されます。
最も単純なパターンは、キー 1 つを非同期に取得するケースです。
対象:
TodoAsyncService.javaasyncTask
public Uni<TodoTask> asyncTask(String id) {
return todoCommands.get(id).onFailure().retry().atMost(5);
}ここでやっていること:
todoCommands.get(id)- Redis から値を読む
- 戻り値は
Uni<TodoTask>
onFailure().retry().atMost(5)- 失敗時に最大 5 回までリトライする
使いどころ:
- 一時的な接続失敗を吸収したいとき
注意:
- すべての失敗にリトライしてよいとは限らない
- 入力値不正や永続的な障害では、無条件リトライは逆効果になることがある
一覧取得は、Redis では「キー一覧を取る」処理と「各キーの値を取る」処理を組み合わせます。
対象:
TodoAsyncService.javatasks
public Uni<List<TodoTask>> tasks() {
return keyCommands
.keys("*")
.onItem()
.transform(keys -> keys.stream()
.filter(k -> k.matches("^\\d+$"))
.collect(Collectors.toList()))
.onItem()
.transformToMulti(keys -> Multi.createFrom().items(keys.stream()))
.onItem()
.transformToUniAndConcatenate(todoCommands::get)
.collect()
.asList()
.onItem()
.transform(tasks -> tasks.stream()
.sorted(Comparator.comparingInt(TodoTask::id))
.collect(Collectors.toList()));
}流れを分解するとこうなります。
keyCommands.keys("*")戻り値:
Uni<List<String>>
.onItem().transform(keys -> ...)このリポジトリでは数値 ID のキーだけを Todo として扱っているため、正規表現で絞っています。
.transformToMulti(keys -> Multi.createFrom().items(keys.stream()))なぜ必要か:
List<String>の各キーに対して非同期getを 1 件ずつ適用したいから
.transformToUniAndConcatenate(todoCommands::get)ここが重要です。
transformToUniAndConcatenate- 順序を保ちやすい
- 1 件ずつ連結して処理する
transformToUniAndMerge- 並列性は上がるが、返却順が不安定になりやすい
このリポジトリでは API 応答を安定させるために Concatenate を使い、最後に ID 昇順で明示的にソートしています。
.collect().asList()Multi<TodoTask> を Uni<List<TodoTask>> に戻しています。
Redis に値を書き込んで、そのまま作成済みオブジェクトを返す例です。
対象:
TodoAsyncService.javacreate
public Uni<TodoTask> create(TodoTask task) {
return nextId().onItem()
.transform(nextId -> new TodoTask(nextId, task.title(), task.isCompleted()))
.onItem()
.transformToUni(newTask ->
todoCommands.set(newTask.id().toString(), newTask)
.replaceWith(newTask));
}ポイント:
nextId()はUni<Integer>transform(...)- ID をもとに
TodoTaskを組み立てる
- ID をもとに
transformToUni(...)- 書き込み処理そのものが
Uniを返すため、flat map 的に接続する
- 書き込み処理そのものが
replaceWith(newTask)setの戻り値ではなく、作成済みのTodoTaskを返す
重要:
- 書き込んだあとに同じキーを再度
getしなくても、通常はnewTaskを返せば十分 - 不要な read-after-write は Redis 往復を増やすので避ける
更新では「存在確認」と「書き込み」をつなげます。
対象:
TodoAsyncService.javaupdateAsync
public Uni<TodoTask> updateAsync(Integer id, TodoTask task) {
return asyncTask(id.toString())
.onItem()
.transformToUni(currentTask -> {
if (currentTask == null) {
return Uni.createFrom().nullItem();
}
TodoTask updatedTask = new TodoTask(id, task.title(), task.isCompleted());
return todoCommands.set(id.toString(), updatedTask)
.onItem()
.transform(v -> updatedTask);
});
}ポイント:
- まず既存データを読む
- 存在しないなら
nullItem()を返す - 存在するなら
setして更新済みオブジェクトを返す
設計上の意味:
- Service 層では HTTP ステータスを知らない
- 「見つからなかった」という事実だけを
nullで表現し、HTTP 404 への変換は Resource 層に任せる
Redis の DEL は削除件数を返します。
対象:
TodoAsyncService.javadelete
public Uni<Boolean> delete(Integer id) {
return keyCommands.del(id.toString()).map(l -> l == 1L);
}ポイント:
delの戻り値は件数map(l -> l == 1L)- 「1 件削除できたか」を boolean に変換している
この形だと Resource 層で:
trueなら 204falseなら 404
と素直に変換できます。
Service 層の Uni<T> を HTTP レスポンスへ変えるのが Resource 層です。
対象:
public CompletionStage<Response> keys() {
return service.tasks()
.onItem()
.transform(list -> Response.ok(list).build())
.subscribeAsCompletionStage();
}ここで初めて subscribeAsCompletionStage() を呼びます。
意図:
- Service 層は
Uni - Resource 層の外向き契約は
CompletionStage<Response>
public CompletionStage<Response> detail(Integer id) {
return service.asyncTask(id.toString())
.onItem()
.transform(todoTask -> {
if (todoTask == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
return Response.ok(todoTask).build();
})
.subscribeAsCompletionStage();
}ポイント:
nullを 404 へ変換しているのは Resource 層
post / put / delete も同じ考え方です。
- Service 層で Redis 操作を完了させる
- Resource 層で HTTP ステータスへ変換する
- 最後に
subscribeAsCompletionStage()
避けたい例:
todoCommands.get(id).subscribeAsCompletionStage()問題:
- 以後の処理が Mutiny 演算子で書きづらくなる
UniとCompletionStageが同じ層で混ざる
避けたい例:
subscribeAsCompletionStage().toCompletableFuture().get()問題:
- ブロッキングになる
- reactive 設計を壊す
transformToUniAndMerge は便利ですが、返却順が不安定になりやすいです。
対策:
transformToUniAndConcatenateを使う- もしくは最後に明示ソートする
このリポジトリでは両方を使って順序を安定化しています。
- Redis reactive API を使う service メソッドは
Uni<T>を返しているか subscribeAsCompletionStage()は Resource 層だけで呼んでいるか.get()で待っていないか- 一覧取得の順序が API として安定しているか
set後に不要なgetをしていないかnullや boolean を HTTP ステータスへ変換する責務が Resource 層にあるか
最後に、このリポジトリの考え方を最小化すると次の 2 行に集約されます。
Uni<TodoTask> task = todoCommands.get(id);return task.onItem().transform(t -> Response.ok(t).build()).subscribeAsCompletionStage();まずはこの分離を守ると、Redis 非同期 API のコードはかなり読みやすくなります。