WIP: Stand back, I'm going to try CRDTs!

This commit is contained in:
Scott Lystig Fritchie 2015-09-08 19:13:03 +09:00
parent 7af863d840
commit 21015efcbb
3 changed files with 102 additions and 8 deletions

View file

@ -88,8 +88,8 @@ Implementation sketch:
2. If map key is not atom, then atom->string or atom->binary is fine.
3. For map value, is it possible CRDT LWW type?
** TODO Investigate riak_dt data structure definition, manipulating, etc.
** TODO Add dependency on riak_dt
** DONE Investigate riak_dt data structure definition, manipulating, etc.
** DONE Add dependency on riak_dt
** DONE Update is an entire dict from Observer O
*** TODO Merge my pending map + update map + my last mod time + my unfit list
*** TODO if merged /= pending:

View file

@ -6,6 +6,7 @@
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}},
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}}
]}.

View file

@ -22,6 +22,13 @@
-behaviour(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST
-define(LWWREG, riak_dt_lwwreg).
-define(MAP, riak_dt_map).
%% API
-export([start_link/1,
get_unfit_list/1, update_local_down_list/2]).
@ -31,7 +38,10 @@
terminate/2, code_change/3]).
-record(state, {
unfit=[] :: list()
name :: atom() | binary(),
local_down=[] :: list(),
active_map=?MAP:new() :: ?MAP:riak_dt_map(),
pending_map=?MAP:new() :: ?MAP:riak_dt_map()
}).
start_link(Args) ->
@ -48,12 +58,23 @@ update_local_down_list(PidSpec, Down) ->
init([{FluName}|_Args]) ->
RegName = machi_flu_psup:make_fitness_regname(FluName),
register(RegName, self()),
{ok, #state{}}.
{ok, #state{name=RegName}}.
handle_call({get_unfit_list}, _From, S) ->
{reply, S#state.unfit, S};
handle_call({update_local_down_list, Down}, _From, S) ->
{reply, ok, S#state{unfit=Down}};
Unfit = make_unfit_list(S),
io:format(user, "LINE ~p val ~p\n", [?LINE, Unfit]),
{reply, Unfit, S};
handle_call({update_local_down_list, Down}, _From,
#state{name=Name, active_map=OldMap, local_down=OldDown}=S) ->
NewMap = store_in_map(OldMap, Name, erlang:now(), Down, [props_yo]),
if Down == OldDown ->
ok;
true ->
io:format(user, "fitness: ~w: ~w -> ~w\n", [Name, OldDown, Down]),
io:format(user, "TODO: spam\n", []),
io:format(user, "TODO: sched ticks, others...?\n", [])
end,
{reply, ok, S#state{local_down=Down, pending_map=NewMap}};
handle_call(_Request, _From, S) ->
Reply = whhhhhhhhhhhhhhaaaaaaaaaaaaaaa,
{reply, Reply, S}.
@ -71,3 +92,75 @@ code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
make_unfit_list(S) ->
Now = erlang:now(),
F = fun({Server, {UpdateTime, DownList, _Props}}, Acc) ->
case timer:now_diff(Now, UpdateTime) div (1000*1000) of
N when N > 900 ->
Acc;
_ ->
Probs = [{Server,problem_with,D} || D <- DownList],
[Probs|Acc]
end
end,
QQ1 = (catch map_fold(F, [], S#state.active_map)),
QQ2 = (catch map_fold(F, [], S#state.pending_map)),
io:format(user, "QQ1 ~p\n", [QQ1]),
io:format(user, "QQ2 ~p\n", [QQ2]),
S#state.local_down.
store_in_map(Map, Name, Now, Down, Props) ->
Val = {Now, Down, Props},
map_set(Name, Map, Name, Val).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
map_set(Actor, Map, Key, ValTerm) ->
Field = {Key, ?LWWREG},
Val = term_to_binary(ValTerm),
{ok, Map2} = ?MAP:update({update, [{update, Field, {assign, Val}}]},
Actor, Map),
Map2.
map_get(Map, Key) ->
Field = {Key, ?LWWREG},
case lists:keyfind(Field, 1, ?MAP:value(Map)) of
false ->
error;
{Field, ValBin} ->
{ok, binary_to_term(ValBin)}
end.
map_fold(Fun, Acc, Map) ->
Vs = [{K, binary_to_term(V)} || {{K, _Type}, V} <- ?MAP:value(Map)],
lists:foldl(Fun, Acc, lists:sort(Vs)).
-ifdef(TEST).
dt_understanding_test() ->
F1 = {'X', riak_dt_lwwreg},
F2 = {'Y', riak_dt_lwwreg},
{ok, Map1} = ?MAP:update({update, [{update, F1, {assign, <<"A">>}}]}, a, ?MAP:new()),
{ok, Map2} = ?MAP:update({update, [{update, F2, {assign, <<"B2">>}}]}, b, ?MAP:new()),
%% io:format(user, "\n", []),
%% io:format(user, "Merge comparison: ~p\n", [?MAP:merge(Map1, Map2) == ?MAP:merge(Map2, Map1)]),
%% io:format(user, "M12 Val: ~p\n", [?MAP:value(?MAP:merge(Map1, Map2))]),
%% io:format(user, "M21 Val: ~p\n", [?MAP:value(?MAP:merge(Map2, Map1))]),
?MAP:merge(Map1, Map2) == ?MAP:merge(Map2, Map1).
smoke_test() ->
{ok, Map1} = map_set(a, ?MAP:new(), k1, val1),
{ok, Map2} = map_set(a, Map1, k2, val2),
{ok, val1} = map_get(Map2, k1),
{ok, val2} = map_get(Map2, k2),
error = map_get(Map2, does_not_exist),
{ok, Map3} = map_set(a, Map2, k3, val3),
[{k3,1},{k2,1},{k1,1}] = map_fold(fun({K,_}, Acc) -> [{K,1}|Acc] end,
[], Map3),
ok.
-endif. % TEST