Skip to content
Open
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
0fca897
add shumaich proto
aenglisc Nov 25, 2020
b65b2f1
add shumaich proto to app.src
aenglisc Nov 25, 2020
49da102
update compose file
aenglisc Nov 25, 2020
b18d264
add bender client
aenglisc Nov 25, 2020
bd7f022
add bender client to app.src
aenglisc Nov 25, 2020
bca20aa
add gen_sequence to utils
aenglisc Nov 25, 2020
e46bb17
add new accounting module
aenglisc Nov 25, 2020
7b6fd9c
fix thrift compiler version
aenglisc Nov 25, 2020
d88626a
add shumaich service handling
aenglisc Nov 26, 2020
c92dc7f
update new accounter
aenglisc Nov 29, 2020
21539f0
update hg_proto
aenglisc Nov 29, 2020
784168d
update account balances test
aenglisc Nov 30, 2020
aaba872
fix format
aenglisc Nov 30, 2020
8562e08
fix xref
aenglisc Nov 30, 2020
fef9a7b
some fixes
aenglisc Nov 30, 2020
b46924d
type fix
aenglisc Nov 30, 2020
27b264f
type fix
aenglisc Nov 30, 2020
1e9f9e6
Merge remote-tracking branch 'origin/master' into HG-544/ft/shumaich
kehitt Jun 23, 2021
186fb65
fix format
kehitt Jun 23, 2021
a6b0dc3
accounter retries via hg_retry
kehitt Jul 12, 2021
6f9b7ae
fix test
kehitt Jul 12, 2021
cb86a3c
fix clock decoding, relax retry strategy
kehitt Jul 12, 2021
0c9806a
fix balance consistency
kehitt Jul 12, 2021
0c66e4c
relax default retry strategy
kehitt Jul 14, 2021
42831ae
tighter default retry strategy
kehitt Jul 15, 2021
4e65175
fail machine (somewhat) properly when out of retries to accounter
kehitt Sep 6, 2021
e1daeb5
fix plan function, remove duplicate code
kehitt Sep 10, 2021
cb7c878
fix types
kehitt Sep 10, 2021
8b2b9fa
only retry sometimes
kehitt Sep 14, 2021
a821836
change hg_accounting_new api
kehitt Sep 20, 2021
230cae8
work around a shumway bug
kehitt Sep 21, 2021
2ffa288
add some logging
kehitt Sep 21, 2021
8a1e73c
fix types
kehitt Sep 22, 2021
42efffa
another one
kehitt Sep 22, 2021
8d70d14
update shumaich to fix the Hold bug
kehitt Sep 24, 2021
55eca2f
bump proto
kehitt Sep 28, 2021
91f62ac
bump shumway in tests
kehitt Sep 29, 2021
b9298a4
some review fixes
kehitt Oct 5, 2021
bc9deda
fixes for review fixes
kehitt Oct 5, 2021
a8afcf9
fix docker-compose
kehitt Oct 5, 2021
d74d2d1
remove constant container names
kehitt Oct 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/hellgate/src/hellgate.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
fault_detector_proto,
hg_proto,
shumpune_proto,
shumaich_proto,
cowboy,
how_are_you, % must be after ranch and before any woody usage
woody,
scoper, % should be before any scoper event handler usage
gproc,
dmt_client,
party_client,
bender_client,
woody_user_identity,
payproc_errors,
erl_health,
Expand Down
336 changes: 336 additions & 0 deletions apps/hellgate/src/hg_accounting_new.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
%%% Accounting for shumaich
%%%
%%% TODO
%%% - Brittle posting id assignment, it should be a level upper, maybe even in
%%% `hg_cashflow`.
%%% - Stuff cash flow details in the posting description fields.

-module(hg_accounting_new).

-export([get_account/1]).
-export([get_account/2]).

-export([get_balance/1]).
-export([get_balance/2]).

-export([create_account/1]).

-export([collect_account_map/6]).
-export([collect_merchant_account_map/2]).
-export([collect_provider_account_map/3]).
-export([collect_system_account_map/4]).
-export([collect_external_account_map/4]).
Comment thread
keynslug marked this conversation as resolved.

-export([hold/3]).
-export([hold/4]).

-export([plan/3]).
-export([plan/4]).

-export([commit/3]).
-export([commit/4]).

-export([rollback/3]).
-export([rollback/4]).

-include_lib("damsel/include/dmsl_payment_processing_thrift.hrl").
-include_lib("damsel/include/dmsl_domain_thrift.hrl").
-include_lib("shumaich_proto/include/shumaich_shumaich_thrift.hrl").

-type amount() :: dmsl_domain_thrift:'Amount'().
-type currency_code() :: dmsl_domain_thrift:'CurrencySymbolicCode'().
-type account_id() :: dmsl_accounter_thrift:'AccountID'().
-type plan_id() :: dmsl_accounter_thrift:'PlanID'().
-type batch_id() :: dmsl_accounter_thrift:'BatchID'().
-type final_cash_flow() :: dmsl_domain_thrift:'FinalCashFlow'().
-type batch() :: {batch_id(), final_cash_flow()}.
-type clock() :: dmsl_domain_thrift:'AccounterClock'().

-type payment() :: dmsl_domain_thrift:'InvoicePayment'().
-type shop() :: dmsl_domain_thrift:'Shop'().
-type payment_institution() :: dmsl_domain_thrift:'PaymentInstitution'().
-type provider() :: dmsl_domain_thrift:'Provider'().
-type varset() :: pm_selector:varset().
-type revision() :: hg_domain:revision().

-export_type([batch/0]).

-type account() :: #{
account_id => account_id(),
currency_code => currency_code()
}.

-type balance() :: #{
account_id => account_id(),
own_amount => amount(),
min_available_amount => amount(),
max_available_amount => amount(),
clock => clock()
}.

-define(DEFAULT_RETRY_STRATEGY, {exponential, 10, 1.1, 100}).

-spec get_account(account_id()) -> account().
get_account(AccountID) ->
get_account(AccountID, undefined).

-spec get_account(account_id(), undefined | clock()) -> account().
get_account(AccountID, Clock) ->
case call_accounter('GetAccountByID', {AccountID, to_accounter_clock(Clock)}) of
{ok, Result} ->
construct_account(AccountID, Result);
{exception, #shumaich_AccountNotFound{}} ->
hg_woody_wrapper:raise(#payproc_AccountNotFound{})
end.

-spec get_balance(account_id()) -> balance().
get_balance(AccountID) ->
get_balance(AccountID, undefined).

-spec get_balance(account_id(), undefined | clock()) -> balance().
get_balance(AccountID, Clock) ->
case call_accounter('GetBalanceByID', {AccountID, to_accounter_clock(Clock)}) of
{ok, Result} ->
construct_balance(AccountID, Result);
{exception, #shumaich_AccountNotFound{}} ->
hg_woody_wrapper:raise(#payproc_AccountNotFound{})
end.

-spec create_account(currency_code()) -> account_id().
create_account(_CurrencyCode) ->
WoodyCtx = hg_context:get_woody_context(hg_context:load()),
% FIXME: placeholder, the sequence id should probably be passed externally
% not sure about the minimum too
Comment thread
keynslug marked this conversation as resolved.
Outdated
hg_utils:gen_sequence(<<"create_shumaich_account">>, WoodyCtx, #{minimum => 10000}).

-spec collect_account_map(payment(), shop(), payment_institution(), provider(), varset(), revision()) -> map().
collect_account_map(Payment, Shop, PaymentInstitution, Provider, VS, Revision) ->
Map0 = collect_merchant_account_map(Shop, #{}),
Map1 = collect_provider_account_map(Payment, Provider, Map0),
Map2 = collect_system_account_map(Payment, PaymentInstitution, Revision, Map1),
collect_external_account_map(Payment, VS, Revision, Map2).

-spec collect_merchant_account_map(shop(), map()) -> map().
collect_merchant_account_map(#domain_Shop{account = MerchantAccount}, Acc) ->
Acc#{
{merchant, settlement} => MerchantAccount#domain_ShopAccount.settlement,
{merchant, guarantee} => MerchantAccount#domain_ShopAccount.guarantee
}.

-spec collect_provider_account_map(payment(), provider(), map()) -> map().
collect_provider_account_map(Payment, #domain_Provider{accounts = ProviderAccounts}, Acc) ->
Currency = get_currency(get_payment_cost(Payment)),
ProviderAccount = hg_payment_institution:choose_provider_account(Currency, ProviderAccounts),
Acc#{
{provider, settlement} => ProviderAccount#domain_ProviderAccount.settlement
}.

-spec collect_system_account_map(payment(), payment_institution(), revision(), map()) -> map().
collect_system_account_map(Payment, PaymentInstitution, Revision, Acc) ->
Currency = get_currency(get_payment_cost(Payment)),
SystemAccount = hg_payment_institution:get_system_account(Currency, Revision, PaymentInstitution),
Acc#{
{system, settlement} => SystemAccount#domain_SystemAccount.settlement,
{system, subagent} => SystemAccount#domain_SystemAccount.subagent
}.

-spec collect_external_account_map(payment(), varset(), revision(), map()) -> map().
collect_external_account_map(Payment, VS, Revision, Acc) ->
Currency = get_currency(get_payment_cost(Payment)),
case hg_payment_institution:choose_external_account(Currency, VS, Revision) of
#domain_ExternalAccount{income = Income, outcome = Outcome} ->
Acc#{
{external, income} => Income,
{external, outcome} => Outcome
};
undefined ->
Acc
end.

%%
-spec plan(plan_id(), [batch()], hg_datetime:timestamp()) -> clock().
plan(_PlanID, [], _Timestamp) ->
error(badarg);
plan(_PlanID, Batches, _Timestamp) when not is_list(Batches) ->
error(badarg);
plan(PlanID, Batches, Timestamp) ->
lists:foldl(
fun(Batch, _) -> hold(PlanID, Batch, Timestamp) end,
Comment thread
keynslug marked this conversation as resolved.
Outdated
undefined,
Batches
).

-spec plan(plan_id(), [batch()], hg_datetime:timestamp(), clock()) -> clock().
plan(_PlanID, [], _Timestamp, _Clock) ->
error(badarg);
plan(_PlanID, Batches, _Timestamp, _Clock) when not is_list(Batches) ->
error(badarg);
plan(PlanID, Batches, Timestamp, Clock) ->
lists:foldl(
fun(Batch, _) -> hold(PlanID, Batch, Timestamp, Clock) end,
undefined,
Batches
).

-spec hold(plan_id(), batch(), hg_datetime:timestamp()) -> clock().
hold(PlanID, Batch, Timestamp) ->
Comment thread
keynslug marked this conversation as resolved.
Outdated
do('Hold', construct_plan_change(PlanID, Batch, Timestamp)).

-spec hold(plan_id(), batch(), hg_datetime:timestamp(), clock()) -> clock().
hold(PlanID, Batches, Timestamp, Clock) ->
AccounterClock = to_accounter_clock(Clock),
do('Hold', construct_plan_change(PlanID, Batches, Timestamp), AccounterClock).

-spec commit(plan_id(), [batch()], hg_datetime:timestamp()) -> clock().
commit(PlanID, Batches, Timestamp) ->
do('CommitPlan', construct_plan(PlanID, Batches, Timestamp)).

-spec commit(plan_id(), [batch()], hg_datetime:timestamp(), clock()) -> clock().
commit(PlanID, Batches, Timestamp, Clock) ->
AccounterClock = to_accounter_clock(Clock),
do('CommitPlan', construct_plan(PlanID, Batches, Timestamp), AccounterClock).

-spec rollback(plan_id(), [batch()], hg_datetime:timestamp()) -> clock().
rollback(PlanID, Batches, Timestamp) ->
do('RollbackPlan', construct_plan(PlanID, Batches, Timestamp)).

-spec rollback(plan_id(), [batch()], hg_datetime:timestamp(), clock()) -> clock().
rollback(PlanID, Batches, Timestamp, Clock) ->
AccounterClock = to_accounter_clock(Clock),
do('RollbackPlan', construct_plan(PlanID, Batches, Timestamp), AccounterClock).

do(Op, Plan) ->
do(Op, Plan, {latest, #shumaich_LatestClock{}}).

do(Op, Plan, PreviousClock) ->
case call_accounter(Op, {Plan, PreviousClock}) of
{ok, Clock} ->
to_domain_clock(Clock);
{exception, Exception} ->
% FIXME
error({accounting, Exception})
end.

construct_plan_change(PlanID, {BatchID, Cashflow}, Timestamp) ->
#shumaich_PostingPlanChange{
id = PlanID,
creation_time = Timestamp,
batch = #shumaich_PostingBatch{
id = BatchID,
postings = collect_postings(Cashflow)
}
}.

construct_plan(PlanID, Batches, Timestamp) ->
#shumaich_PostingPlan{
id = PlanID,
creation_time = Timestamp,
batch_list = [
#shumaich_PostingBatch{
id = BatchID,
postings = collect_postings(Cashflow)
}
|| {BatchID, Cashflow} <- Batches
]
}.

collect_postings(Cashflow) ->
[
#shumaich_Posting{
from_account = #shumaich_Account{id = Source, currency_symbolic_code = CurrencyCode},
to_account = #shumaich_Account{id = Destination, currency_symbolic_code = CurrencyCode},
amount = Amount,
currency_symbolic_code = CurrencyCode,
description = construct_posting_description(Details)
}
|| #domain_FinalCashFlowPosting{
source = #domain_FinalCashFlowAccount{account_id = Source},
destination = #domain_FinalCashFlowAccount{account_id = Destination},
details = Details,
volume = #domain_Cash{
amount = Amount,
currency = #domain_CurrencyRef{symbolic_code = CurrencyCode}
}
} <- Cashflow
].

construct_posting_description(Details) when is_binary(Details) ->
Details;
construct_posting_description(undefined) ->
<<>>.

%%

construct_account(
AccountID,
#shumaich_Account{
currency_symbolic_code = CurrencyCode
}
) ->
#{
account_id => AccountID,
currency_code => CurrencyCode
}.

construct_balance(
AccountID,
#shumaich_Balance{
own_amount = OwnAmount,
min_available_amount = MinAvailableAmount,
max_available_amount = MaxAvailableAmount,
clock = Clock
}
) ->
genlib_map:compact(#{
account_id => AccountID,
own_amount => OwnAmount,
min_available_amount => MinAvailableAmount,
max_available_amount => MaxAvailableAmount,
clock => to_domain_clock(Clock)
}).

%%

call_accounter(Function, Args) ->
%% Really not sure what to best do when we run out of retries
try
call_with_retry(Function, Args)
catch
throw:{error, no_more_retries} ->
error({accounter, not_ready})
end.

call_with_retry(Function, Args) ->
hg_retry:call_with_retry(
fun() ->
case hg_woody_wrapper:call(accounter_new, Function, Args) of
{ok, _} = Ok ->
{return, Ok};
{exception, #shumaich_NotReady{}} ->
retry;
{exception, _} = Exception ->
{return, Exception}
end
end,
get_retry_strategy(Function)
).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не уверен конечно, что это рабочий вариант. Если конечно ш̙͍̯͒̅͊а̜̥͙̜͔̣͙͗ͥ͑̍̐̆̓м͖аи̩̜̓ͪӵ̻̠̯͚̩̣̺́̅̓̏̎̍̚ не даёт гарантий на upper bound по времени обработки изменений своего состояния, в чём я немного сомневаюсь. Быть может это не единственная конечно стратегия работы с NotReady?

В моём представлении везде, где идёт взаимодействие с ш̙͍̯͒̅͊а̜̥͙̜͔̣͙͗ͥ͑̍̐̆̓м͖аи̩̜̓ͪӵ̻̠̯͚̩̣̺́̅̓̏̎̍̚ем, и надо получить баланс, нам придётся сначала сохранять clock в mg, а потом уже долбить ш̙͍̯͒̅͊а̜̥͙̜͔̣͙͗ͥ͑̍̐̆̓м͖аи̩̜̓ͪӵ̻̠̯͚̩̣̺́̅̓̏̎̍̚ запросами. Опять же, быть может ты это тоже понимаешь.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я честно говоря и не вижу теперь других стратегий, подумав ещё. Либо ретраить запросы тут, используя потенциально бесконечную стратегию, либо городить какой-то сложный механизм (особенно если учесть твои замечания по функции plan), который будет перед каждым походом в аккаунтер сохранять ивент вроде waiting_accounter, и только после успешного сохранять *_clock_updated (и так n-раз, в случае с plan), но мне не кажется что преимущества этого подхода перевешивают трудоемкость его реализации (учитывая что проблема, которую решит этот подход, никак не обрабатывается и в текущем коде).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

используя потенциально бесконечную стратегию

Бесконечно нас mg не будет ждать всё равно. Смысл в ретраях тут есть на самом деле некоторый, потому что минимальный ненулевой таймаут на mg − это аж целая 1 секунда, а шамаич по идее должен гораздо быстрее синкаться в штатном режиме.

(особенно если учесть твои замечания по функции plan)

Ну там на самом деле нам критично как я понимаю только финальный clock (после всех холдов) сохранить, а не промежуточные.

Опять же, я вот только что почему-то подумал, что в теории контроль идемпотентности на стороне шамаича + его гарантии на максимальное штатное время синка (например, хз, 5 секунд) могут нас избавить от необходимости clock хранить. Потому что даже в обычных условиях crash-recovery хеллгейта может привести к тому, что мы (как минимум) один батч два раза пошлём или (как максимум) один и тот же план.

Тут правда возможно не помешает вспомнить, что в рамках работ по лимитеру ивенты про clock update уже начинают заезжать в протокол.

К тому же кстати результат операции Hold не может быть NotReady:
https://github.com/rbkmoney/shumaich-proto/blob/700a5b4d635b6d7c26fe3cf3a58896f8d2330645/proto/shumaich.thrift#L176-L178

никак не обрабатывается и в текущем коде

Но текущий код же не работает с асинхронным аккаунтером. Или ты про что-то другое?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Или ты про что-то другое?

Ну вот скорее про это:

Потому что даже в обычных условиях crash-recovery хеллгейта может привести к тому, что мы (как минимум) один батч два раза пошлём или (как максимум) один и тот же план.

А то что mg не будет ждать бесконечно я наоборот часто забываю, лол. В целом кажется этот вопрос надо решать именно в ключе того поведения, когда ш̙͍̯͒̅͊а̜̥͙̜͔̣͙͗ͥ͑̍̐̆̓м͖аи̩̜̓ͪӵ̻̠̯͚̩̣̺́̅̓̏̎̍̚ сильно не укладывается в ожидаемые таймауты по какой-то причине. Во варианте с ретраями на этом уровне мы, наверное, будем работать быстрее в 99% случаев, но вот в том 1% проценте, когда попытки будут заканчиваться, как правильнее поступить не очень понятно. Кажется, что все так же ничто не будет мешать просто поднять машину и, как и раньше, сделать те же холды еще раз.

Можно совместить ретраи здесь с ретраями посредством mg, но это видится большим объемом кода, который наверное надо как-то оправдать.

Ну там на самом деле нам критично как я понимаю только финальный clock (после всех холдов) сохранить, а не промежуточные.

Ну тут мысль была что мы должны будем не просто начать ходлить план сначала, а продолжить с того места, с которого зафейлились, но да, это излишне.

Тут правда возможно не помешает вспомнить, что в рамках работ по лимитеру ивенты про clock update уже начинают заезжать в протокол.

Но лимитер это же вроде отдельный сервис еще между хг и ш̙͍̯͒̅͊а̜̥͙̜͔̣͙͗ͥ͑̍̐̆̓м͖аи̩̜̓ͪӵ̻̠̯͚̩̣̺́̅̓̏̎̍̚ем, который, подозреваю, хендлит и NotReady кейсы. *_clock_update- ивенты сами по себе есть и в этом эпике в других пиарах.

К тому же кстати результат операции Hold не может быть NotReady

Это кстати да, лол, plan/hold в принципе можно исключить из механики ретраев. Но commit/rollback все еще могут.


get_payment_cost(#domain_InvoicePayment{cost = Cost}) ->
Cost.

get_currency(#domain_Cash{currency = Currency}) ->
Currency.

to_domain_clock({latest, #shumaich_LatestClock{}}) ->
undefined;
to_domain_clock({vector, #shumaich_VectorClock{state = State}}) ->
{vector, #domain_VectorClock{state = State}}.

to_accounter_clock(undefined) ->
{latest, #shumaich_LatestClock{}};
to_accounter_clock({vector, #domain_VectorClock{state = State}}) ->
{vector, #shumaich_VectorClock{state = State}}.

get_retry_strategy(Function) ->
PolicyConfig = genlib_app:env(hellgate, accounter_retry_policy, #{}),
hg_retry:new_strategy(maps:get(Function, PolicyConfig, ?DEFAULT_RETRY_STRATEGY)).
21 changes: 20 additions & 1 deletion apps/hellgate/src/hg_retry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
-export([
next_step/1,
skip_steps/2,
new_strategy/1
new_strategy/1,
call_with_retry/2
]).

-type retries_num() :: pos_integer() | infinity.
Expand Down Expand Up @@ -58,4 +59,22 @@ skip_steps(Strategy, N) when N > 0 ->
end,
skip_steps(NewStrategy, N - 1).

-type retry_fun_return() :: term() | no_return().
Comment thread
keynslug marked this conversation as resolved.
Outdated
-type retry_fun() :: fun(() -> retry_fun_return()).

-spec call_with_retry(retry_fun(), strategy()) -> retry_fun_return().
call_with_retry(Fun, Strategy) ->
Comment thread
keynslug marked this conversation as resolved.
Outdated
case Fun() of
{return, Result} ->
Result;
retry ->
case next_step(Strategy) of
{wait, Timeout, NextStrategy} ->
_ = timer:sleep(Timeout),
call_with_retry(Fun, NextStrategy);
finish ->
throw({error, no_more_retries})
end
end.

%%% Internal functions
Loading