Fix broken sequencer semantics.
It occurred to me today that I implemented the sequencer incorrectly and hadn't yet noticed because I don't have any tests that are complex/interleaved/perhaps-non-deterministic to find the problem. The problem is that the sequencer's current implementation only keeps track of the last LPN for any Tango stream. The fix is to do what the paper actually says: the sequencer keeps a *list* of the last $K$ LPNs for each stream. Derp. Yes, that's really necessary to avoid a pretty simple race condition with 2 actors simultaneously updating a single Tango stream. 1st commit: fix the implementation and the smoke test. The broken-everything-else will be repaired in later commits.
This commit is contained in:
parent
940012cef1
commit
b8c051c89f
2 changed files with 28 additions and 7 deletions
|
@ -109,7 +109,7 @@ init({FLUs, TypeOrSeed}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_call({get, NumPages, StreamList, LC}, _From, {Tab, MLP}) ->
|
handle_call({get, NumPages, StreamList, LC}, _From, {Tab, MLP}) ->
|
||||||
[ets:insert(Tab, {Stream, MLP}) || Stream <- StreamList],
|
update_stream_tails(Tab, StreamList, MLP),
|
||||||
NewLC = lclock_update(LC),
|
NewLC = lclock_update(LC),
|
||||||
{reply, {{ok, MLP}, NewLC}, {Tab, MLP + NumPages}};
|
{reply, {{ok, MLP}, NewLC}, {Tab, MLP + NumPages}};
|
||||||
handle_call({get, NumPages, StreamList, LC}, _From,
|
handle_call({get, NumPages, StreamList, LC}, _From,
|
||||||
|
@ -164,6 +164,22 @@ get_max_logical_page(FLUs) ->
|
||||||
FLU <- FLUs,
|
FLU <- FLUs,
|
||||||
{ok, Ps} <- [corfurl_flu:status(FLU)]]).
|
{ok, Ps} <- [corfurl_flu:status(FLU)]]).
|
||||||
|
|
||||||
|
update_stream_tails(Tab, StreamList, LPN) ->
|
||||||
|
[begin
|
||||||
|
OldBackPs = try ets:lookup_element(Tab, Stream, 2)
|
||||||
|
catch error:badarg -> []
|
||||||
|
end,
|
||||||
|
NewBackPs = add_back_pointer(OldBackPs, LPN),
|
||||||
|
ets:insert(Tab, {Stream, NewBackPs})
|
||||||
|
end || Stream <- StreamList].
|
||||||
|
|
||||||
|
add_back_pointer([D,C,B,_A|_], New) ->
|
||||||
|
[New,D,C,B];
|
||||||
|
add_back_pointer([], New) ->
|
||||||
|
[New];
|
||||||
|
add_back_pointer(BackPs, New) ->
|
||||||
|
[New|BackPs].
|
||||||
|
|
||||||
-ifdef(PULSE).
|
-ifdef(PULSE).
|
||||||
|
|
||||||
lclock_init() ->
|
lclock_init() ->
|
||||||
|
|
|
@ -65,15 +65,20 @@ smoke_test() ->
|
||||||
MLP4 = MLP0 + 4,
|
MLP4 = MLP0 + 4,
|
||||||
{ok, Sequencer} = ?M:start_link(FLUs),
|
{ok, Sequencer} = ?M:start_link(FLUs),
|
||||||
try
|
try
|
||||||
[{Stream9, Tail9}] = StreamTails = [{9, 99999}],
|
[{Stream9, Tails9}] = StreamTails = [{9, [1125, 1124, 1123]}],
|
||||||
ok = ?M:set_tails(Sequencer, StreamTails),
|
ok = ?M:set_tails(Sequencer, StreamTails),
|
||||||
{ok, [Tail9]} = ?M:get_tails(Sequencer, [Stream9]),
|
{ok, [Tails9]} = ?M:get_tails(Sequencer, [Stream9]),
|
||||||
|
|
||||||
{ok, MLP1} = ?M:get(Sequencer, 2),
|
{ok, LPN1} = ?M:get(Sequencer, 2),
|
||||||
{ok, MLP3} = ?M:get(Sequencer, 1, [2]),
|
{ok, LPN3} = ?M:get(Sequencer, 1, [2]),
|
||||||
{ok, MLP4} = ?M:get(Sequencer, 1, [1]),
|
{ok, LPN4} = ?M:get(Sequencer, 1, [1]),
|
||||||
|
{ok, LPN5} = ?M:get(Sequencer, 1, [2]),
|
||||||
|
{ok, LPN6} = ?M:get(Sequencer, 1, [2]),
|
||||||
|
{ok, LPN7} = ?M:get(Sequencer, 1, [2]),
|
||||||
|
{ok, LPN8} = ?M:get(Sequencer, 1, [2]),
|
||||||
|
|
||||||
{ok, [MLP4, MLP3]} = ?M:get_tails(Sequencer, [1,2])
|
{ok, [[LPN4], [LPN8, LPN7, LPN6, LPN5]]} = ?M:get_tails(Sequencer,
|
||||||
|
[1,2])
|
||||||
after
|
after
|
||||||
?M:stop(Sequencer)
|
?M:stop(Sequencer)
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue