Generic parameterization of the map, done badly, part 1
This commit is contained in:
parent
9c73872d20
commit
4fe4758d7a
2 changed files with 39 additions and 39 deletions
|
@ -23,7 +23,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/3, stop/1,
|
-export([start_link/4, stop/1,
|
||||||
put/3, get/2]).
|
put/3, get/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -41,17 +41,19 @@
|
||||||
-type lpn() :: non_neg_integer().
|
-type lpn() :: non_neg_integer().
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
page_size :: non_neg_integer(),
|
page_size :: non_neg_integer(), % CORFU page size
|
||||||
seq :: pid(),
|
seq :: pid(), % sequencer pid
|
||||||
proj :: term(),
|
proj :: term(), % projection
|
||||||
last_read_lpn :: lpn(),
|
cb_mod :: atom(), % callback module
|
||||||
|
last_read_lpn :: lpn(), %
|
||||||
last_write_lpn :: lpn(),
|
last_write_lpn :: lpn(),
|
||||||
back_ps :: [lpn()],
|
back_ps :: [lpn()], % back pointers (up to 4)
|
||||||
i_state :: orddict:orddict()
|
i_state :: orddict:orddict() % internal state thingie
|
||||||
}).
|
}).
|
||||||
|
|
||||||
start_link(PageSize, SequencerPid, Proj) ->
|
start_link(PageSize, SequencerPid, Proj, CallbackMod) ->
|
||||||
gen_server:start_link(?MODULE, [PageSize, SequencerPid, Proj], []).
|
gen_server:start_link(?MODULE,
|
||||||
|
[PageSize, SequencerPid, Proj, CallbackMod], []).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
gen_server:call(Pid, {stop}, ?LONG_TIME).
|
||||||
|
@ -64,32 +66,35 @@ get(Pid, Key) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
init([PageSize, SequencerPid, Proj]) ->
|
init([PageSize, SequencerPid, Proj, CallbackMod]) ->
|
||||||
LastLPN = find_last_lpn(SequencerPid),
|
LastLPN = find_last_lpn(SequencerPid),
|
||||||
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
{BackPs, Pages} = fetch_unread_pages(Proj, LastLPN, 0),
|
||||||
Dict = play_log_pages(Pages, orddict:new()),
|
I_State = play_log_pages(Pages, CallbackMod:fresh(), CallbackMod),
|
||||||
{ok, #state{page_size=PageSize,
|
{ok, #state{page_size=PageSize,
|
||||||
seq=SequencerPid,
|
seq=SequencerPid,
|
||||||
proj=Proj,
|
proj=Proj,
|
||||||
|
cb_mod=CallbackMod,
|
||||||
last_read_lpn=LastLPN,
|
last_read_lpn=LastLPN,
|
||||||
last_write_lpn=LastLPN,
|
last_write_lpn=LastLPN,
|
||||||
back_ps=BackPs,
|
back_ps=BackPs,
|
||||||
i_state=Dict}}.
|
i_state=I_State}}.
|
||||||
|
|
||||||
handle_call({put, Key, Val}, _From,
|
handle_call({put, _Key, _Val}=Op, _From,
|
||||||
#state{proj=Proj0, page_size=PageSize, back_ps=BackPs} = State) ->
|
#state{proj=Proj0, cb_mod=CallbackMod,
|
||||||
Page = term_to_binary({put, Key, Val}),
|
page_size=PageSize, back_ps=BackPs, i_state=I_State}=State) ->
|
||||||
FullPage = tango:pack_v1([{?OID_STREAM_NUMBER, BackPs}], Page, PageSize),
|
{Res, I_State2, Proj1, LPN, NewBackPs} =
|
||||||
{{ok, LPN}, Proj1} = corfurl_client:append_page(Proj0, FullPage),
|
CallbackMod:do_dirty_op(Op, I_State, ?OID_STREAM_NUMBER,
|
||||||
NewBackPs = tango:add_back_pointer(BackPs, LPN),
|
Proj0, PageSize, BackPs),
|
||||||
{reply, garrr, State#state{proj=Proj1,
|
{reply, Res, State#state{i_state=I_State2,
|
||||||
|
proj=Proj1,
|
||||||
last_write_lpn=LPN,
|
last_write_lpn=LPN,
|
||||||
back_ps=NewBackPs}};
|
back_ps=NewBackPs}};
|
||||||
handle_call({get, Key}, _From, State) ->
|
handle_call({get, _Key}=Op, _From, #state{cb_mod=CallbackMod} = State) ->
|
||||||
{Pages, State2} = fetch_unread_pages(State),
|
{Pages, State2} = fetch_unread_pages(State),
|
||||||
%% ?D(Pages),
|
%% ?D(Pages),
|
||||||
#state{i_state=Dict} = State3 = play_log_pages(Pages, State2),
|
#state{i_state=I_State} = State3 = play_log_pages(Pages, State2),
|
||||||
{reply, orddict:find(Key, Dict), State3};
|
Reply = CallbackMod:do_pure_op(Op, I_State),
|
||||||
|
{reply, Reply, State3};
|
||||||
handle_call({stop}, _From, State) ->
|
handle_call({stop}, _From, State) ->
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
|
@ -137,14 +142,9 @@ fetch_unread_pages(Proj, LastLPN, StopAtLPN) ->
|
||||||
[], LPNs),
|
[], LPNs),
|
||||||
{BackPs, Pages}.
|
{BackPs, Pages}.
|
||||||
|
|
||||||
play_log_pages(Pages, #state{i_state=Dict} = State) ->
|
play_log_pages(Pages, #state{cb_mod=CallbackMod, i_state=I_State} = State) ->
|
||||||
Dict2 = play_log_pages(Pages, Dict),
|
I_State2 = play_log_pages(Pages, I_State, CallbackMod),
|
||||||
State#state{i_state=Dict2};
|
State#state{i_state=I_State2}.
|
||||||
play_log_pages(Pages, Dict) ->
|
|
||||||
%% ?D({play_log_pages, Pages}),
|
play_log_pages(Pages, I_State, CallbackMod) ->
|
||||||
lists:foldl(fun({put, K, V}=_Op, D) ->
|
CallbackMod:play_log_mutate_i_state(Pages, I_State).
|
||||||
%% ?D(_Op),
|
|
||||||
orddict:store(K, V, D)
|
|
||||||
end,
|
|
||||||
Dict,
|
|
||||||
[binary_to_term(Page) || Page <- Pages]).
|
|
||||||
|
|
|
@ -36,7 +36,7 @@ tango_oid_smoke_test() ->
|
||||||
fun tango_oid_smoke_test_int/3).
|
fun tango_oid_smoke_test_int/3).
|
||||||
|
|
||||||
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
tango_oid_smoke_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map),
|
||||||
|
|
||||||
ok = tango_oid:stop(OID_Map),
|
ok = tango_oid:stop(OID_Map),
|
||||||
ok.
|
ok.
|
||||||
|
@ -46,15 +46,15 @@ tango_oid_one_test() ->
|
||||||
fun tango_oid_one_test_int/3).
|
fun tango_oid_one_test_int/3).
|
||||||
|
|
||||||
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
tango_oid_one_test_int(PageSize, Seq, Proj) ->
|
||||||
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj),
|
{ok, OID_Map} = tango_oid:start_link(PageSize, Seq, Proj, tango_dt_map),
|
||||||
|
|
||||||
try
|
try
|
||||||
K1 = foo,
|
K1 = foo,
|
||||||
V1 = bar1,
|
V1 = bar1,
|
||||||
V2 = bar2,
|
V2 = bar2,
|
||||||
garrr = tango_oid:put(OID_Map, K1, V1),
|
ok = tango_oid:put(OID_Map, K1, V1),
|
||||||
{ok, V1} = tango_oid:get(OID_Map, K1),
|
{ok, V1} = tango_oid:get(OID_Map, K1),
|
||||||
garrr = tango_oid:put(OID_Map, K1, V2),
|
ok = tango_oid:put(OID_Map, K1, V2),
|
||||||
{ok, V2} = tango_oid:get(OID_Map, K1),
|
{ok, V2} = tango_oid:get(OID_Map, K1),
|
||||||
error = tango_oid:get(OID_Map, "does not exist"),
|
error = tango_oid:get(OID_Map, "does not exist"),
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue