@@ -120,10 +120,15 @@ new(ID) ->
120120 lager :log (info , self (), " ID ~p Proxy host ~p TCP port ~p \n " ,
121121 [ID , ProxyH , ProxyP ]),
122122
123- {ok , # state {client_id = ID , hosts = Hosts ,
124- bucket = basho_bench_config :get (cs_bucket , " test" ),
125- report_fun = ReportFun ,
126- http_proxy_host = ProxyH , http_proxy_port = ProxyP }}.
123+ State = # state {client_id = ID , hosts = Hosts ,
124+ bucket = basho_bench_config :get (cs_bucket , " test" ),
125+ report_fun = ReportFun ,
126+ http_proxy_host = ProxyH , http_proxy_port = ProxyP },
127+ case ID of
128+ 1 -> ok = setup_user_and_bucket (State );
129+ _ -> ok
130+ end ,
131+ {ok , State }.
127132
128133% % This module does some crazy stuff, but it's there for a reason.
129134% % The reason is that basho_bench is expecting the run() function to
@@ -186,6 +191,19 @@ run2(delete, KeyGen, _ValueGen, #state{bucket = Bucket} = State) ->
186191 {error , Reason } ->
187192 {error , Reason , S2 }
188193 end ;
194+ run2 (copy , KeyGen , _ValueGen , # state {bucket = Bucket } = State ) ->
195+ {NextHost , S2 } = next_host (State ),
196+ {Host , Port } = NextHost ,
197+ SourceKey = KeyGen (),
198+ DestKey = SourceKey ++ basho_bench_config :get (cs_copy_suffix , " -copy" ),
199+ Url = url (Host , Port , Bucket , DestKey ),
200+ Headers = [{" x-amz-copy-source" , " /" ++ Bucket ++ " /" ++ SourceKey }],
201+ case do_copy ({Host , Port }, Url , Headers , State ) of
202+ ok ->
203+ {ok , S2 };
204+ {error , Reason } ->
205+ {error , Reason , S2 }
206+ end ;
189207run2 (Op , _KeyGen , _ValueGen , State ) ->
190208 {error , {unknown_op , Op }, State }.
191209
@@ -278,10 +296,12 @@ insert(KeyGen, ValueGen, {Host, Port}, Bucket, State) ->
278296 do_put ({Host , Port }, Url , [{" content-length" , integer_to_list (CL )}], ValueT ,
279297 State ).
280298
299+ url (Host , Port , Bucket , undefined ) ->
300+ UnparsedUrl = lists :concat ([" http://" , Host , " :" , Port , " /" , Bucket ]),
301+ ibrowse_lib :parse_url (UnparsedUrl );
281302url (Host , Port , Bucket , Key ) ->
282303 UnparsedUrl = lists :concat ([" http://" , Host , " :" , Port , " /" , Bucket , " /" , Key ]),
283- Url = ibrowse_lib :parse_url (UnparsedUrl ),
284- Url .
304+ ibrowse_lib :parse_url (UnparsedUrl ).
285305
286306- spec next_host (term ()) -> {term (), term ()}.
287307% % TODO:
@@ -415,6 +435,16 @@ do_delete(Host, Url, Headers, State) ->
415435 {error , Reason }
416436 end .
417437
438+ do_copy (Host , Url , Headers , State ) ->
439+ case send_request (Host , Url , Headers , put , <<>>, proxy_opts (State )) of
440+ {ok , " 200" , _Header , _Body } ->
441+ ok ;
442+ {ok , Code , _Header , _Body } ->
443+ {error , {http_error , Code }};
444+ {error , Reason } ->
445+ {error , Reason }
446+ end .
447+
418448do_get_first_unit (Host , Url , Headers , State ) ->
419449 BufSize = 128 * 1024 ,
420450 Opts = [{max_pipeline_size , 9999999 },
@@ -494,7 +524,9 @@ uppercase_verb(put) ->
494524uppercase_verb (get ) ->
495525 'GET' ;
496526uppercase_verb (delete ) ->
497- 'DELETE' .
527+ 'DELETE' ;
528+ uppercase_verb (post ) ->
529+ 'POST' .
498530
499531to_list (A ) when is_atom (A ) ->
500532 atom_to_list (A );
@@ -507,14 +539,126 @@ initiate_request(Host, Url, Headers0, Method, Body, Options) ->
507539 'Content-Type' , Headers0 ,
508540 'application/octet-stream' )),
509541 Date = httpd_util :rfc1123_date (),
542+ Uri = element (7 , Url ),
510543 Headers = [{'Content-Type' , ContentTypeStr },
511544 {'Date' , Date }|lists :keydelete ('Content-Type' , 1 , Headers0 )],
512- Uri = element (7 , Url ),
513- Sig = stanchion_auth :request_signature (
514- uppercase_verb (Method ), Headers , Uri ,
515- basho_bench_config :get (cs_secret_key )),
516- AuthStr = [" AWS " , basho_bench_config :get (cs_access_key ), " :" , Sig ],
517- HeadersWithAuth = [{'Authorization' , AuthStr }|Headers ],
545+ HeadersWithAuth =
546+ case basho_bench_config :get (cs_access_key , undefined ) of
547+ undefined ->
548+ Headers ;
549+ AccessKey ->
550+ AuthSig = auth_sig (AccessKey , basho_bench_config :get (cs_secret_key ),
551+ uppercase_verb (Method ), ContentTypeStr , Date ,
552+ Headers , Uri ),
553+ [{'Authorization' , AuthSig }|Headers ]
554+ end ,
518555 Timeout = basho_bench_config :get (cs_request_timeout , 5000 ),
519556 ibrowse_http_client :send_req (Pid , Url , HeadersWithAuth , Method ,
520557 Body , Options , Timeout ).
558+
559+ % % S3 utilities
560+
561+ auth_sig (AccessKey , SecretKey , Method , ContentType , Date , Headers , Resource ) ->
562+ AmzHeaders = lists :filter (fun ({" x-amz-" ++ _ , V }) when V =/= undefined -> true ; (_ ) -> false end , Headers ),
563+ CanonizedAmzHeaders =
564+ [[Name , $: , Value , $\n ] || {Name , Value } <- lists :sort (AmzHeaders )],
565+ StringToSign = [string :to_upper (atom_to_list (Method )), $\n ,
566+ " " , $\n , % Content-MD5
567+ ContentType , $\n ,
568+ Date , $\n ,
569+ CanonizedAmzHeaders ,
570+ Resource
571+ ],
572+ Signature = base64 :encode (stanchion_utils :sha_mac (SecretKey , StringToSign )),
573+ [" AWS " , AccessKey , $: , Signature ].
574+
575+ % % CS utilities
576+
577+ setup_user_and_bucket (State ) ->
578+ case basho_bench_config :get (cs_access_key , undefined ) of
579+ undefined ->
580+ DisplayName = basho_bench_config :get (cs_display_name , " test-user" ),
581+ ok = maybe_create_user (DisplayName , State ),
582+ {ok , {_DisplayName , KeyId , KeySecret }} = fetch_user_info (DisplayName , State ),
583+ lager :info (" Target User: ~p " , [{DisplayName , KeyId , KeySecret }]),
584+ ok = basho_bench_config :set (cs_access_key , KeyId ),
585+ ok = basho_bench_config :set (cs_secret_key , KeySecret );
586+ _ ->
587+ ok
588+ end ,
589+ ok = maybe_create_bucket (State # state .bucket , State ).
590+
591+ maybe_create_user (DisplayName , # state {hosts = Hosts } = State ) ->
592+ {Host , Port } = hd (Hosts ),
593+ Json = io_lib :format (" {\" email\" : \" ~s @example.com\" , \" name\" : \" ~s \" }" ,
594+ [DisplayName , DisplayName ]),
595+ Url = url (Host , Port , " riak-cs" , " user" ),
596+ Headers = [{'Content-Type' , 'application/json' }],
597+ case send_request ({Host , Port }, Url , Headers , post , Json , proxy_opts (State )) of
598+ {ok , " 201" , _Header , Body } ->
599+ lager :debug (" User created: ~p~n " , [Body ]),
600+ ok ;
601+ {ok , " 409" , _Header , Body } ->
602+ lager :debug (" User already exists: ~p~n " , [Body ]),
603+ ok ;
604+ {ok , Code , Header , Body } ->
605+ {error , {user_creation , Code , Header , Body }};
606+ {error , Reason } ->
607+ {error , {user_creation , Reason }}
608+ end .
609+
610+ fetch_user_info (DisplayName , State ) ->
611+ case list_users (State ) of
612+ {ok , UserList } ->
613+ {ok , lists :keyfind (DisplayName , 1 , UserList )};
614+ {error , Reason } ->
615+ {error , Reason }
616+ end .
617+
618+ list_users (# state {hosts = Hosts } = State ) ->
619+ {Host , Port } = hd (Hosts ),
620+ Url = url (Host , Port , " riak-cs" , " users" ),
621+ case send_request ({Host , Port }, Url , [{'Accept' , 'application/json' }], get ,
622+ [], proxy_opts (State )) of
623+ {ok , " 200" , _Headers , Body } ->
624+ {ok , parse_user_info (Body )};
625+ {error , Reason } ->
626+ {error , {list_users , Reason }}
627+ end .
628+
629+ parse_user_info (Output ) ->
630+ [Boundary | Tokens ] = string :tokens (binary_to_list (Output ), " \r\n " ),
631+ parse_user_info (Tokens , Boundary , []).
632+
633+ parse_user_info ([_LastToken ], _ , Users ) ->
634+ ordsets :from_list (Users );
635+ parse_user_info ([" Content-Type: application/json" , RawJson | RestTokens ],
636+ Boundary , Users ) ->
637+ UpdUsers = parse_user_records (RawJson , json ) ++ Users ,
638+ parse_user_info (RestTokens , Boundary , UpdUsers );
639+ parse_user_info ([_ | RestTokens ], Boundary , Users ) ->
640+ parse_user_info (RestTokens , Boundary , Users ).
641+
642+ parse_user_records (Output , json ) ->
643+ JsonData = mochijson2 :decode (Output ),
644+ [begin
645+ KeyId = binary_to_list (proplists :get_value (<<" key_id" >>, UserJson )),
646+ KeySecret = binary_to_list (proplists :get_value (<<" key_secret" >>, UserJson )),
647+ Name = binary_to_list (proplists :get_value (<<" name" >>, UserJson )),
648+ {Name , KeyId , KeySecret }
649+ end || {struct , UserJson } <- JsonData ].
650+
651+ maybe_create_bucket (Bucket , # state {hosts = Hosts } = State ) ->
652+ {Host , Port } = hd (Hosts ),
653+ Url = url (Host , Port , Bucket , undefined ),
654+ case send_request ({Host , Port }, Url , [], put , [], proxy_opts (State )) of
655+ {ok , " 200" , _Headers , _Body } ->
656+ lager :debug (" Bucket created (maybe): ~p~n " , [Bucket ]),
657+ ok ;
658+ {ok , Code , Header , Body } ->
659+ lager :error (" Create bucket: ~p~n " , [{Code , Header , Body }]),
660+ {error , {bucket_creation , Code , Header , Body }};
661+ {error , Reason } ->
662+ lager :error (" Create bucket: ~p~n " , [Reason ]),
663+ {error , {bucket_creation , Reason }}
664+ end .
0 commit comments