Compare commits: master ... mra/merkle
2 commits

Author | SHA1 | Date
---|---|---
 | e87e84efb1 |
 | 421cf9a817 |

5 changed files with 175 additions and 0 deletions
@ -363,6 +363,7 @@ message Mpb_ProjectionV1 {
// wedge_status()
// delete_migration()
// trunc_hack()
// file_repair_data()
//
// Projection-related:
//
@ -503,6 +504,26 @@ message Mpb_LL_TruncHackResp {
    required Mpb_GeneralStatusCode status = 1;
}

// Low level API: file_repair_data()

message Mpb_LL_FileRepairDataReq {
    required Mpb_EpochID epoch_id = 1;
    required string file = 2;
}

message Mpb_LL_FileRepairDataResp {
    required Mpb_GeneralStatusCode status = 1;

    required string file = 2;
    required string fluname = 3;   // the name of the FLU where this data came from
                                   // TODO: should this be a better locator? Info
                                   // intended for humans
    required bytes root_csum = 4;  // root node checksum
    optional bytes lvl1 = 5;       // level 1 data serialized as Erlang
                                   // term_to_binary byte stream
                                   // TODO: should this be required?
}

// Low level API: get_latest_epochid() request & response

message Mpb_LL_GetLatestEpochIDReq {
@ -613,6 +634,7 @@ message Mpb_LL_Request {
    optional Mpb_LL_WedgeStatusReq wedge_status = 36;
    optional Mpb_LL_DeleteMigrationReq delete_migration = 37;
    optional Mpb_LL_TruncHackReq trunc_hack = 38;
    optional Mpb_LL_FileRepairDataReq file_repair_req = 39;
}

message Mpb_LL_Response {
@ -648,4 +670,5 @@ message Mpb_LL_Response {
    optional Mpb_LL_WedgeStatusResp wedge_status = 36;
    optional Mpb_LL_DeleteMigrationResp delete_migration = 37;
    optional Mpb_LL_TruncHackResp trunc_hack = 38;
    optional Mpb_LL_FileRepairDataResp file_repair_resp = 39;
}
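For orientation only (not part of this diff): once the messages above are compiled, the request travels in the same record shape that the machi_pb_translate hunks further down construct. A minimal sketch, assuming the record names generated into machi_pb.hrl (they match the ones used later in this change), with ReqID, PB_EpochID, and File as illustrative arguments:

-include("machi_pb.hrl").

%% Sketch only: wrap a file_repair_data request the way to_pb_request/2 below does.
make_file_repair_req(ReqID, PB_EpochID, File) ->
    #mpb_ll_request{req_id=ReqID, do_not_alter=2,
                    file_repair_req=#mpb_ll_filerepairdatareq{
                        epoch_id=PB_EpochID,
                        file=File}}.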
@ -39,6 +39,7 @@
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-include("machi_merkle_tree.hrl").

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -294,6 +295,9 @@ do_pb_ll_request3({low_delete_migration, _EpochID, File},
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
                  #state{witness=false}=S) ->
    {do_server_trunc_hack(File, S), S};
do_pb_ll_request3({low_file_repair_data, _EpochID, File},
                  #state{witness=false}=S) ->
    {do_server_get_file_repair_data(File, S), S};

do_pb_ll_request3(_, #state{witness=true}=S) ->
    {{error, bad_arg}, S}. % TODO: new status code??
@ -506,6 +510,32 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
            {error, bad_arg}
    end.

do_server_get_file_repair_data(File, #state{flu_name=FLU, data_dir=D}) ->
    %% is it in a cache?
    case machi_merkle_cache:get(FLU, File) of
        undefined ->
            %% does the file exist?
            {_, Path} = machi_util:make_data_filename(D, File),
            case filelib:is_regular(Path) of
                true ->
                    %% yep, exists
                    %% XXX: Should this be async??? Probably.
                    {ok, MT} = machi_merkle_tree:open(File, D),
                    machi_merkle_cache:put(FLU, MT),
                    build_file_data_tuple(FLU, MT);
                false ->
                    {error, no_such_file}
            end;
        {ok, MT0} ->
            build_file_data_tuple(FLU, MT0)
    end.

build_file_data_tuple(FLU, MT=#mt{}) ->
    T = machi_merkle_tree:tree(MT),
    {ok, {file_repair_data,
          FLU, machi_merkle_tree:root(MT),
          T#naive.lvl1, T#naive.chunk_size}}.

sanitize_file_string(Str) ->
    case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
        true -> ok;
@ -517,6 +547,7 @@ has_no_prohibited_chars(Str) ->
        nomatch ->
            true;
        _ ->
            %% not sure this is right; unless we're disabling this check.
            true
    end.
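For reference (not part of this diff), the success value built by build_file_data_tuple/2 above can be matched as shown below; the helper name is hypothetical:

%% Hypothetical helper, sketch only: pull the root checksum out of the
%% {ok, {file_repair_data, ...}} tuple returned above, or 'undefined' on error.
repair_root_csum({ok, {file_repair_data, _FLUName, RootCsum, _Lvl1, _ChunkSize}}) ->
    RootCsum;
repair_root_csum({error, _Reason}) ->
    undefined.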
src/machi_merkle_cache.erl (new file, 86 lines)
@ -0,0 +1,86 @@
-module(machi_merkle_cache).
-behaviour(gen_server).

-export([
         child_spec/1,
         start_link/1,
         get/2,
         put/2
        ]).

%% gen server callbacks
-export([
         init/1,
         handle_cast/2,
         handle_call/3,
         handle_info/2,
         terminate/2,
         code_change/3
        ]).

-define(EXPIRE_TIMER, 60*1000).

child_spec(_FLU) ->
    {}.

start_link(FLU) ->
    gen_server:start_link({local, make_cache_name(FLU)}, ?MODULE, [FLU], []).

get(_FLU, _File) ->
    %% case ets:lookup(make_cache_name_ets(FLU), File) of
    %%     undefined -> undefined;
    %%     [MT] -> {ok, MT}
    %% end.
    undefined.

put(_FLU, _MT) ->
    %% gen_server:cast(make_cache_name(FLU), {put, MT}).
    ok.

%% gen server callbacks
init([FLU]) ->
    Tid = ets:new(make_cache_name_ets(FLU), [named_table, {keypos,2},
                                             {read_concurrency, true},
                                             {write_concurrency, true}]),
    schedule_expire_tick(),
    {ok, Tid}.

handle_cast({put, MT}, Tid) ->
    ets:insert(Tid, MT),
    {noreply, Tid};
handle_cast(Msg, S) ->
    lager:warning("unknown cast ~p", [Msg]),
    {noreply, S}.

handle_call(Msg, From, S) ->
    lager:warning("unknown call ~p from ~p", [Msg, From]),
    {reply, whaaaaa, S}.

handle_info(merkle_expire_tick, Tid) ->
    do_expire(Tid),
    schedule_expire_tick(),
    {noreply, Tid};
handle_info(Msg, S) ->
    lager:warning("unknown info message ~p", [Msg]),
    {noreply, S}.

terminate(Reason, Tid) ->
    lager:debug("Terminating merkle cache ETS table ~p because ~p",
                [Tid, Reason]),
    ok.

code_change(_, _, S) ->
    {ok, S}.

%% private

schedule_expire_tick() ->
    erlang:send_after(?EXPIRE_TIMER, self(), merkle_expire_tick).

make_cache_name_ets(_FLU) ->
    merkle_ets_cache.

make_cache_name(_FLU) ->
    merkle_cache.

do_expire(_Tid) -> ok.
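The get/2 and put/2 above are stubbed in this commit: the ETS lookup and the cast to the gen_server exist only as comments, so every lookup misses. A minimal sketch of the un-stubbed versions, assuming the table and server names defined at the bottom of the module; note that ets:lookup/2 returns a list, so an empty list rather than the atom 'undefined' signals a miss:

%% Sketch only, not part of this commit: get/2 and put/2 with the
%% commented-out bodies enabled.
get(FLU, File) ->
    case ets:lookup(make_cache_name_ets(FLU), File) of
        []   -> undefined;        % cache miss: ets:lookup/2 returns a list
        [MT] -> {ok, MT}
    end.

put(FLU, MT) ->
    gen_server:cast(make_cache_name(FLU), {put, MT}).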
@ -45,6 +45,7 @@
         open/3,
         tree/1,
         filename/1,
         root/1,
         diff/2
        ]).
-endif.
@ -72,6 +73,10 @@ tree(#mt{ tree = T, backend = naive }) ->
        false -> T
    end.

root(MT=#mt{backend = naive}) ->
    T = tree(MT),
    T#naive.root.

filename(#mt{ filename = F }) -> F.

diff(#mt{backend = naive, tree = T1}, #mt{backend = naive, tree = T2}) ->
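For illustration only (not part of this diff): the accessors added here compose with machi_merkle_tree:open/2, which the FLU server hunk above calls. The function below is a hypothetical sketch; it assumes machi_merkle_tree.hrl is included for the #naive{} record, and File and DataDir are illustrative arguments.

%% Hypothetical sketch: open a file's merkle tree and summarize it using the
%% accessors exported in this change.
describe_tree(File, DataDir) ->
    {ok, MT} = machi_merkle_tree:open(File, DataDir),
    T = machi_merkle_tree:tree(MT),               %% underlying #naive{} record
    #{file       => machi_merkle_tree:filename(MT),
      root_csum  => machi_merkle_tree:root(MT),   %% root node checksum
      chunk_size => T#naive.chunk_size,
      lvl1       => T#naive.lvl1}.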
@ -132,6 +132,12 @@ from_pb_request(#mpb_ll_request{
                                file=File}}) ->
    EpochID = conv_to_epoch_id(PB_EpochID),
    {ReqID, {low_trunc_hack, EpochID, File}};
from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
                    file_repair_req=#mpb_ll_filerepairdatareq{
                        epoch_id=PB_EpochID,
                        file=File}})->
    {ReqID, {low_file_repair_data, conv_to_epoch_id(PB_EpochID), File}};
from_pb_request(#mpb_ll_request{
                    req_id=ReqID,
                    proj_gl=#mpb_ll_getlatestepochidreq{type=ProjType}}) ->
@ -466,6 +472,11 @@ to_pb_request(ReqID, {low_trunc_hack, EpochID, File}) ->
                    trunc_hack=#mpb_ll_trunchackreq{
                        epoch_id=PB_EpochID,
                        file=File}};
to_pb_request(ReqID, {low_file_repair_data, EpochID, File}) ->
    #mpb_ll_request{req_id=ReqID, do_not_alter=2,
                    file_repair_req=#mpb_ll_filerepairdatareq{
                        epoch_id=conv_from_epoch_id(EpochID),
                        file=File}};
to_pb_request(ReqID, {low_proj, {get_latest_epochid, ProjType}}) ->
    #mpb_ll_request{req_id=ReqID, do_not_alter=2,
                    proj_gl=#mpb_ll_getlatestepochidreq{type=conv_from_type(ProjType)}};
@ -614,6 +625,25 @@ to_pb_response(ReqID, {low_trunc_hack, _EID, _Fl}, Resp)->
    Status = conv_from_status(Resp),
    #mpb_ll_response{req_id=ReqID,
                     trunc_hack=#mpb_ll_trunchackresp{status=Status}};
to_pb_response(ReqID, {low_file_repair_data, _EID, File}, Resp)->
    case Resp of
        {error, _} ->
            Status = conv_from_status(Resp),
            #mpb_ll_response{req_id=ReqID,
                             file_repair_resp=#mpb_ll_filerepairdataresp{
                                 status=Status,
                                 file=File}};
        {ok, {file_repair_data, FLU, Root, L1, _ChunkSize}} ->
            %% TODO: Should we have a status of 'deferred' for
            %% files that have not been generated yet?
            #mpb_ll_response{req_id=ReqID,
                             file_repair_resp=#mpb_ll_filerepairdataresp{
                                 status='OK',
                                 file=File,
                                 fluname=FLU,
                                 root_csum=Root,
                                 lvl1=L1}}
    end;
to_pb_response(ReqID, {low_proj, {get_latest_epochid, _ProjType}}, Resp)->
    case Resp of
        {ok, {Epoch, CSum}} ->
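To tie the pieces together (sketch only, not part of this diff): feeding the success tuple produced by build_file_data_tuple/2 in the FLU server hunk through the new clause above yields a response record whose fields mirror Mpb_LL_FileRepairDataResp. Every literal value below is made up, and it assumes to_pb_response/3 is exported as the other machi_pb_translate clauses suggest.

%% Hypothetical round-trip of the success case.
example_file_repair_resp(ReqID, EpochID) ->
    ServerResult = {ok, {file_repair_data, "flu_a", <<"root-csum">>, <<"lvl1-bytes">>, 1048576}},
    machi_pb_translate:to_pb_response(ReqID,
                                      {low_file_repair_data, EpochID, "somefile"},
                                      ServerResult).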