Compare commits

...

2 commits

Author SHA1 Message Date
Mark Allen
e87e84efb1 WIP 2016-01-05 16:42:40 -06:00
Mark Allen
421cf9a817 Add stub cache
WIP - probably should be generalized though. Want to think
about a library for that.
2016-01-05 16:42:07 -06:00
5 changed files with 175 additions and 0 deletions

View file

@ -363,6 +363,7 @@ message Mpb_ProjectionV1 {
// wedge_status()
// delete_migration()
// trunc_hack()
// file_repair_data()
//
// Projection-related:
//
@ -503,6 +504,26 @@ message Mpb_LL_TruncHackResp {
required Mpb_GeneralStatusCode status = 1;
}
// Low level API: file_repair_data()
message Mpb_LL_FileRepairDataReq {
required Mpb_EpochID epoch_id = 1;
required string file = 2;
}
message Mpb_LL_FileRepairDataResp {
required Mpb_GeneralStatusCode status = 1;
required string file = 2;
required string fluname = 3; // the name of the FLU where this data came from
// TODO: should this be a better locator? Info
// intended for humans
required bytes root_csum = 4; // root node checksum
optional bytes lvl1 = 5; // level 1 data serialized as Erlang
// term_to_binary byte stream
// TODO: should this be required?
}
// Low level API: get_latest_epochid() request & response
message Mpb_LL_GetLatestEpochIDReq {
@ -613,6 +634,7 @@ message Mpb_LL_Request {
optional Mpb_LL_WedgeStatusReq wedge_status = 36;
optional Mpb_LL_DeleteMigrationReq delete_migration = 37;
optional Mpb_LL_TruncHackReq trunc_hack = 38;
optional Mpb_LL_FileRepairDataReq file_repair_req = 39;
}
message Mpb_LL_Response {
@ -648,4 +670,5 @@ message Mpb_LL_Response {
optional Mpb_LL_WedgeStatusResp wedge_status = 36;
optional Mpb_LL_DeleteMigrationResp delete_migration = 37;
optional Mpb_LL_TruncHackResp trunc_hack = 38;
optional Mpb_LL_FileRepairDataResp file_repair_resp = 39;
}

View file

@ -39,6 +39,7 @@
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-include("machi_merkle_tree.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -294,6 +295,9 @@ do_pb_ll_request3({low_delete_migration, _EpochID, File},
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
#state{witness=false}=S) ->
{do_server_trunc_hack(File, S), S};
do_pb_ll_request3({low_file_repair_data, _EpochID, File},
#state{witness=false}=S) ->
{do_server_get_file_repair_data(File, S), S};
do_pb_ll_request3(_, #state{witness=true}=S) ->
{{error, bad_arg}, S}. % TODO: new status code??
@ -506,6 +510,32 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
{error, bad_arg}
end.
do_server_get_file_repair_data(File, #state{flu_name=FLU, data_dir=D}) ->
%% is it in a cache?
case machi_merkle_cache:get(FLU, File) of
undefined ->
%% does the file exist?
{_, Path} = machi_util:make_data_filename(D, File),
case filelib:is_regular(Path) of
true ->
%% yep, exists
%% XXX: Should this be async??? Probably.
{ok, MT} = machi_merkle_tree:open(File, D),
machi_merkle_cache:put(FLU, MT),
build_file_data_tuple(FLU, MT);
false ->
{error, no_such_file}
end;
{ok, MT0} ->
build_file_data_tuple(FLU, MT0)
end.
build_file_data_tuple(FLU, MT=#mt{}) ->
T = machi_merkle_tree:tree(MT),
{ok, {file_repair_data,
FLU, machi_merkle_tree:root(MT),
T#naive.lvl1, T#naive.chunk_size}}.
sanitize_file_string(Str) ->
case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
true -> ok;
@ -517,6 +547,7 @@ has_no_prohibited_chars(Str) ->
nomatch ->
true;
_ ->
%% not sure this is right; unless we're disabling this check.
true
end.

View file

@ -0,0 +1,86 @@
-module(machi_merkle_cache).
-behaviour(gen_server).
-export([
child_spec/1,
start_link/1,
get/2,
put/2
]).
%% gen server callbacks
-export([
init/1,
handle_cast/2,
handle_call/3,
handle_info/2,
terminate/2,
code_change/3
]).
-define(EXPIRE_TIMER, 60*1000).
child_spec(_FLU) ->
{}.
start_link(FLU) ->
gen_server:start_link({local, make_cache_name(FLU)}, ?MODULE, [FLU], []).
get(_FLU, _File) ->
%% case ets:lookup(make_cache_name_ets(FLU), File) of
%% undefined -> undefined;
%% [MT] -> {ok, MT}
%% end.
undefined.
put(_FLU, _MT) ->
%% gen_server:cast(make_cache_name(FLU), {put, MT}).
ok.
%% gen server callbacks
init([FLU]) ->
Tid = ets:new(make_cache_name_ets(FLU), [named_table, {keypos,2},
{read_concurrency, true},
{write_concurrency, true}]),
schedule_expire_tick(),
{ok, Tid}.
handle_cast({put, MT}, Tid) ->
ets:insert(Tid, MT),
{noreply, Tid};
handle_cast(Msg, S) ->
lager:warning("unknown cast ~p", [Msg]),
{noreply, S}.
handle_call(Msg, From, S) ->
lager:warning("unknown call ~p from ~p", [Msg, From]),
{reply, whaaaaa, S}.
handle_info(merkle_expire_tick, Tid) ->
do_expire(Tid),
schedule_expire_tick(),
{noreply, Tid};
handle_info(Msg, S) ->
lager:warning("unknown info message ~p", [Msg]),
{noreply, S}.
terminate(Reason, Tid) ->
lager:debug("Terminating merkle cache ETS table ~p because ~p",
[Tid, Reason]),
ok.
code_change(_, _, S) ->
{ok, S}.
%% private
schedule_expire_tick() ->
erlang:send_after(?EXPIRE_TIMER, self(), merkle_expire_tick).
make_cache_name_ets(_FLU) ->
merkle_ets_cache.
make_cache_name(_FLU) ->
merkle_cache.
do_expire(_Tid) -> ok.

View file

@ -45,6 +45,7 @@
open/3,
tree/1,
filename/1,
root/1,
diff/2
]).
-endif.
@ -72,6 +73,10 @@ tree(#mt{ tree = T, backend = naive }) ->
false -> T
end.
root(MT=#mt{backend = naive}) ->
T = tree(MT),
T#naive.root.
filename(#mt{ filename = F }) -> F.
diff(#mt{backend = naive, tree = T1}, #mt{backend = naive, tree = T2}) ->

View file

@ -132,6 +132,12 @@ from_pb_request(#mpb_ll_request{
file=File}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
{ReqID, {low_trunc_hack, EpochID, File}};
from_pb_request(#mpb_ll_request{
req_id=ReqID,
file_repair_req=#mpb_ll_filerepairdatareq{
epoch_id=PB_EpochID,
file=File}})->
{ReqID, {low_file_repair_data, conv_to_epoch_id(PB_EpochID), File}};
from_pb_request(#mpb_ll_request{
req_id=ReqID,
proj_gl=#mpb_ll_getlatestepochidreq{type=ProjType}}) ->
@ -466,6 +472,11 @@ to_pb_request(ReqID, {low_trunc_hack, EpochID, File}) ->
trunc_hack=#mpb_ll_trunchackreq{
epoch_id=PB_EpochID,
file=File}};
to_pb_request(ReqID, {low_file_repair_data, EpochID, File}) ->
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
file_repair_req=#mpb_ll_filerepairdatareq{
epoch_id=conv_from_epoch_id(EpochID),
file=File}};
to_pb_request(ReqID, {low_proj, {get_latest_epochid, ProjType}}) ->
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
proj_gl=#mpb_ll_getlatestepochidreq{type=conv_from_type(ProjType)}};
@ -614,6 +625,25 @@ to_pb_response(ReqID, {low_trunc_hack, _EID, _Fl}, Resp)->
Status = conv_from_status(Resp),
#mpb_ll_response{req_id=ReqID,
trunc_hack=#mpb_ll_trunchackresp{status=Status}};
to_pb_response(ReqID, {low_file_repair_data, _EID, File}, Resp)->
case Resp of
{error, _} ->
Status = conv_from_status(Resp),
#mpb_ll_response{req_id=ReqID,
file_repair_resp=#mpb_ll_filerepairdataresp{
status=Status,
file=File}};
{ok, {file_repair_data, FLU, Root, L1, _ChunkSize}} ->
%% TODO: Should we have a status of 'deferred' for
%% files that have not been generated yet?
#mpb_ll_response{req_id=ReqID,
file_repair_resp=#mpb_ll_filerepairdataresp{
status='OK',
file=File,
fluname=FLU,
root_csum=Root,
lvl1=L1}}
end;
to_pb_response(ReqID, {low_proj, {get_latest_epochid, _ProjType}}, Resp)->
case Resp of
{ok, {Epoch, CSum}} ->