diff --git a/.gitignore b/.gitignore
index 2693865..180a370 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,9 +1,11 @@
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ba8df11
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,41 @@
+REBAR_BIN := $(shell which rebar)
+ifeq ($(REBAR_BIN),)
+REBAR_BIN = ./rebar
+.PHONY: rel deps package pkgclean
+all: deps compile
+ $(REBAR_BIN) compile
+ $(REBAR_BIN) get-deps
+ $(REBAR_BIN) -r clean
+test: deps compile eunit
+ $(REBAR_BIN) -v skip_deps=true eunit
+pulse: compile
+ env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
+ env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
+APPS = kernel stdlib sasl erts ssl compiler eunit crypto
+PLT = $(HOME)/.machi_dialyzer_plt
+build_plt: deps compile
+ dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin
+dialyzer: deps compile
+ dialyzer -Wno_return --plt $(PLT) ebin
+dialyzer-test: deps compile
+ dialyzer -Wno_return --plt $(PLT) .eunit
+ rm $(PLT)
diff --git a/TODO-shortterm.org b/TODO-shortterm.org
new file mode 100644
index 0000000..428bddc
--- /dev/null
+++ b/TODO-shortterm.org
@@ -0,0 +1,37 @@
+* To Do list
+** DONE remove the escript* stuff from machi_util.erl
+** DONE Add functions to manipulate 1-chain projections
+- Add epoch ID = epoch number + checksum of projection!
+ Done via compare() func.
+** DONE Change all protocol ops to add epoch ID
+** TODO Add projection store to each FLU.
+*** DONE What should the API look like? (borrow from chain mgr PoC?)
+Yeah, I think that's pretty complete. Steal it now, worry later.
+*** DONE Choose protocol & TCP port. Share with get/put? Separate?
+Hrm, I like the idea of having a single TCP port to talk to any single
+To make the protocol "easy" to hack, how about using the same basic
+method as append/write where there's a variable size blob. But we'll
+format that blob as a term_to_binary(). Then dispatch to a single
+func, and pattern match Erlang style in that func.
+*** TODO Do it.
+** TODO Change all protocol ops to enforce the epoch ID
+** TODO Add projection wedging logic to each FLU.
+- Add no-wedging state to make testing easier?
+** TODO Move prototype/chain-manager code to "top" of source tree
+*** TODO Preserve current test code (leave as-is? tiny changes?)
+*** TODO Make chain manager code flexible enough to run "real world" or "sim"
+** TODO Replace registered name use from FLU write/append dispatcher
+** TODO Move the FLU server to gen_server behavior?
diff --git a/ebin/.gitignore b/ebin/.gitignore
new file mode 100644
index 0000000..120fe3a
--- /dev/null
+++ b/ebin/.gitignore
@@ -0,0 +1,2 @@
diff --git a/include/machi.hrl b/include/machi.hrl
new file mode 100644
index 0000000..d717083
--- /dev/null
+++ b/include/machi.hrl
@@ -0,0 +1,26 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes
+-define(MAX_CHUNK_SIZE, ((1 bsl 32) - 1)).
+%% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
+-define(DATA_DIR, "./data").
+-define(MINIMUM_OFFSET, 1024).
diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl
new file mode 100644
index 0000000..2e35aed
--- /dev/null
+++ b/include/machi_projection.hrl
@@ -0,0 +1,53 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-type pv1_csum() :: binary().
+-type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}.
+-type pv1_epoch_n() :: non_neg_integer().
+-type pv1_server() :: atom() | binary().
+-type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
+-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}).
+-record(projection_v1, {
+ epoch_number :: pv1_epoch_n(),
+ epoch_csum :: pv1_csum(),
+ all_members :: [pv1_server()],
+ member_dict :: orddict:orddict(),
+ down :: [pv1_server()],
+ creation_time :: pv1_timestamp(),
+ author_server :: pv1_server(),
+ upi :: [pv1_server()],
+ repairing :: [pv1_server()],
+ dbg :: list(), %proplist(), is checksummed
+ dbg2 :: list() %proplist(), is not checksummed
+ }).
+-define(MACHI_DEFAULT_TCP_PORT, 50000).
+-record(p_srvr, {
+ name :: pv1_server(),
+ proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm.
+ address :: term(), % Protocol-specific
+ port :: term(), % Protocol-specific
+ props = [] :: list() % proplist for other related info
+ }).
+-define(SHA_MAX, (1 bsl (20*8))).
diff --git a/rebar b/rebar
new file mode 100755
index 0000000..03c9be6
Binary files /dev/null and b/rebar differ
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..5b3cfa2
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,7 @@
+%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
+{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
+{deps, [
+ {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}
+ ]}.
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..364ce39
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,47 @@
+PulseBuild = case os:getenv("USE_PULSE") of
+ false ->
+ false;
+ _ ->
+ true
+ end,
+case PulseBuild of
+ true ->
+ PulseOpts =
+ [{pulse_no_side_effect,
+ [{erlang,display,1}
+ ]},
+ {pulse_side_effect,
+ [ {does_not_exist_yet, some_func, '_'}
+ , {prim_file, '_', '_'}
+ , {file, '_', '_'}
+ , {filelib, '_', '_'}
+ , {os, '_', '_'} ]},
+ {pulse_replace_module,
+ [ {gen_server, pulse_gen_server}
+ , {application, pulse_application}
+ , {supervisor, pulse_supervisor} ]}
+ ],
+ PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
+ UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
+ {value, {eunit_compile_opts, Opts}} ->
+ lists:keyreplace(eunit_compile_opts,
+ 1,
+ {eunit_compile_opts, Opts ++ PulseOpts});
+ _ ->
+ [{eunit_compile_opts, PulseOpts} | CONFIG]
+ end,
+ case lists:keysearch(port_env, 1, UpdConfig) of
+ {value, {port_env, PortEnv}} ->
+ lists:keyreplace(port_env,
+ 1,
+ UpdConfig,
+ {port_env, PortEnv ++ PulseCFlags});
+ _ ->
+ [{port_env, PulseCFlags} | UpdConfig]
+ end;
+ false ->
diff --git a/src/machi.app.src b/src/machi.app.src
new file mode 100644
index 0000000..7a2866b
--- /dev/null
+++ b/src/machi.app.src
@@ -0,0 +1,13 @@
+{application, machi, [
+ {description, "A village of write-once files."},
+ {vsn, "0.0.0"},
+ {applications, [kernel, stdlib, sasl, crypto]},
+ {mod,{machi_app,[]}},
+ {registered, []},
+ {env, [
+ {flu_list,
+ [
+ {flu_a, 32900, "./data.flu_a"}
+ ]}
+ ]}
diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl
new file mode 100644
index 0000000..990d948
--- /dev/null
+++ b/src/machi_admin_util.erl
@@ -0,0 +1,126 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%% TODO Move these types to a common header file? (also machi_flu1_client.erl?)
+-type inet_host() :: inet:ip_address() | inet:hostname().
+-type inet_port() :: inet:port_number().
+ verify_file_checksums_local/3, verify_file_checksums_local/4,
+ verify_file_checksums_remote/3, verify_file_checksums_remote/4
+ ]).
+-define(FLU_C, machi_flu1_client).
+-spec verify_file_checksums_local(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
+ {ok, [tuple()]} | {error, term()}.
+verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
+ verify_file_checksums_local2(Sock1, EpochID, Path).
+-spec verify_file_checksums_local(inet_host(), inet_port(),
+ machi_flu1_client:epoch_id(), binary()|list()) ->
+ {ok, [tuple()]} | {error, term()}.
+verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
+ Sock1 = machi_util:connect(Host, TcpPort),
+ try
+ verify_file_checksums_local2(Sock1, EpochID, Path)
+ after
+ catch gen_tcp:close(Sock1)
+ end.
+-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
+ {ok, [tuple()]} | {error, term()}.
+verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
+ verify_file_checksums_remote2(Sock1, EpochID, File).
+-spec verify_file_checksums_remote(inet_host(), inet_port(),
+ machi_flu1_client:epoch_id(), binary()|list()) ->
+ {ok, [tuple()]} | {error, term()}.
+verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
+ Sock1 = machi_util:connect(Host, TcpPort),
+ try
+ verify_file_checksums_remote2(Sock1, EpochID, File)
+ after
+ catch gen_tcp:close(Sock1)
+ end.
+verify_file_checksums_local2(Sock1, EpochID, Path0) ->
+ Path = machi_util:make_string(Path0),
+ case file:open(Path, [read, binary, raw]) of
+ {ok, FH} ->
+ File = re:replace(Path, ".*/", "", [{return, binary}]),
+ try
+ ReadChunk = fun(_File, Offset, Size) ->
+ file:pread(FH, Offset, Size)
+ end,
+ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
+ after
+ file:close(FH)
+ end;
+ Else ->
+ Else
+ end.
+verify_file_checksums_remote2(Sock1, EpochID, File) ->
+ ReadChunk = fun(File_name, Offset, Size) ->
+ ?FLU_C:read_chunk(Sock1, EpochID,
+ File_name, Offset, Size)
+ end,
+ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
+verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
+ try
+ case ?FLU_C:checksum_list(Sock1, EpochID, File) of
+ {ok, Info} ->
+ Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
+ [], Info),
+ {ok, Res};
+ {error, no_such_file}=Nope ->
+ Nope;
+ {error, _}=Else ->
+ Else
+ end
+ catch
+ What:Why ->
+ {error, {What, Why, erlang:get_stacktrace()}}
+ end.
+verify_chunk_checksum(File, ReadChunk) ->
+ fun({Offset, Size, CSum}, Acc) ->
+ case ReadChunk(File, Offset, Size) of
+ {ok, Chunk} ->
+ CSum2 = machi_util:checksum(Chunk),
+ if CSum == CSum2 ->
+ Acc;
+ true ->
+ [{Offset, Size, File, CSum, now, CSum2}|Acc]
+ end;
+ _Else ->
+ [{Offset, Size, File, CSum, now, read_failure}|Acc]
+ end
+ end.
diff --git a/src/machi_app.erl b/src/machi_app.erl
new file mode 100644
index 0000000..6dfddf7
--- /dev/null
+++ b/src/machi_app.erl
@@ -0,0 +1,37 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%% Application callbacks
+-export([start/2, stop/1]).
+start(_StartType, _StartArgs) ->
+ case machi_sup:start_link() of
+ {ok, Pid} ->
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+stop(_State) ->
+ ok.
diff --git a/src/machi_chash.erl b/src/machi_chash.erl
new file mode 100644
index 0000000..f45473a
--- /dev/null
+++ b/src/machi_chash.erl
@@ -0,0 +1,459 @@
+%%% Copyright (c) 2007-2011 Gemini Mobile Technologies, Inc. All rights reserved.
+%%% Copyright (c) 2013-2015 Basho Technologies, Inc. All rights reserved.
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%% Consistent hashing library. Also known as "random slicing".
+%% Originally from the Hibari DB source code at https://github.com/hibari
+%% TODO items:
+%% 1. Refactor to use bigints instead of floating point numbers. The
+%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE macro below doesn't allow as
+%% much wiggle-room for making really small hashing range
+%% definitions.
+-define(SHA_MAX, (1 bsl (20*8))).
+%% -compile(export_all).
+-export([make_float_map/1, make_float_map/2,
+ sum_map_weights/1,
+ make_tree/1,
+ query_tree/2,
+ hash_binary_via_float_map/2,
+ hash_binary_via_float_tree/2,
+ pretty_with_integers/2,
+ pretty_with_integers/3]).
+-export([make_demo_map1/0, make_demo_map2/0]).
+-export([zzz_usage_details/0]). % merely to give EDoc a hint of our intent
+-type owner_name() :: term().
+%% Owner for a range on the unit interval. We are agnostic about its
+%% type.
+-type weight() :: non_neg_integer().
+%% For this library, a weight is an integer which specifies the
+%% capacity of a "owner" relative to other owners. For example, if
+%% owner A with a weight of 10, and if owner B has a weight of 20,
+%% then B will be assigned twice as much of the unit interval as A.
+-type float_map() :: [{owner_name(), float()}].
+%% A float map subdivides the unit interval, starting at 0.0, to
+%% partitions that are assigned to various owners. The sum of all
+%% floats must be exactly 1.0 (or close enough for floating point
+%% purposes).
+-opaque float_tree() :: gb_trees:tree(float(), owner_name()).
+%% We can't use gb_trees:tree() because 'nil' (the empty tree) is
+%% never valid in our case. But teaching Dialyzer that is difficult.
+-type owner_int_range() :: {owner_name(), non_neg_integer(), non_neg_integer()}.
+%% Used when "prettying" a float map.
+-type owner_weight() :: {owner_name(), weight()}.
+-type owner_weight_list() :: [owner_weight()].
+%% A owner_weight_list is a definition of brick assignments over the
+%% unit interval [0.0, 1.0]. The sum of all floats must be 1.0. For
+%% example, [{{br1,nd1}, 0.25}, {{br2,nd1}, 0.5}, {{br3,nd1}, 0.25}].
+-export_type([float_map/0, float_tree/0]).
+%% @doc Create a float map, based on a basic owner weight list.
+-spec make_float_map(owner_weight_list()) -> float_map().
+make_float_map(NewOwnerWeights) ->
+ make_float_map([], NewOwnerWeights).
+%% @doc Create a float map, based on an older float map and a new weight
+%% list.
+%% The weights in the new weight list may be different than (or the
+%% same as) whatever weights were used to make the older float map.
+-spec make_float_map(float_map(), owner_weight_list()) -> float_map().
+make_float_map([], NewOwnerWeights) ->
+ Sum = add_all_weights(NewOwnerWeights),
+ DiffMap = [{Ch, Wt/Sum} || {Ch, Wt} <- NewOwnerWeights],
+ make_float_map2([{unused, 1.0}], DiffMap, NewOwnerWeights);
+make_float_map(OldFloatMap, NewOwnerWeights) ->
+ NewSum = add_all_weights(NewOwnerWeights),
+ %% Normalize to unit interval
+ %% NewOwnerWeights2 = [{Ch, Wt / NewSum} || {Ch, Wt} <- NewOwnerWeights],
+ %% Reconstruct old owner weights (will be normalized to unit interval)
+ SumOldFloatsDict =
+ lists:foldl(fun({Ch, Wt}, OrdDict) ->
+ orddict:update_counter(Ch, Wt, OrdDict)
+ end, orddict:new(), OldFloatMap),
+ OldOwnerWeights = orddict:to_list(SumOldFloatsDict),
+ OldSum = add_all_weights(OldOwnerWeights),
+ OldChs = [Ch || {Ch, _} <- OldOwnerWeights],
+ NewChs = [Ch || {Ch, _} <- NewOwnerWeights],
+ OldChsOnly = OldChs -- NewChs,
+ %% Mark any space in by a deleted owner as unused.
+ OldFloatMap2 = lists:map(
+ fun({Ch, Wt} = ChWt) ->
+ case lists:member(Ch, OldChsOnly) of
+ true ->
+ {unused, Wt};
+ false ->
+ ChWt
+ end
+ end, OldFloatMap),
+ %% Create a diff map of changing owners and added owners
+ DiffMap = lists:map(fun({Ch, NewWt}) ->
+ case orddict:find(Ch, SumOldFloatsDict) of
+ {ok, OldWt} ->
+ {Ch, (NewWt / NewSum) -
+ (OldWt / OldSum)};
+ error ->
+ {Ch, NewWt / NewSum}
+ end
+ end, NewOwnerWeights),
+ make_float_map2(OldFloatMap2, DiffMap, NewOwnerWeights).
+make_float_map2(OldFloatMap, DiffMap, _NewOwnerWeights) ->
+ FloatMap = apply_diffmap(DiffMap, OldFloatMap),
+ XX = combine_neighbors(collapse_unused_in_float_map(FloatMap)),
+ XX.
+apply_diffmap(DiffMap, FloatMap) ->
+ SubtractDiff = [{Ch, abs(Diff)} || {Ch, Diff} <- DiffMap, Diff < 0],
+ AddDiff = [D || {_Ch, Diff} = D <- DiffMap, Diff > 0],
+ TmpFloatMap = iter_diffmap_subtract(SubtractDiff, FloatMap),
+ iter_diffmap_add(AddDiff, TmpFloatMap).
+add_all_weights(OwnerWeights) ->
+ lists:foldl(fun({_Ch, Weight}, Sum) -> Sum + Weight end, 0.0, OwnerWeights).
+iter_diffmap_subtract([{Ch, Diff}|T], FloatMap) ->
+ iter_diffmap_subtract(T, apply_diffmap_subtract(Ch, Diff, FloatMap));
+iter_diffmap_subtract([], FloatMap) ->
+ FloatMap.
+iter_diffmap_add([{Ch, Diff}|T], FloatMap) ->
+ iter_diffmap_add(T, apply_diffmap_add(Ch, Diff, FloatMap));
+iter_diffmap_add([], FloatMap) ->
+ FloatMap.
+apply_diffmap_subtract(Ch, Diff, [{Ch, Wt}|T]) ->
+ if Wt == Diff ->
+ [{unused, Wt}|T];
+ Wt > Diff ->
+ [{Ch, Wt - Diff}, {unused, Diff}|T];
+ Wt < Diff ->
+ [{unused, Wt}|apply_diffmap_subtract(Ch, Diff - Wt, T)]
+ end;
+apply_diffmap_subtract(Ch, Diff, [H|T]) ->
+ [H|apply_diffmap_subtract(Ch, Diff, T)];
+apply_diffmap_subtract(_Ch, _Diff, []) ->
+ [].
+apply_diffmap_add(Ch, Diff, [{unused, Wt}|T]) ->
+ if Wt == Diff ->
+ [{Ch, Wt}|T];
+ Wt > Diff ->
+ [{Ch, Diff}, {unused, Wt - Diff}|T];
+ Wt < Diff ->
+ [{Ch, Wt}|apply_diffmap_add(Ch, Diff - Wt, T)]
+ end;
+apply_diffmap_add(Ch, Diff, [H|T]) ->
+ [H|apply_diffmap_add(Ch, Diff, T)];
+apply_diffmap_add(_Ch, _Diff, []) ->
+ [].
+combine_neighbors([{Ch, Wt1}, {Ch, Wt2}|T]) ->
+ combine_neighbors([{Ch, Wt1 + Wt2}|T]);
+combine_neighbors([H|T]) ->
+ [H|combine_neighbors(T)];
+combine_neighbors([]) ->
+ [].
+collapse_unused_in_float_map([{Ch, Wt1}, {unused, Wt2}|T]) ->
+ collapse_unused_in_float_map([{Ch, Wt1 + Wt2}|T]);
+collapse_unused_in_float_map([{unused, _}] = L) ->
+ L; % Degenerate case only
+collapse_unused_in_float_map([H|T]) ->
+ [H|collapse_unused_in_float_map(T)];
+collapse_unused_in_float_map([]) ->
+ [].
+chash_float_map_to_nextfloat_list(FloatMap) when length(FloatMap) > 0 ->
+ %% QuickCheck found a bug ... need to weed out stuff smaller than
+ FM1 = [P || {_X, Y} = P <- FloatMap, Y > ?SMALLEST_SIGNIFICANT_FLOAT_SIZE],
+ {_Sum, NFs0} = lists:foldl(fun({Name, Amount}, {Sum, List}) ->
+ {Sum+Amount, [{Sum+Amount, Name}|List]}
+ end, {0, []}, FM1),
+ lists:reverse(NFs0).
+chash_nextfloat_list_to_gb_tree([]) ->
+ gb_trees:balance(gb_trees:from_orddict([]));
+chash_nextfloat_list_to_gb_tree(NextFloatList) ->
+ {_FloatPos, Name} = lists:last(NextFloatList),
+ %% QuickCheck found a bug ... it really helps to add a catch-all item
+ %% at the far "right" of the list ... 42.0 is much greater than 1.0.
+ NFs = NextFloatList ++ [{42.0, Name}],
+ gb_trees:balance(gb_trees:from_orddict(orddict:from_list(NFs))).
+-spec chash_gb_next(float(), float_tree()) -> {float(), owner_name()}.
+chash_gb_next(X, {_, GbTree}) ->
+ chash_gb_next1(X, GbTree).
+chash_gb_next1(X, {Key, Val, Left, _Right}) when X < Key ->
+ case chash_gb_next1(X, Left) of
+ nil ->
+ {Key, Val};
+ Res ->
+ Res
+ end;
+chash_gb_next1(X, {Key, _Val, _Left, Right}) when X >= Key ->
+ chash_gb_next1(X, Right);
+chash_gb_next1(_X, nil) ->
+ nil.
+%% @doc Not used directly, but can give a developer an idea of how well
+%% chash_float_map_to_nextfloat_list will do for a given value of Max.
+%% For example:
+%% NewFloatMap = make_float_map([{unused, 1.0}],
+%% [{a,100}, {b, 100}, {c, 10}]),
+%% ChashMap = chash_scale_to_int_interval(NewFloatMap, 100),
+%% io:format("QQQ: int int = ~p\n", [ChashIntInterval]),
+%% -> [{a,1,47},{b,48,94},{c,94,100}]
+%% Interpretation: out of the 100 slots:
+%% - 'a' uses the slots 1-47
+%% - 'b' uses the slots 48-94
+%% - 'c' uses the slots 95-100
+chash_scale_to_int_interval(NewFloatMap, Max) ->
+ chash_scale_to_int_interval(NewFloatMap, 0, Max).
+%% @type nextfloat_list() = list({float(), brick()}). A nextfloat_list
+%% differs from a float_map in two respects: 1) nextfloat_list contains
+%% tuples with the brick name in 2nd position, 2) the float() at each
+%% position I_n > I_m, for all n, m such that n > m.
+%% For example, a nextfloat_list of the float_map example above,
+%% [{0.25, {br1, nd1}}, {0.75, {br2, nd1}}, {1.0, {br3, nd1}].
+chash_scale_to_int_interval([{Ch, _Wt}], Cur, Max) ->
+ [{Ch, Cur, Max}];
+chash_scale_to_int_interval([{Ch, Wt}|T], Cur, Max) ->
+ Int = trunc(Wt * Max),
+ [{Ch, Cur + 1, Cur + Int}|chash_scale_to_int_interval(T, Cur + Int, Max)].
+%% @doc Make a pretty/human-friendly version of a float map that describes
+%% integer ranges between 1 and `Scale'.
+-spec pretty_with_integers(float_map(), integer()) -> [owner_int_range()].
+pretty_with_integers(Map, Scale) ->
+ chash_scale_to_int_interval(Map, Scale).
+%% @doc Make a pretty/human-friendly version of a float map (based
+%% upon a float map created from `OldWeights' and `NewWeights') that
+%% describes integer ranges between 1 and `Scale'.
+-spec pretty_with_integers(owner_weight_list(), owner_weight_list(),integer())->
+ [owner_int_range()].
+pretty_with_integers(OldWeights, NewWeights, Scale) ->
+ chash_scale_to_int_interval(
+ make_float_map(make_float_map(OldWeights),
+ NewWeights),
+ Scale).
+%% @doc Create a float tree, which is the rapid lookup data structure
+%% for consistent hash queries.
+-spec make_tree(float_map()) -> float_tree().
+make_tree(Map) ->
+ chash_nextfloat_list_to_gb_tree(
+ chash_float_map_to_nextfloat_list(Map)).
+%% @doc Low-level function for querying a float tree: the (floating
+%% point) point within the unit interval.
+-spec query_tree(float(), float_tree()) -> {float(), owner_name()}.
+query_tree(Val, Tree) when is_float(Val), 0.0 =< Val, Val =< 1.0 ->
+ chash_gb_next(Val, Tree).
+%% @doc Create a sample float map.
+-spec make_demo_map1() -> float_map().
+make_demo_map1() ->
+ {_, Res} = make_demo_map1_i(),
+ Res.
+make_demo_map1_i() ->
+ Fail1 = {b, 100},
+ L1 = [{a, 100}, Fail1, {c, 100}],
+ L2 = L1 ++ [{d, 100}, {e, 100}],
+ L3 = L2 -- [Fail1],
+ L4 = L3 ++ [{giant, 300}],
+ {L4, lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
+ make_float_map(L1), [L2, L3, L4])}.
+%% @doc Create a sample float map.
+-spec make_demo_map2() -> float_map().
+make_demo_map2() ->
+ {L0, _} = make_demo_map1_i(),
+ L1 = L0 ++ [{h, 100}],
+ L2 = L1 ++ [{i, 100}],
+ L3 = L2 ++ [{j, 100}],
+ lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
+ make_demo_map1(), [L1, L2, L3]).
+%% @doc Create a human-friendly summary of a float map.
+%% The two parts of the summary are: a per-owner total of the unit
+%% interval range(s) owned by each owner, and a total sum of all
+%% per-owner ranges (which should be 1.0 but is not enforced).
+-spec sum_map_weights(float_map()) ->
+ {{per_owner, float_map()}, {weight_sum, float()}}.
+sum_map_weights(Map) ->
+ L = sum_map_weights(lists:sort(Map), undefined, 0.0) -- [{undefined,0.0}],
+ WeightSum = lists:sum([Weight || {_, Weight} <- L]),
+ {{per_owner, L}, {weight_sum, WeightSum}}.
+sum_map_weights([{SZ, Weight}|T], SZ, SZ_total) ->
+ sum_map_weights(T, SZ, SZ_total + Weight);
+sum_map_weights([{SZ, Weight}|T], LastSZ, LastSZ_total) ->
+ [{LastSZ, LastSZ_total}|sum_map_weights(T, SZ, Weight)];
+sum_map_weights([], LastSZ, LastSZ_total) ->
+ [{LastSZ, LastSZ_total}].
+%% @doc Query a float map with a binary (inefficient).
+-spec hash_binary_via_float_map(binary(), float_map()) ->
+ {float(), owner_name()}.
+hash_binary_via_float_map(Key, Map) ->
+ Tree = make_tree(Map),
+ <> = crypto:hash(sha, Key),
+ Float = Int / ?SHA_MAX,
+ query_tree(Float, Tree).
+%% @doc Query a float tree with a binary.
+-spec hash_binary_via_float_tree(binary(), float_tree()) ->
+ {float(), owner_name()}.
+hash_binary_via_float_tree(Key, Tree) ->
+ <> = crypto:hash(sha, Key),
+ Float = Int / ?SHA_MAX,
+ query_tree(Float, Tree).
+%%%%% @doc Various usage examples, see source code below this function
+%%%%% for full details.
+zzz_usage_details() ->
+%% %% Make a map. See the code for make_demo_map1() for the order of
+%% %% additions & deletions. Here's a brief summary of the 4 steps.
+%% %%
+%% %% * 'a' through 'e' are weighted @ 100.
+%% %% * 'giant' is weighted @ 300.
+%% %% * 'b' is removed at step #3.
+%% 40> M1 = machi_chash:make_demo_map1().
+%% [{a,0.09285714285714286},
+%% {giant,0.10714285714285715},
+%% {d,0.026190476190476153},
+%% {giant,0.10714285714285715},
+%% {a,0.04999999999999999},
+%% {giant,0.04999999999999999},
+%% {d,0.04999999999999999},
+%% {giant,0.050000000000000044},
+%% {d,0.06666666666666671},
+%% {e,0.009523809523809434},
+%% {giant,0.05714285714285716},
+%% {c,0.14285714285714285},
+%% {giant,0.05714285714285716},
+%% {e,0.13333333333333341}]
+%% %% Map M1 onto the interval of integers 0-10,1000
+%% %%
+%% %% output = list({SZ_name::term(), Start::integer(), End::integer()})
+%% 41> machi_chash:pretty_with_integers(M1, 10*1000).
+%% [{a,1,928},
+%% {giant,929,1999},
+%% {d,2000,2260},
+%% {giant,2261,3331},
+%% {a,3332,3830},
+%% {giant,3831,4329},
+%% {d,4330,4828},
+%% {giant,4829,5328},
+%% {d,5329,5994},
+%% {e,5995,6089},
+%% {giant,6090,6660},
+%% {c,6661,8088},
+%% {giant,8089,8659},
+%% {e,8659,10000}]
+%% %% Sum up all of the weights, make sure it's what we expect:
+%% 55> machi_chash:sum_map_weights(M1).
+%% {{per_owner,[{a,0.14285714285714285},
+%% {c,0.14285714285714285},
+%% {d,0.14285714285714285},
+%% {e,0.14285714285714285},
+%% {giant,0.42857142857142866}]},
+%% {weight_sum,1.0}}
+%% %% Make a tree, then query it
+%% %% (Hash::float(), tree()) -> {NextLargestBoundary::float(), szone()}
+%% 58> T1 = machi_chash:make_tree(M1).
+%% 59> machi_chash:query_tree(0.2555, T1).
+%% {0.3333333333333333,giant}
+%% 60> machi_chash:query_tree(0.3555, T1).
+%% {0.3833333333333333,a}
+%% 61> machi_chash:query_tree(0.4555, T1).
+%% {0.4833333333333333,d}
+%% %% How about hashing a bunch of strings and see what happens?
+%% 74> Key1 = "Hello, world!".
+%% "Hello, world!"
+%% 75> [{K, element(2, machi_chash:hash_binary_via_float_map(K, M1))} || K <- [lists:sublist(Key1, X) || X <- lists:seq(1, length(Key1))]].
+%% [{"H",giant},
+%% {"He",giant},
+%% {"Hel",giant},
+%% {"Hell",e},
+%% {"Hello",e},
+%% {"Hello,",giant},
+%% {"Hello, ",e},
+%% {"Hello, w",e},
+%% {"Hello, wo",giant},
+%% {"Hello, wor",d},
+%% {"Hello, worl",giant},
+%% {"Hello, world",e},
+%% {"Hello, world!",d}]
+ ok.
diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl
new file mode 100644
index 0000000..02f7925
--- /dev/null
+++ b/src/machi_flu1.erl
@@ -0,0 +1,589 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-export([start_link/1, stop/1]).
+-record(state, {
+ reg_name :: atom(),
+ proj_store :: pid(),
+ append_pid :: pid(),
+ tcp_port :: non_neg_integer(),
+ data_dir :: string(),
+ wedge = true :: 'disabled' | boolean(),
+ my_epoch_id :: 'undefined',
+ dbg_props = [] :: list(), % proplist
+ props = [] :: list() % proplist
+ }).
+start_link([{FluName, TcpPort, DataDir}|Rest])
+ when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
+ {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
+stop(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true ->
+ Pid ! forever,
+ ok;
+ false ->
+ error
+ end.
+main2(RegName, TcpPort, DataDir, Rest) ->
+ S0 = #state{reg_name=RegName,
+ tcp_port=TcpPort,
+ data_dir=DataDir,
+ props=Rest},
+ AppendPid = start_append_server(S0),
+ ProjRegName = make_projection_server_regname(RegName),
+ {ok, ProjectionPid} =
+ machi_projection_store:start_link(ProjRegName, DataDir, AppendPid),
+ S1 = S0#state{append_pid=AppendPid,
+ proj_store=ProjectionPid},
+ S2 = case proplists:get_value(dbg, Rest) of
+ undefined ->
+ S1;
+ DbgProps ->
+ S1#state{wedge=disabled,
+ dbg_props=DbgProps,
+ props=lists:keydelete(dbg, 1, Rest)}
+ end,
+ ListenPid = start_listen_server(S2),
+ Config_e = machi_util:make_config_filename(DataDir, "unused"),
+ ok = filelib:ensure_dir(Config_e),
+ {_, Data_e} = machi_util:make_data_filename(DataDir, "unused"),
+ ok = filelib:ensure_dir(Data_e),
+ Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
+ ok = filelib:ensure_dir(Projection_e),
+ put(flu_reg_name, RegName),
+ put(flu_append_pid, AppendPid),
+ put(flu_projection_pid, ProjectionPid),
+ put(flu_listen_pid, ListenPid),
+ receive forever -> ok end.
+start_listen_server(S) ->
+ spawn_link(fun() -> run_listen_server(S) end).
+start_append_server(S) ->
+ spawn_link(fun() -> run_append_server(S) end).
+%% start_projection_server(S) ->
+%% spawn_link(fun() -> run_projection_server(S) end).
+run_listen_server(#state{tcp_port=TcpPort}=S) ->
+ SockOpts = [{reuseaddr, true},
+ {mode, binary}, {active, false}, {packet, line}],
+ {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
+ listen_server_loop(LSock, S).
+run_append_server(#state{reg_name=Name}=S) ->
+ register(Name, self()),
+ append_server_loop(S).
+listen_server_loop(LSock, S) ->
+ {ok, Sock} = gen_tcp:accept(LSock),
+ spawn_link(fun() -> net_server_loop(Sock, S) end),
+ listen_server_loop(LSock, S).
+append_server_loop(#state{data_dir=DataDir}=S) ->
+ receive
+ {seq_append, From, Prefix, Chunk, CSum} ->
+ spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
+ DataDir) end),
+ append_server_loop(S);
+ {wedge_state_change, Boolean} ->
+ append_server_loop(S#state{wedge=Boolean})
+ end.
+-define(EpochIDSpace, (4+20)).
+net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0, 60*1000) of
+ {ok, Line} ->
+ %% machi_util:verb("Got: ~p\n", [Line]),
+ PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
+ FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
+ CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
+ WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
+ DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1,
+ case Line of
+ %% For normal use
+ <<"A ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ LenHex:8/binary,
+ Prefix:PrefixLenLF/binary, "\n">> ->
+ do_net_server_append(RegName, Sock, LenHex, Prefix);
+ <<"R ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ OffsetHex:16/binary, LenHex:8/binary,
+ File:FileLenLF/binary, "\n">> ->
+ do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
+ <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
+ do_net_server_listing(Sock, DataDir);
+ <<"C ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ File:CSumFileLenLF/binary, "\n">> ->
+ do_net_server_checksum_listing(Sock, File, DataDir);
+ <<"QUIT\n">> ->
+ catch gen_tcp:close(Sock),
+ exit(normal);
+ <<"QUIT\r\n">> ->
+ catch gen_tcp:close(Sock),
+ exit(normal);
+ %% For "internal" replication only.
+ <<"W-repl ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ OffsetHex:16/binary, LenHex:8/binary,
+ File:WriteFileLenLF/binary, "\n">> ->
+ do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
+ %% For data migration only.
+ <<"DEL-migration ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ File:DelFileLenLF/binary, "\n">> ->
+ do_net_server_delete_migration_only(Sock, File, DataDir);
+ %% For erasure coding hackityhack
+ <<"TRUNC-hack--- ",
+ _EpochIDRaw:(?EpochIDSpace)/binary,
+ File:DelFileLenLF/binary, "\n">> ->
+ do_net_server_truncate_hackityhack(Sock, File, DataDir);
+ <<"PROJ ", LenHex:8/binary, "\n">> ->
+ do_projection_command(Sock, LenHex, S);
+ _ ->
+ machi_util:verb("Else Got: ~p\n", [Line]),
+ gen_tcp:send(Sock, "ERROR SYNTAX\n"),
+ catch gen_tcp:close(Sock),
+ exit(normal)
+ end,
+ net_server_loop(Sock, S);
+ _ ->
+ catch gen_tcp:close(Sock),
+ exit(normal)
+ end.
+append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
+ Pid = write_server_get_pid(Prefix, DataDir),
+ Pid ! {seq_append, From, Prefix, Chunk, CSum},
+ exit(normal).
+do_net_server_append(RegName, Sock, LenHex, Prefix) ->
+ %% TODO: robustify against other invalid path characters such as NUL
+ case sanitize_file_string(Prefix) of
+ ok ->
+ do_net_server_append2(RegName, Sock, LenHex, Prefix);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
+ end.
+sanitize_file_string(Str) ->
+ case re:run(Str, "/") of
+ nomatch ->
+ ok;
+ _ ->
+ error
+ end.
+do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
+ <> = machi_util:hexstr_to_bin(LenHex),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
+ CSum = machi_util:checksum(Chunk),
+ try
+ RegName ! {seq_append, self(), Prefix, Chunk, CSum}
+ catch error:badarg ->
+ error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
+ end,
+ receive
+ {assignment, Offset, File} ->
+ OffsetHex = machi_util:bin_to_hexstr(<>),
+ Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
+ ok = gen_tcp:send(Sock, Out)
+ after 10*1000 ->
+ ok = gen_tcp:send(Sock, "TIMEOUT\n")
+ end.
+do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
+ DoItFun = fun(FH, Offset, Len) ->
+ case file:pread(FH, Offset, Len) of
+ {ok, Bytes} when byte_size(Bytes) == Len ->
+ gen_tcp:send(Sock, ["OK\n", Bytes]);
+ {ok, Bytes} ->
+ machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
+ [Len, size(Bytes), FileBin, Offset]),
+ ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n");
+ eof ->
+ perhaps_do_net_server_ec_read(Sock, FH);
+ _Else2 ->
+ machi_util:verb("Else2 ~p ~p ~P\n",
+ [Offset, Len, _Else2, 20]),
+ ok = gen_tcp:send(Sock, "ERROR BAD-READ\n")
+ end
+ end,
+ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ [read, binary, raw], DoItFun).
+do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun) ->
+ case sanitize_file_string(FileBin) of
+ ok ->
+ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
+ DataDir, FileOpts, DoItFun);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
+ end.
+do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun) ->
+ <> = machi_util:hexstr_to_bin(OffsetHex),
+ <> = machi_util:hexstr_to_bin(LenHex),
+ {_, Path} = machi_util:make_data_filename(DataDir, FileBin),
+ OptsHasWrite = lists:member(write, FileOpts),
+ case file:open(Path, FileOpts) of
+ {ok, FH} ->
+ try
+ DoItFun(FH, Offset, Len)
+ after
+ file:close(FH)
+ end;
+ {error, enoent} when OptsHasWrite ->
+ do_net_server_readwrite_common(
+ Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun);
+ _Else ->
+ %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
+ end.
+do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
+ CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
+ case file:open(CSumPath, [append, raw, binary, delayed_write]) of
+ {ok, FHc} ->
+ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
+ {error, enoent} ->
+ ok = filelib:ensure_dir(CSumPath),
+ do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
+ end.
+do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
+ DoItFun = fun(FHd, Offset, Len) ->
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, Chunk} = gen_tcp:recv(Sock, Len),
+ CSum = machi_util:checksum(Chunk),
+ case file:pwrite(FHd, Offset, Chunk) of
+ ok ->
+ CSumHex = machi_util:bin_to_hexstr(CSum),
+ CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
+ ok = file:write(FHc, CSum_info),
+ ok = file:close(FHc),
+ gen_tcp:send(Sock, <<"OK\n">>);
+ _Else3 ->
+ machi_util:verb("Else3 ~p ~p ~p\n",
+ [Offset, Len, _Else3]),
+ ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
+ end
+ end,
+ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ [write, read, binary, raw], DoItFun).
+perhaps_do_net_server_ec_read(Sock, FH) ->
+ case file:pread(FH, 0, ?MINIMUM_OFFSET) of
+ {ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET ->
+ decode_and_reply_net_server_ec_read(Sock, Bin);
+ {ok, _AnythingElse} ->
+ ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n");
+ _AnythingElse ->
+ ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n")
+ end.
+decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) ->
+ decode_and_reply_net_server_ec_read_version_a(Sock, Rest);
+decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) ->
+ ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>).
+decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
+ %% <> = Rest,
+ HdrLen = 80 - 2 - 4 - 1,
+ <> = Rest,
+ <> = machi_util:hexstr_to_bin(BodyLenHex),
+ <> = Rest2,
+ ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
+do_net_server_listing(Sock, DataDir) ->
+ {_, WildPath} = machi_util:make_data_filename(DataDir, ""),
+ Files = filelib:wildcard("*", WildPath),
+ Out = ["OK\n",
+ [begin
+ {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
+ Size = FI#file_info.size,
+ SizeBin = <>,
+ [machi_util:bin_to_hexstr(SizeBin), <<" ">>,
+ list_to_binary(File), <<"\n">>]
+ end || File <- Files],
+ ".\n"
+ ],
+ ok = gen_tcp:send(Sock, Out).
+do_net_server_checksum_listing(Sock, File, DataDir) ->
+ case sanitize_file_string(File) of
+ ok ->
+ do_net_server_checksum_listing2(Sock, File, DataDir);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
+ end.
+do_net_server_checksum_listing2(Sock, File, DataDir) ->
+ ok = sync_checksum_file(File),
+ CSumPath = machi_util:make_checksum_filename(DataDir, File),
+ case file:open(CSumPath, [read, raw, binary]) of
+ {ok, FH} ->
+ {ok, FI} = file:read_file_info(CSumPath),
+ Len = FI#file_info.size,
+ LenHex = list_to_binary(machi_util:bin_to_hexstr(<>)),
+ %% Client has option of line-by-line with "." terminator,
+ %% or using the offset in the OK message to slurp things
+ %% down by exact byte size.
+ ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]),
+ do_net_copy_bytes(FH, Sock),
+ ok = file:close(FH),
+ ok = gen_tcp:send(Sock, ".\n");
+ {error, enoent} ->
+ ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
+ _ ->
+ ok = gen_tcp:send(Sock, "ERROR\n")
+ end.
+sync_checksum_file(File) ->
+ Prefix = re:replace(File, "\\..*", "", [{return, binary}]),
+ case write_server_find_pid(Prefix) of
+ undefined ->
+ ok;
+ Pid ->
+ Ref = make_ref(),
+ Pid ! {sync_stuff, self(), Ref},
+ receive
+ {sync_finished, Ref} ->
+ ok
+ after 5000 ->
+ case write_server_find_pid(Prefix) of
+ undefined ->
+ ok;
+ Pid2 when Pid2 /= Pid ->
+ ok;
+ _Pid2 ->
+ error
+ end
+ end
+ end.
+do_net_copy_bytes(FH, Sock) ->
+ case file:read(FH, 1024*1024) of
+ {ok, Bin} ->
+ ok = gen_tcp:send(Sock, Bin),
+ do_net_copy_bytes(FH, Sock);
+ eof ->
+ ok
+ end.
+do_net_server_delete_migration_only(Sock, File, DataDir) ->
+ case sanitize_file_string(File) of
+ ok ->
+ do_net_server_delete_migration_only2(Sock, File, DataDir);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
+ end.
+do_net_server_delete_migration_only2(Sock, File, DataDir) ->
+ {_, Path} = machi_util:make_data_filename(DataDir, File),
+ case file:delete(Path) of
+ ok ->
+ ok = gen_tcp:send(Sock, "OK\n");
+ {error, enoent} ->
+ ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
+ _ ->
+ ok = gen_tcp:send(Sock, "ERROR\n")
+ end.
+do_net_server_truncate_hackityhack(Sock, File, DataDir) ->
+ case sanitize_file_string(File) of
+ ok ->
+ do_net_server_truncate_hackityhack2(Sock, File, DataDir);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
+ end.
+do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
+ {_, Path} = machi_util:make_data_filename(DataDir, File),
+ case file:open(Path, [read, write, binary, raw]) of
+ {ok, FH} ->
+ try
+ {ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET),
+ ok = file:truncate(FH),
+ ok = gen_tcp:send(Sock, "OK\n")
+ after
+ file:close(FH)
+ end;
+ {error, enoent} ->
+ ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
+ _ ->
+ ok = gen_tcp:send(Sock, "ERROR\n")
+ end.
+write_server_get_pid(Prefix, DataDir) ->
+ case write_server_find_pid(Prefix) of
+ undefined ->
+ start_seq_append_server(Prefix, DataDir),
+ timer:sleep(1),
+ write_server_get_pid(Prefix, DataDir);
+ Pid ->
+ Pid
+ end.
+write_server_find_pid(Prefix) ->
+ RegName = machi_util:make_regname(Prefix),
+ whereis(RegName).
+start_seq_append_server(Prefix, DataDir) ->
+ spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end).
+run_seq_append_server(Prefix, DataDir) ->
+ true = register(machi_util:make_regname(Prefix), self()),
+ run_seq_append_server2(Prefix, DataDir).
+run_seq_append_server2(Prefix, DataDir) ->
+ FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
+ case machi_util:increment_max_filenum(DataDir, Prefix) of
+ ok ->
+ machi_util:increment_max_filenum(DataDir, Prefix),
+ machi_util:info_msg("start: ~p server at file ~w\n",
+ [Prefix, FileNum]),
+ seq_append_server_loop(DataDir, Prefix, FileNum);
+ Else ->
+ error_logger:error_msg("start: ~p server at file ~w: ~p\n",
+ [Prefix, FileNum, Else]),
+ exit(Else)
+ end.
+seq_append_server_loop(DataDir, Prefix, FileNum) ->
+ SequencerNameHack = lists:flatten(io_lib:format(
+ "~.36B~.36B",
+ [element(3,now()),
+ list_to_integer(os:getpid())])),
+ {File, FullPath} = machi_util:make_data_filename(
+ DataDir, Prefix, SequencerNameHack, FileNum),
+ {ok, FHd} = file:open(FullPath,
+ [write, binary, raw]),
+ %% [write, binary, raw, delayed_write]),
+ CSumPath = machi_util:make_checksum_filename(
+ DataDir, Prefix, SequencerNameHack, FileNum),
+ {ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
+ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
+seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
+ when Offset > ?MAX_FILE_SIZE ->
+ ok = file:close(FHd),
+ ok = file:close(FHc),
+ machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
+ [Prefix, FileNum, Offset]),
+ run_seq_append_server2(Prefix, DataDir);
+seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
+ receive
+ {seq_append, From, Prefix, Chunk, CSum} ->
+ ok = file:pwrite(FHd, Offset, Chunk),
+ From ! {assignment, Offset, File},
+ Len = byte_size(Chunk),
+ OffsetHex = machi_util:bin_to_hexstr(<>),
+ LenHex = machi_util:bin_to_hexstr(<>),
+ CSumHex = machi_util:bin_to_hexstr(CSum),
+ CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
+ ok = file:write(FHc, CSum_info),
+ seq_append_server_loop(DataDir, Prefix, File, FH_,
+ FileNum, Offset + Len);
+ {sync_stuff, FromPid, Ref} ->
+ file:sync(FHc),
+ FromPid ! {sync_finished, Ref},
+ seq_append_server_loop(DataDir, Prefix, File, FH_,
+ FileNum, Offset)
+ after 30*1000 ->
+ ok = file:close(FHd),
+ ok = file:close(FHc),
+ machi_util:info_msg("stop: ~p server at file ~w offset ~w\n",
+ [Prefix, FileNum, Offset]),
+ exit(normal)
+ end.
+do_projection_command(Sock, LenHex, S) ->
+ try
+ Len = machi_util:hexstr_to_int(LenHex),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, ProjCmdBin} = gen_tcp:recv(Sock, Len),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ ProjCmd = binary_to_term(ProjCmdBin),
+ put(hack, ProjCmd),
+ Res = handle_projection_command(ProjCmd, S),
+ ResBin = term_to_binary(Res),
+ ResLenHex = machi_util:int_to_hexbin(byte_size(ResBin), 32),
+ ok = gen_tcp:send(Sock, [<<"OK ">>, ResLenHex, <<"\n">>, ResBin])
+ catch
+ What:Why ->
+ io:format(user, "OOPS ~p\n", [get(hack)]),
+ io:format(user, "OOPS ~p ~p ~p\n", [What, Why, erlang:get_stacktrace()]),
+ WHA = list_to_binary(io_lib:format("TODO-YOLO.~w:~w-~w",
+ [What, Why, erlang:get_stacktrace()])),
+ _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>]))
+ end.
+handle_projection_command({get_latest_epoch, ProjType},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:get_latest_epoch(ProjStore, ProjType);
+handle_projection_command({read_latest_projection, ProjType},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:read_latest_projection(ProjStore, ProjType);
+handle_projection_command({read_projection, ProjType, Epoch},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:read(ProjStore, ProjType, Epoch);
+handle_projection_command({write_projection, ProjType, Proj},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:write(ProjStore, ProjType, Proj);
+handle_projection_command({get_all, ProjType},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:get_all(ProjStore, ProjType);
+handle_projection_command({list_all, ProjType},
+ #state{proj_store=ProjStore}) ->
+ machi_projection_store:list_all(ProjStore, ProjType);
+handle_projection_command(Else, _S) ->
+ {error, unknown_cmd, Else}.
+make_projection_server_regname(BaseName) ->
+ list_to_atom(atom_to_list(BaseName) ++ "_projection").
diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl
new file mode 100644
index 0000000..6dd6c65
--- /dev/null
+++ b/src/machi_flu1_client.erl
@@ -0,0 +1,647 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+ %% File API
+ append_chunk/4, append_chunk/5,
+ read_chunk/5, read_chunk/6,
+ checksum_list/3, checksum_list/4,
+ list_files/2, list_files/3,
+ %% Projection API
+ get_latest_epoch/2, get_latest_epoch/3,
+ read_latest_projection/2, read_latest_projection/3,
+ read_projection/3, read_projection/4,
+ write_projection/3, write_projection/4,
+ get_all/2, get_all/3,
+ list_all/2, list_all/3,
+ %% Common API
+ quit/1
+ ]).
+%% For "internal" replication only.
+ write_chunk/5, write_chunk/6,
+ delete_migration/3, delete_migration/4,
+ trunc_hack/3, trunc_hack/4
+ ]).
+-type chunk() :: binary() | iolist(). % client can use either
+-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
+-type chunk_s() :: binary(). % server always uses binary()
+-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
+-type chunk_size() :: non_neg_integer().
+-type epoch_csum() :: binary().
+-type epoch_num() :: non_neg_integer().
+-type epoch_id() :: {epoch_num(), epoch_csum()}.
+-type file_info() :: {file_size(), file_name_s()}.
+-type file_name() :: binary() | list().
+-type file_name_s() :: binary(). % server reply
+-type file_offset() :: non_neg_integer().
+-type file_size() :: non_neg_integer().
+-type file_prefix() :: binary() | list().
+-type inet_host() :: inet:ip_address() | inet:hostname().
+-type inet_port() :: inet:port_number().
+-type projection() :: #projection_v1{}.
+-type projection_type() :: 'public' | 'private'.
+%% @doc Append a chunk (binary- or iolist-style) of data to a file
+%% with `Prefix'.
+-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
+ {ok, chunk_pos()} | {error, term()}.
+append_chunk(Sock, EpochID, Prefix, Chunk) ->
+ append_chunk2(Sock, EpochID, Prefix, Chunk).
+%% @doc Append a chunk (binary- or iolist-style) of data to a file
+%% with `Prefix'.
+-spec append_chunk(inet_host(), inet_port(),
+ epoch_id(), file_prefix(), chunk()) ->
+ {ok, chunk_pos()} | {error, term()}.
+append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ append_chunk2(Sock, EpochID, Prefix, Chunk)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
+-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
+ {ok, chunk_s()} | {error, term()}.
+read_chunk(Sock, EpochID, File, Offset, Size)
+ when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
+ read_chunk2(Sock, EpochID, File, Offset, Size).
+%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
+-spec read_chunk(inet_host(), inet_port(), epoch_id(),
+ file_name(), file_offset(), chunk_size()) ->
+ {ok, chunk_s()} | {error, term()}.
+read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
+ when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ read_chunk2(Sock, EpochID, File, Offset, Size)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Fetch the list of chunk checksums for `File'.
+-spec checksum_list(port(), epoch_id(), file_name()) ->
+ {ok, [chunk_csum()]} | {error, term()}.
+checksum_list(Sock, EpochID, File) when is_port(Sock) ->
+ checksum_list2(Sock, EpochID, File).
+%% @doc Fetch the list of chunk checksums for `File'.
+-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
+ {ok, [chunk_csum()]} | {error, term()}.
+checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ checksum_list2(Sock, EpochID, File)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Fetch the list of all files on the remote FLU.
+-spec list_files(port(), epoch_id()) ->
+ {ok, [file_info()]} | {error, term()}.
+list_files(Sock, EpochID) when is_port(Sock) ->
+ list2(Sock, EpochID).
+%% @doc Fetch the list of all files on the remote FLU.
+-spec list_files(inet_host(), inet_port(), epoch_id()) ->
+ {ok, [file_info()]} | {error, term()}.
+list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ list2(Sock, EpochID)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Get the latest epoch number from the FLU's projection store.
+-spec get_latest_epoch(port(), projection_type()) ->
+ {ok, -1|non_neg_integer()} | {error, term()}.
+get_latest_epoch(Sock, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ get_latest_epoch2(Sock, ProjType).
+%% @doc Get the latest epoch number from the FLU's projection store.
+-spec get_latest_epoch(inet_host(), inet_port(),
+ projection_type()) ->
+ {ok, -1|non_neg_integer()} | {error, term()}.
+get_latest_epoch(Host, TcpPort, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ get_latest_epoch2(Sock, ProjType)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Get the latest epoch number from the FLU's projection store.
+-spec read_latest_projection(port(), projection_type()) ->
+ {ok, projection()} | {error, not_written} | {error, term()}.
+read_latest_projection(Sock, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ read_latest_projection2(Sock, ProjType).
+%% @doc Get the latest epoch number from the FLU's projection store.
+-spec read_latest_projection(inet_host(), inet_port(),
+ projection_type()) ->
+ {ok, projection()} | {error, not_written} | {error, term()}.
+read_latest_projection(Host, TcpPort, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ read_latest_projection2(Sock, ProjType)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Read a projection `Proj' of type `ProjType'.
+-spec read_projection(port(), projection_type(), epoch_num()) ->
+ {ok, projection()} | {error, written} | {error, term()}.
+read_projection(Sock, ProjType, Epoch)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ read_projection2(Sock, ProjType, Epoch).
+%% @doc Read a projection `Proj' of type `ProjType'.
+-spec read_projection(inet_host(), inet_port(),
+ projection_type(), epoch_num()) ->
+ {ok, projection()} | {error, written} | {error, term()}.
+read_projection(Host, TcpPort, ProjType, Epoch)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ read_projection2(Sock, ProjType, Epoch)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Write a projection `Proj' of type `ProjType'.
+-spec write_projection(port(), projection_type(), projection()) ->
+ 'ok' | {error, written} | {error, term()}.
+write_projection(Sock, ProjType, Proj)
+ when ProjType == 'public' orelse ProjType == 'private',
+ is_record(Proj, projection_v1) ->
+ write_projection2(Sock, ProjType, Proj).
+%% @doc Write a projection `Proj' of type `ProjType'.
+-spec write_projection(inet_host(), inet_port(),
+ projection_type(), projection()) ->
+ 'ok' | {error, written} | {error, term()}.
+write_projection(Host, TcpPort, ProjType, Proj)
+ when ProjType == 'public' orelse ProjType == 'private',
+ is_record(Proj, projection_v1) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ write_projection2(Sock, ProjType, Proj)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Get all projections from the FLU's projection store.
+-spec get_all(port(), projection_type()) ->
+ {ok, [projection()]} | {error, term()}.
+get_all(Sock, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ get_all2(Sock, ProjType).
+%% @doc Get all projections from the FLU's projection store.
+-spec get_all(inet_host(), inet_port(),
+ projection_type()) ->
+ {ok, [projection()]} | {error, term()}.
+get_all(Host, TcpPort, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ get_all2(Sock, ProjType)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Get all epoch numbers from the FLU's projection store.
+-spec list_all(port(), projection_type()) ->
+ {ok, [non_neg_integer()]} | {error, term()}.
+list_all(Sock, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ list_all2(Sock, ProjType).
+%% @doc Get all epoch numbers from the FLU's projection store.
+-spec list_all(inet_host(), inet_port(),
+ projection_type()) ->
+ {ok, [non_neg_integer()]} | {error, term()}.
+list_all(Host, TcpPort, ProjType)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ list_all2(Sock, ProjType)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Quit & close the connection to remote FLU.
+-spec quit(port()) ->
+ ok.
+quit(Sock) when is_port(Sock) ->
+ catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)),
+ catch gen_tcp:close(Sock),
+ ok.
+%% @doc Restricted API: Write a chunk of already-sequenced data to
+%% `File' at `Offset'.
+-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
+ ok | {error, term()}.
+write_chunk(Sock, EpochID, File, Offset, Chunk)
+ when Offset >= ?MINIMUM_OFFSET ->
+ write_chunk2(Sock, EpochID, File, Offset, Chunk).
+%% @doc Restricted API: Write a chunk of already-sequenced data to
+%% `File' at `Offset'.
+-spec write_chunk(inet_host(), inet_port(),
+ epoch_id(), file_name(), file_offset(), chunk()) ->
+ ok | {error, term()}.
+write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
+ when Offset >= ?MINIMUM_OFFSET ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ write_chunk2(Sock, EpochID, File, Offset, Chunk)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Restricted API: Delete a file after it has been successfully
+%% migrated.
+-spec delete_migration(port(), epoch_id(), file_name()) ->
+ ok | {error, term()}.
+delete_migration(Sock, EpochID, File) when is_port(Sock) ->
+ delete_migration2(Sock, EpochID, File).
+%% @doc Restricted API: Delete a file after it has been successfully
+%% migrated.
+-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
+ ok | {error, term()}.
+delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ delete_migration2(Sock, EpochID, File)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+%% @doc Restricted API: Truncate a file after it has been successfully
+%% erasure coded.
+-spec trunc_hack(port(), epoch_id(), file_name()) ->
+ ok | {error, term()}.
+trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
+ trunc_hack2(Sock, EpochID, File).
+%% @doc Restricted API: Truncate a file after it has been successfully
+%% erasure coded.
+-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
+ ok | {error, term()}.
+trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
+ Sock = machi_util:connect(Host, TcpPort),
+ try
+ trunc_hack2(Sock, EpochID, File)
+ after
+ catch gen_tcp:close(Sock)
+ end.
+append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
+ try
+ %% TODO: add client-side checksum to the server's protocol
+ %% _ = crypto:hash(md5, Chunk),
+ Prefix = machi_util:make_binary(Prefix0),
+ Chunk = machi_util:make_binary(Chunk0),
+ Len = iolist_size(Chunk0),
+ true = (Len =< ?MAX_CHUNK_SIZE),
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ LenHex = machi_util:int_to_hexbin(Len, 32),
+ Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10],
+ ok = gen_tcp:send(Sock, [Cmd, Chunk]),
+ {ok, Line} = gen_tcp:recv(Sock, 0),
+ PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
+ case Line of
+ <<"OK ", OffsetHex:16/binary, " ",
+ Path:PathLen/binary, _:1/binary>> ->
+ Offset = machi_util:hexstr_to_int(OffsetHex),
+ {ok, {Offset, Len, Path}};
+ <<"ERROR BAD-ARG", _/binary>> ->
+ {error, bad_arg};
+ <<"ERROR ", Rest/binary>> ->
+ {error, Rest}
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
+ end.
+read_chunk2(Sock, EpochID, File0, Offset, Size) ->
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ File = machi_util:make_binary(File0),
+ PrefixHex = machi_util:int_to_hexbin(Offset, 64),
+ SizeHex = machi_util:int_to_hexbin(Size, 32),
+ CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
+ ok = gen_tcp:send(Sock, CmdLF),
+ case gen_tcp:recv(Sock, 3) of
+ {ok, <<"OK\n">>} ->
+ {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
+ Res;
+ {ok, Else} ->
+ {ok, OldOpts} = inet:getopts(Sock, [packet]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ {ok, Else2} = gen_tcp:recv(Sock, 0),
+ ok = inet:setopts(Sock, OldOpts),
+ case Else of
+ <<"ERA">> ->
+ {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
+ <<"ERR">> ->
+ case Else2 of
+ <<"OR BAD-IO\n">> ->
+ {error, no_such_file};
+ <<"OR NOT-ERASURE\n">> ->
+ {error, no_such_file};
+ <<"OR BAD-ARG\n">> ->
+ {error, bad_arg};
+ <<"OR PARTIAL-READ\n">> ->
+ {error, partial_read};
+ _ ->
+ {error, Else2}
+ end;
+ _ ->
+ {error, {whaaa, <>}}
+ end
+ end.
+list2(Sock, EpochID) ->
+ try
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
+ Res = list3(gen_tcp:recv(Sock, 0), Sock),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, Res}
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch}}
+ end.
+list3({ok, <<".\n">>}, _Sock) ->
+ [];
+list3({ok, Line}, Sock) ->
+ FileLen = byte_size(Line) - 16 - 1 - 1,
+ <> = Line,
+ Size = machi_util:hexstr_to_int(SizeHex),
+ [{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)];
+list3(Else, _Sock) ->
+ throw({server_protocol_error, Else}).
+checksum_list2(Sock, EpochID, File) ->
+ try
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0) of
+ {ok, <<"OK ", Rest/binary>> = Line} ->
+ put(status, ok), % may be unset later
+ RestLen = byte_size(Rest) - 1,
+ <> = Rest,
+ <> = machi_util:hexstr_to_bin(LenHex),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
+ {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
+ {error, no_such_file};
+ {ok, <<"ERROR BAD-ARG", _/binary>>} ->
+ {error, bad_arg};
+ {ok, Else} ->
+ throw({server_protocol_error, Else})
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch}}
+ end.
+checksum_list_fast(Sock, 0) ->
+ {ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2),
+ [];
+checksum_list_fast(Sock, Remaining) ->
+ Num = erlang:min(Remaining, 1024*1024),
+ {ok, Bytes} = gen_tcp:recv(Sock, Num),
+ [Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
+checksum_list_finish(Chunks) ->
+ Bin = case Chunks of
+ [X] ->
+ X;
+ _ ->
+ iolist_to_binary(Chunks)
+ end,
+ [begin
+ CSumLen = byte_size(Line) - 16 - 1 - 8 - 1,
+ <> = Line,
+ {machi_util:hexstr_to_int(OffsetHex),
+ machi_util:hexstr_to_int(SizeHex),
+ machi_util:hexstr_to_bin(CSum)}
+ end || Line <- re:split(Bin, "\n", [{return, binary}]),
+ Line /= <<>>].
+write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
+ try
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ %% TODO: add client-side checksum to the server's protocol
+ %% _ = crypto:hash(md5, Chunk),
+ File = machi_util:make_binary(File0),
+ true = (Offset >= ?MINIMUM_OFFSET),
+ OffsetHex = machi_util:int_to_hexbin(Offset, 64),
+ Chunk = machi_util:make_binary(Chunk0),
+ Len = iolist_size(Chunk0),
+ true = (Len =< ?MAX_CHUNK_SIZE),
+ LenHex = machi_util:int_to_hexbin(Len, 32),
+ Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex,
+ LenHex, File, <<"\n">>],
+ ok = gen_tcp:send(Sock, [Cmd, Chunk]),
+ {ok, Line} = gen_tcp:recv(Sock, 0),
+ PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
+ case Line of
+ <<"OK\n">> ->
+ ok;
+ <<"ERROR BAD-ARG", _/binary>> ->
+ {error, bad_arg};
+ <<"ERROR ", _/binary>>=Else ->
+ {error, {server_said, Else}}
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
+ end.
+delete_migration2(Sock, EpochID, File) ->
+ try
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>],
+ ok = gen_tcp:send(Sock, Cmd),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0) of
+ {ok, <<"OK\n">>} ->
+ ok;
+ {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
+ {error, no_such_file};
+ {ok, <<"ERROR BAD-ARG", _/binary>>} ->
+ {error, bad_arg};
+ {ok, Else} ->
+ throw({server_protocol_error, Else})
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch}}
+ end.
+trunc_hack2(Sock, EpochID, File) ->
+ try
+ {EpochNum, EpochCSum} = EpochID,
+ EpochIDRaw = <>,
+ Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>],
+ ok = gen_tcp:send(Sock, Cmd),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0) of
+ {ok, <<"OK\n">>} ->
+ ok;
+ {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
+ {error, no_such_file};
+ {ok, <<"ERROR BAD-ARG", _/binary>>} ->
+ {error, bad_arg};
+ {ok, Else} ->
+ throw({server_protocol_error, Else})
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch}}
+ end.
+get_latest_epoch2(Sock, ProjType) ->
+ ProjCmd = {get_latest_epoch, ProjType},
+ do_projection_common(Sock, ProjCmd).
+read_latest_projection2(Sock, ProjType) ->
+ ProjCmd = {read_latest_projection, ProjType},
+ do_projection_common(Sock, ProjCmd).
+read_projection2(Sock, ProjType, Epoch) ->
+ ProjCmd = {read_projection, ProjType, Epoch},
+ do_projection_common(Sock, ProjCmd).
+write_projection2(Sock, ProjType, Proj) ->
+ ProjCmd = {write_projection, ProjType, Proj},
+ do_projection_common(Sock, ProjCmd).
+get_all2(Sock, ProjType) ->
+ ProjCmd = {get_all, ProjType},
+ do_projection_common(Sock, ProjCmd).
+list_all2(Sock, ProjType) ->
+ ProjCmd = {list_all, ProjType},
+ do_projection_common(Sock, ProjCmd).
+do_projection_common(Sock, ProjCmd) ->
+ try
+ ProjCmdBin = term_to_binary(ProjCmd),
+ Len = iolist_size(ProjCmdBin),
+ true = (Len =< ?MAX_CHUNK_SIZE),
+ LenHex = machi_util:int_to_hexbin(Len, 32),
+ Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
+ ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ {ok, Line} = gen_tcp:recv(Sock, 0),
+ case Line of
+ <<"OK ", ResLenHex:8/binary, "\n">> ->
+ ResLen = machi_util:hexstr_to_int(ResLenHex),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, ResBin} = gen_tcp:recv(Sock, ResLen),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ binary_to_term(ResBin);
+ Else ->
+ {error, Else}
+ end
+ catch
+ throw:Error ->
+ Error;
+ error:{badmatch,_}=BadMatch ->
+ {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
+ end.
diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl
new file mode 100644
index 0000000..4ad26fc
--- /dev/null
+++ b/src/machi_flu_sup.erl
@@ -0,0 +1,51 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%% API
+%% Supervisor callbacks
+-define(SERVER, ?MODULE).
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+init([]) ->
+ RestartStrategy = one_for_one,
+ MaxRestarts = 1000,
+ MaxSecondsBetweenRestarts = 3600,
+ SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+ Restart = permanent,
+ Shutdown = 5000,
+ Type = worker,
+ {ok, FluList} = application:get_env(machi, flu_list),
+ FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
+ Restart, Shutdown, Type, []} ||
+ {FluName, _Port, _Dir}=FluArgs <- FluList],
+ {ok, {SupFlags, FluSpecs}}.
diff --git a/src/machi_projection.erl b/src/machi_projection.erl
new file mode 100644
index 0000000..d4f7e42
--- /dev/null
+++ b/src/machi_projection.erl
@@ -0,0 +1,119 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+ new/6, new/7, new/8,
+ update_projection_checksum/1,
+ update_projection_dbg2/2,
+ compare/2,
+ make_projection_summary/1
+ ]).
+new(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
+ new(0, MyName, All_list, Down_list, UPI_list, Repairing_list, Ps).
+new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list, Dbg) ->
+ new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list,
+ Dbg, []).
+new(EpochNum, MyName, All_list0, Down_list, UPI_list, Repairing_list,
+ Dbg, Dbg2)
+ when is_integer(EpochNum), EpochNum >= 0,
+ is_atom(MyName) orelse is_binary(MyName),
+ is_list(All_list0), is_list(Down_list), is_list(UPI_list),
+ is_list(Repairing_list), is_list(Dbg), is_list(Dbg2) ->
+ {All_list, MemberDict} =
+ case lists:all(fun(P) when is_record(P, p_srvr) -> true;
+ (_) -> false
+ end, All_list0) of
+ true ->
+ All = [S#p_srvr.name || S <- All_list0],
+ TmpL = [{S#p_srvr.name, S} || S <- All_list0],
+ {All, orddict:from_list(TmpL)};
+ false ->
+ All_list1 = lists:zip(All_list0,lists:seq(0,length(All_list0)-1)),
+ All_list2 = [#p_srvr{name=S, address="localhost",
+ {S, I} <- All_list1],
+ TmpL = [{S#p_srvr.name, S} || S <- All_list2],
+ {All_list0, orddict:from_list(TmpL)}
+ end,
+ true = lists:all(fun(X) when is_atom(X) orelse is_binary(X) -> true;
+ (_) -> false
+ end, All_list),
+ [true = lists:sort(SomeList) == lists:usort(SomeList) ||
+ SomeList <- [All_list, Down_list, UPI_list, Repairing_list] ],
+ AllSet = ordsets:from_list(All_list),
+ DownSet = ordsets:from_list(Down_list),
+ UPISet = ordsets:from_list(UPI_list),
+ RepairingSet = ordsets:from_list(Repairing_list),
+ true = ordsets:is_element(MyName, AllSet),
+ true = (AllSet == ordsets:union([DownSet, UPISet, RepairingSet])),
+ true = ordsets:is_disjoint(DownSet, UPISet),
+ true = ordsets:is_disjoint(DownSet, RepairingSet),
+ true = ordsets:is_disjoint(UPISet, RepairingSet),
+ P = #projection_v1{epoch_number=EpochNum,
+ creation_time=now(),
+ author_server=MyName,
+ all_members=All_list,
+ member_dict=MemberDict,
+ down=Down_list,
+ upi=UPI_list,
+ repairing=Repairing_list,
+ dbg=Dbg
+ },
+ update_projection_dbg2(update_projection_checksum(P), Dbg2).
+update_projection_checksum(P) ->
+ CSum = crypto:hash(sha,
+ term_to_binary(P#projection_v1{epoch_csum= <<>>,
+ dbg2=[]})),
+ P#projection_v1{epoch_csum=CSum}.
+update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
+ P#projection_v1{dbg2=Dbg2}.
+-spec compare(#projection_v1{}, #projection_v1{}) ->
+ integer().
+compare(#projection_v1{epoch_number=E1, epoch_csum=C1},
+ #projection_v1{epoch_number=E1, epoch_csum=C1}) ->
+ 0;
+ #projection_v1{epoch_number=E2}) ->
+ if E1 =< E2 -> -1;
+ E1 > E2 -> 1
+ end.
+ all_members=_All_list,
+ down=Down_list,
+ author_server=Author,
+ upi=UPI_list,
+ repairing=Repairing_list,
+ dbg=Dbg, dbg2=Dbg2}) ->
+ [{epoch,EpochNum},{author,Author},
+ {upi,UPI_list},{repair,Repairing_list},{down,Down_list},
+ {d,Dbg}, {d2,Dbg2}].
diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl
new file mode 100644
index 0000000..c88a21b
--- /dev/null
+++ b/src/machi_projection_store.erl
@@ -0,0 +1,265 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%% API
+ start_link/3,
+ get_latest_epoch/2, get_latest_epoch/3,
+ read_latest_projection/2, read_latest_projection/3,
+ read/3, read/4,
+ write/3, write/4,
+ get_all/2, get_all/3,
+ list_all/2, list_all/3
+ ]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-record(state, {
+ public_dir = "" :: string(),
+ private_dir = "" :: string(),
+ wedged = true :: boolean(),
+ wedge_notify_pid :: pid() | atom(),
+ max_public_epoch = -1 :: -1 | non_neg_integer(),
+ max_private_epoch = -1 :: -1 | non_neg_integer()
+ }).
+start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
+ gen_server:start_link({local, RegName},
+ ?MODULE, [DataDir, NotifyWedgeStateChanges], []).
+get_latest_epoch(PidSpec, ProjType) ->
+ get_latest_epoch(PidSpec, ProjType, infinity).
+get_latest_epoch(PidSpec, ProjType, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout).
+read_latest_projection(PidSpec, ProjType) ->
+ read_latest_projection(PidSpec, ProjType, infinity).
+read_latest_projection(PidSpec, ProjType, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ g_call(PidSpec, {read_latest_projection, ProjType}, Timeout).
+read(PidSpec, ProjType, Epoch) ->
+ read(PidSpec, ProjType, Epoch, infinity).
+read(PidSpec, ProjType, Epoch, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private',
+ is_integer(Epoch), Epoch >= 0 ->
+ g_call(PidSpec, {read, ProjType, Epoch}, Timeout).
+write(PidSpec, ProjType, Proj) ->
+ write(PidSpec, ProjType, Proj, infinity).
+write(PidSpec, ProjType, Proj, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private',
+ is_record(Proj, projection_v1),
+ is_integer(Proj#projection_v1.epoch_number),
+ Proj#projection_v1.epoch_number >= 0 ->
+ g_call(PidSpec, {write, ProjType, Proj}, Timeout).
+get_all(PidSpec, ProjType) ->
+ get_all(PidSpec, ProjType, infinity).
+get_all(PidSpec, ProjType, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ g_call(PidSpec, {get_all, ProjType}, Timeout).
+list_all(PidSpec, ProjType) ->
+ list_all(PidSpec, ProjType, infinity).
+list_all(PidSpec, ProjType, Timeout)
+ when ProjType == 'public' orelse ProjType == 'private' ->
+ g_call(PidSpec, {list_all, ProjType}, Timeout).
+g_call(PidSpec, Arg, Timeout) ->
+ LC1 = lclock_get(),
+ {Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout),
+ lclock_update(LC2),
+ Res.
+init([DataDir, NotifyWedgeStateChanges]) ->
+ lclock_init(),
+ PublicDir = machi_util:make_projection_filename(DataDir, "public"),
+ PrivateDir = machi_util:make_projection_filename(DataDir, "private"),
+ ok = filelib:ensure_dir(PublicDir ++ "/ignored"),
+ ok = filelib:ensure_dir(PrivateDir ++ "/ignored"),
+ MaxPublicEpoch = find_max_epoch(PublicDir),
+ MaxPrivateEpoch = find_max_epoch(PrivateDir),
+ {ok, #state{public_dir=PublicDir,
+ private_dir=PrivateDir,
+ wedged=true,
+ wedge_notify_pid=NotifyWedgeStateChanges,
+ max_public_epoch=MaxPublicEpoch,
+ max_private_epoch=MaxPrivateEpoch}}.
+handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ Epoch = if ProjType == public -> S#state.max_public_epoch;
+ ProjType == private -> S#state.max_private_epoch
+ end,
+ {reply, {{ok, Epoch}, LC2}, S};
+handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ Epoch = if ProjType == public -> S#state.max_public_epoch;
+ ProjType == private -> S#state.max_private_epoch
+ end,
+ {Reply, NewS} = do_proj_read(ProjType, Epoch, S),
+ {reply, {Reply, LC2}, NewS};
+handle_call({{read, ProjType, Epoch}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ {Reply, NewS} = do_proj_read(ProjType, Epoch, S),
+ {reply, {Reply, LC2}, NewS};
+handle_call({{write, ProjType, Proj}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ {Reply, NewS} = do_proj_write(ProjType, Proj, S),
+ {reply, {Reply, LC2}, NewS};
+handle_call({{get_all, ProjType}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ Dir = pick_path(ProjType, S),
+ Epochs = find_all(Dir),
+ All = [begin
+ {{ok, Proj}, _} = do_proj_read(ProjType, Epoch, S),
+ Proj
+ end || Epoch <- Epochs],
+ {reply, {{ok, All}, LC2}, S};
+handle_call({{list_all, ProjType}, LC1}, _From, S) ->
+ LC2 = lclock_update(LC1),
+ Dir = pick_path(ProjType, S),
+ {reply, {{ok, find_all(Dir)}, LC2}, S};
+handle_call(_Request, _From, S) ->
+ Reply = whaaaaaaaaaaaaa,
+ {reply, Reply, S}.
+handle_cast(_Msg, S) ->
+ {noreply, S}.
+handle_info(_Info, S) ->
+ {noreply, S}.
+terminate(_Reason, _S) ->
+ ok.
+code_change(_OldVsn, S, _Extra) ->
+ {ok, S}.
+do_proj_read(_ProjType, Epoch, S) when Epoch < 0 ->
+ {{error, not_written}, S};
+do_proj_read(ProjType, Epoch, S) ->
+ Dir = pick_path(ProjType, S),
+ Path = filename:join(Dir, epoch2name(Epoch)),
+ case file:read_file(Path) of
+ {ok, Bin} ->
+ %% TODO and if Bin is corrupt? (even if binary_to_term() succeeds)
+ {{ok, binary_to_term(Bin)}, S};
+ {error, enoent} ->
+ {{error, not_written}, S};
+ {error, Else} ->
+ {{error, Else}, S}
+ end.
+do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
+ %% TODO: We probably ought to check the projection checksum for sanity, eh?
+ Dir = pick_path(ProjType, S),
+ Path = filename:join(Dir, epoch2name(Epoch)),
+ case file:read_file_info(Path) of
+ {ok, _FI} ->
+ {{error, written}, S};
+ {error, enoent} ->
+ {ok, FH} = file:open(Path, [write, raw, binary]),
+ ok = file:write(FH, term_to_binary(Proj)),
+ ok = file:sync(FH),
+ ok = file:close(FH),
+ NewS = if ProjType == public, Epoch > S#state.max_public_epoch ->
+ io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
+ S#state{max_public_epoch=Epoch, wedged=true};
+ ProjType == private, Epoch > S#state.max_private_epoch ->
+ io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
+ S#state{max_private_epoch=Epoch, wedged=false};
+ true ->
+ S
+ end,
+ {ok, NewS};
+ {error, Else} ->
+ {{error, Else}, S}
+ end.
+pick_path(public, S) ->
+ S#state.public_dir;
+pick_path(private, S) ->
+ S#state.private_dir.
+epoch2name(Epoch) ->
+ machi_util:int_to_hexstr(Epoch, 32).
+name2epoch(Name) ->
+ machi_util:hexstr_to_int(Name).
+find_all(Dir) ->
+ Fs = filelib:wildcard("*", Dir),
+ lists:sort([name2epoch(F) || F <- Fs]).
+find_max_epoch(Dir) ->
+ Fs = lists:sort(filelib:wildcard("*", Dir)),
+ if Fs == [] ->
+ -1;
+ true ->
+ name2epoch(lists:last(Fs))
+ end.
+lclock_init() ->
+ lamport_clock:init().
+lclock_get() ->
+ lamport_clock:get().
+lclock_update(LC) ->
+ lamport_clock:update(LC).
+-else. % TEST
+lclock_init() ->
+ ok.
+lclock_get() ->
+ ok.
+lclock_update(_LC) ->
+ ok.
+-endif. % TEST
diff --git a/src/machi_sequencer.erl b/src/machi_sequencer.erl
new file mode 100644
index 0000000..ddd81a5
--- /dev/null
+++ b/src/machi_sequencer.erl
@@ -0,0 +1,191 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-define(CONFIG_DIR, "./config").
+-define(DATA_DIR, "./data").
+seq(Server, Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
+ Server ! {seq, self(), Prefix, Size},
+ receive
+ {assignment, File, Offset} ->
+ {File, Offset}
+ after 1*1000 ->
+ bummer
+ end.
+seq_direct(Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
+ RegName = make_regname(Prefix),
+ seq(RegName, Prefix, Size).
+start_server() ->
+ start_server(?MODULE).
+start_server(Name) ->
+ spawn_link(fun() -> run_server(Name) end).
+run_server(Name) ->
+ register(Name, self()),
+ ets:new(?MODULE, [named_table, public, {write_concurrency, true}]),
+ server_loop().
+server_loop() ->
+ receive
+ {seq, From, Prefix, Size} ->
+ spawn(fun() -> server_dispatch(From, Prefix, Size) end),
+ server_loop()
+ end.
+server_dispatch(From, Prefix, Size) ->
+ RegName = make_regname(Prefix),
+ case whereis(RegName) of
+ undefined ->
+ start_prefix_server(Prefix),
+ timer:sleep(1),
+ server_dispatch(From, Prefix, Size);
+ Pid ->
+ Pid ! {seq, From, Prefix, Size}
+ end,
+ exit(normal).
+start_prefix_server(Prefix) ->
+ spawn(fun() -> run_prefix_server(Prefix) end).
+run_prefix_server(Prefix) ->
+ true = register(make_regname(Prefix), self()),
+ ok = filelib:ensure_dir(?CONFIG_DIR ++ "/unused"),
+ ok = filelib:ensure_dir(?DATA_DIR ++ "/unused"),
+ FileNum = read_max_filenum(Prefix) + 1,
+ ok = increment_max_filenum(Prefix),
+ prefix_server_loop(Prefix, FileNum).
+prefix_server_loop(Prefix, FileNum) ->
+ File = make_data_filename(Prefix, FileNum),
+ prefix_server_loop(Prefix, File, FileNum, 0).
+prefix_server_loop(Prefix, File, FileNum, Offset) ->
+ receive
+ {seq, From, Prefix, Size} ->
+ From ! {assignment, File, Offset},
+ prefix_server_loop(Prefix, File, FileNum, Offset + Size)
+ after 30*1000 ->
+ io:format("timeout: ~p server stopping\n", [Prefix]),
+ exit(normal)
+ end.
+make_regname(Prefix) ->
+ erlang:binary_to_atom(Prefix, latin1).
+make_config_filename(Prefix) ->
+ lists:flatten(io_lib:format("~s/~s", [?CONFIG_DIR, Prefix])).
+make_data_filename(Prefix, FileNum) ->
+ erlang:iolist_to_binary(io_lib:format("~s/~s.~w",
+ [?DATA_DIR, Prefix, FileNum])).
+read_max_filenum(Prefix) ->
+ case file:read_file_info(make_config_filename(Prefix)) of
+ {error, enoent} ->
+ 0;
+ {ok, FI} ->
+ FI#file_info.size
+ end.
+increment_max_filenum(Prefix) ->
+ {ok, FH} = file:open(make_config_filename(Prefix), [append]),
+ ok = file:write(FH, "x"),
+ %% ok = file:sync(FH),
+ ok = file:close(FH).
+%% basho_bench callbacks
+-define(SEQ, ?MODULE).
+new(1) ->
+ start_server(),
+ timer:sleep(100),
+ {ok, unused};
+new(_Id) ->
+ {ok, unused}.
+run(null, _KeyGen, _ValgueGen, State) ->
+ {ok, State};
+run(keygen_then_null, KeyGen, _ValgueGen, State) ->
+ _Prefix = KeyGen(),
+ {ok, State};
+run(seq, KeyGen, _ValgueGen, State) ->
+ Prefix = KeyGen(),
+ {_, _} = ?SEQ:seq(?SEQ, Prefix, 1),
+ {ok, State};
+run(seq_direct, KeyGen, _ValgueGen, State) ->
+ Prefix = KeyGen(),
+ Name = ?SEQ:make_regname(Prefix),
+ case get(Name) of
+ undefined ->
+ case whereis(Name) of
+ undefined ->
+ {_, _} = ?SEQ:seq(?SEQ, Prefix, 1);
+ Pid ->
+ put(Name, Pid),
+ {_, _} = ?SEQ:seq(Pid, Prefix, 1)
+ end;
+ Pid ->
+ {_, _} = ?SEQ:seq(Pid, Prefix, 1)
+ end,
+ {ok, State};
+run(seq_ets, KeyGen, _ValgueGen, State) ->
+ Tab = ?MODULE,
+ Prefix = KeyGen(),
+ Res = try
+ BigNum = ets:update_counter(Tab, Prefix, 1),
+ BigBin = <>,
+ <> = BigBin,
+ %% if Offset rem 1000 == 0 ->
+ %% io:format("~p,~p ", [FileNum, Offset]);
+ %% true ->
+ %% ok
+ %% end,
+ {fakefake, FileNum, Offset}
+ catch error:badarg ->
+ FileNum2 = 1, Offset2 = 0,
+ FileBin = <>,
+ OffsetBin = <>,
+ Glop = <>,
+ <> = Glop,
+ %% if Prefix == <<"42">> -> io:format("base:~w\n", [Base]); true -> ok end,
+ %% Base = 0,
+ case ets:insert_new(Tab, {Prefix, Base}) of
+ true ->
+ {<<"fakefakefake">>, Base};
+ false ->
+ Result2 = ets:update_counter(Tab, Prefix, 1),
+ {<<"fakefakefake">>, Result2}
+ end
+ end,
+ Res = Res,
+ {ok, State}.
diff --git a/src/machi_sup.erl b/src/machi_sup.erl
new file mode 100644
index 0000000..dcaadbe
--- /dev/null
+++ b/src/machi_sup.erl
@@ -0,0 +1,55 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%% API
+%% Supervisor callbacks
+-define(SERVER, ?MODULE).
+start_link() ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+init([]) ->
+ RestartStrategy = one_for_one,
+ MaxRestarts = 1000,
+ MaxSecondsBetweenRestarts = 3600,
+ SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+ Restart = permanent,
+ Shutdown = 5000,
+ Type = supervisor,
+ ServerSup =
+ {machi_flu_sup, {machi_flu_sup, start_link, []},
+ Restart, Shutdown, Type, []},
+ {ok, {SupFlags, [ServerSup]}}.
+ %% AChild = {'AName', {'AModule', start_link, []},
+ %% Restart, Shutdown, Type, ['AModule']},
+ %% {ok, {SupFlags, [AChild]}}.
diff --git a/src/machi_util.erl b/src/machi_util.erl
new file mode 100644
index 0000000..1331d11
--- /dev/null
+++ b/src/machi_util.erl
@@ -0,0 +1,180 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+ checksum/1,
+ hexstr_to_bin/1, bin_to_hexstr/1,
+ hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
+ make_binary/1, make_string/1,
+ make_regname/1,
+ make_checksum_filename/2, make_data_filename/2,
+ make_projection_filename/2,
+ read_max_filenum/2, increment_max_filenum/2,
+ info_msg/2, verb/1, verb/2,
+ %% TCP protocol helpers
+ connect/2
+ ]).
+append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) ->
+ CSum = checksum(Chunk),
+ Server ! {seq_append, self(), Prefix, Chunk, CSum},
+ receive
+ {assignment, Offset, File} ->
+ {Offset, File}
+ after 10*1000 ->
+ bummer
+ end.
+make_regname(Prefix) when is_binary(Prefix) ->
+ erlang:binary_to_atom(Prefix, latin1);
+make_regname(Prefix) when is_list(Prefix) ->
+ erlang:list_to_atom(Prefix).
+make_config_filename(DataDir, Prefix) ->
+ lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])).
+make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
+ lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum",
+ [DataDir, Prefix, SequencerName, FileNum])).
+make_checksum_filename(DataDir, "") ->
+ lists:flatten(io_lib:format("~s/config", [DataDir]));
+make_checksum_filename(DataDir, FileName) ->
+ lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])).
+make_data_filename(DataDir, "") ->
+ FullPath = lists:flatten(io_lib:format("~s/data", [DataDir])),
+ {"", FullPath};
+make_data_filename(DataDir, File) ->
+ FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
+ {File, FullPath}.
+make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
+ File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
+ [Prefix, SequencerName, FileNum])),
+ FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
+ {File, FullPath}.
+make_projection_filename(DataDir, "") ->
+ lists:flatten(io_lib:format("~s/projection", [DataDir]));
+make_projection_filename(DataDir, File) ->
+ lists:flatten(io_lib:format("~s/projection/~s", [DataDir, File])).
+read_max_filenum(DataDir, Prefix) ->
+ case file:read_file_info(make_config_filename(DataDir, Prefix)) of
+ {error, enoent} ->
+ 0;
+ {ok, FI} ->
+ FI#file_info.size
+ end.
+increment_max_filenum(DataDir, Prefix) ->
+ try
+ {ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
+ ok = file:write(FH, "x"),
+ %% ok = file:sync(FH),
+ ok = file:close(FH)
+ catch
+ error:{badmatch,_}=Error ->
+ {error, Error, erlang:get_stacktrace()}
+ end.
+hexstr_to_bin(S) when is_list(S) ->
+ hexstr_to_bin(S, []);
+hexstr_to_bin(B) when is_binary(B) ->
+ hexstr_to_bin(binary_to_list(B), []).
+hexstr_to_bin([], Acc) ->
+ list_to_binary(lists:reverse(Acc));
+hexstr_to_bin([X,Y|T], Acc) ->
+ {ok, [V], []} = io_lib:fread("~16u", [X,Y]),
+ hexstr_to_bin(T, [V | Acc]).
+bin_to_hexstr(<<>>) ->
+ [];
+bin_to_hexstr(<>) ->
+ [hex_digit(X), hex_digit(Y)|bin_to_hexstr(Rest)].
+hex_digit(X) when X < 10 ->
+ X + $0;
+hex_digit(X) ->
+ X - 10 + $a.
+make_binary(X) when is_binary(X) ->
+ X;
+make_binary(X) when is_list(X) ->
+ iolist_to_binary(X).
+make_string(X) when is_list(X) ->
+ lists:flatten(X);
+make_string(X) when is_binary(X) ->
+ binary_to_list(X).
+hexstr_to_int(X) ->
+ B = hexstr_to_bin(X),
+ B_size = byte_size(B) * 8,
+ <> = B,
+ I.
+int_to_hexstr(I, I_size) ->
+ bin_to_hexstr(<>).
+int_to_hexbin(I, I_size) ->
+ list_to_binary(int_to_hexstr(I, I_size)).
+checksum(Bin) when is_binary(Bin) ->
+ crypto:hash(md5, Bin).
+verb(Fmt) ->
+ verb(Fmt, []).
+verb(Fmt, Args) ->
+ case application:get_env(kernel, verbose) of
+ {ok, true} -> io:format(Fmt, Args);
+ _ -> ok
+ end.
+info_msg(Fmt, Args) ->
+ case application:get_env(kernel, verbose) of {ok, false} -> ok;
+ _ -> error_logger:info_msg(Fmt, Args)
+ end.
+-spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) ->
+ port().
+connect(Host, Port) ->
+ escript_connect(Host, Port).
+escript_connect(Host, PortStr) when is_list(PortStr) ->
+ Port = list_to_integer(PortStr),
+ escript_connect(Host, Port);
+escript_connect(Host, Port) when is_integer(Port) ->
+ {ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
+ {packet, raw}]),
+ Sock.
diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl
new file mode 100644
index 0000000..8555959
--- /dev/null
+++ b/test/machi_admin_util_test.erl
@@ -0,0 +1,72 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-define(FLU, machi_flu1).
+-define(FLU_C, machi_flu1_client).
+verify_file_checksums_test() ->
+ Host = "localhost",
+ TcpPort = 32958,
+ DataDir = "./data",
+ FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir),
+ Sock1 = machi_util:connect(Host, TcpPort),
+ try
+ Prefix = <<"verify_prefix">>,
+ [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
+ Prefix, <>) ||
+ X <- lists:seq(1,10)],
+ {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
+ {ok, []} = machi_admin_util:verify_file_checksums_remote(
+ Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
+ {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
+ {ok, FH} = file:open(Path, [read,write]),
+ {ok, _} = file:position(FH, ?MINIMUM_OFFSET),
+ ok = file:write(FH, "y"),
+ ok = file:write(FH, "yo"),
+ ok = file:write(FH, "yo!"),
+ ok = file:close(FH),
+ %% Check the local flavor of the API
+ {ok, Res1} = machi_admin_util:verify_file_checksums_local(
+ Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
+ 3 = length(Res1),
+ %% Check the remote flavor of the API
+ {ok, Res2} = machi_admin_util:verify_file_checksums_remote(
+ Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
+ 3 = length(Res2),
+ ok
+ after
+ catch ?FLU_C:quit(Sock1),
+ ok = ?FLU:stop(FLU1)
+ end.
+-endif. % TEST
diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl
new file mode 100644
index 0000000..136d6d0
--- /dev/null
+++ b/test/machi_flu1_test.erl
@@ -0,0 +1,156 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-define(FLU, machi_flu1).
+-define(FLU_C, machi_flu1_client).
+setup_test_flu(RegName, TcpPort, DataDir) ->
+ setup_test_flu(RegName, TcpPort, DataDir, []).
+setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
+ clean_up_data_dir(DataDir),
+ {ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
+ {dbg, DbgProps}]),
+ %% TODO the process structuring/racy-ness of the various processes
+ %% of the FLU needs to be deterministic to remove this sleep race
+ %% "prevention".
+ timer:sleep(10),
+ FLU1.
+flu_smoke_test() ->
+ Host = "localhost",
+ TcpPort = 32957,
+ DataDir = "./data",
+ Prefix = <<"prefix!">>,
+ BadPrefix = BadFile = "no/good",
+ FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir),
+ try
+ {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
+ "does-not-exist"),
+ {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort,
+ ?DUMMY_PV1_EPOCH, BadFile),
+ {ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
+ Chunk1 = <<"yo!">>,
+ {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
+ Prefix, Chunk1),
+ {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+ File1, Off1, Len1),
+ {ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort,
+ ?DUMMY_PV1_EPOCH, File1),
+ {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
+ BadPrefix, Chunk1),
+ {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
+ Len1 = size(Chunk1),
+ {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
+ File1, Off1*983, Len1),
+ {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
+ File1, Off1, Len1*984),
+ Chunk2 = <<"yo yo">>,
+ Len2 = byte_size(Chunk2),
+ Off2 = ?MINIMUM_OFFSET + 77,
+ File2 = "smoke-prefix",
+ ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+ File2, Off2, Chunk2),
+ {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+ BadFile, Off2, Chunk2),
+ {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+ File2, Off2, Len2),
+ {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
+ "no!!", Off2, Len2),
+ {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
+ BadFile, Off2, Len2),
+ %% We know that File1 still exists. Pretend that we've done a
+ %% migration and exercise the delete_migration() API.
+ ok = ?FLU_C:delete_migration(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1),
+ {error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort,
+ ?DUMMY_PV1_EPOCH, File1),
+ {error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort,
+ ?DUMMY_PV1_EPOCH, BadFile),
+ %% We know that File2 still exists. Pretend that we've done a
+ %% migration and exercise the trunc_hack() API.
+ ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
+ ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
+ {error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
+ ?DUMMY_PV1_EPOCH, BadFile),
+ ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
+ after
+ ok = ?FLU:stop(FLU1)
+ end.
+flu_projection_smoke_test() ->
+ Host = "localhost",
+ TcpPort = 32959,
+ DataDir = "./data",
+ FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
+ try
+ [begin
+ {ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
+ {error, not_written} =
+ ?FLU_C:read_latest_projection(Host, TcpPort, T),
+ {ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
+ {ok, []} = ?FLU_C:get_all(Host, TcpPort, T),
+ P1 = machi_projection:new(1, a, [a], [], [a], [], []),
+ ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
+ {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
+ {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
+ {ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
+ {ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
+ {ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
+ {error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2)
+ end || T <- [public, private] ]
+ after
+ ok = ?FLU:stop(FLU1)
+ end.
+clean_up_data_dir(DataDir) ->
+ [begin
+ Fs = filelib:wildcard(DataDir ++ Glob),
+ [file:delete(F) || F <- Fs],
+ [file:del_dir(F) || F <- Fs]
+ end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
+ _ = file:del_dir(DataDir),
+ ok.
+-endif. % TEST
diff --git a/test/machi_projection_test.erl b/test/machi_projection_test.erl
new file mode 100644
index 0000000..f30411a
--- /dev/null
+++ b/test/machi_projection_test.erl
@@ -0,0 +1,77 @@
+%% -------------------------------------------------------------------
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+new_test() ->
+ %% Bleh, hey QuickCheck ... except that any model probably equals
+ %% code under test, bleh.
+ true = try_it(a, [a,b,c], [a,b], [], [c], []),
+ true = try_it(<<"a">>, [<<"a">>,b,c], [<<"a">>,b], [], [c], []),
+ Servers = [#p_srvr{name=a}, #p_srvr{name=b}, #p_srvr{name=c}],
+ Servers_bad1 = [#p_srvr{name= <<"a">>}, #p_srvr{name=b}, #p_srvr{name=c}],
+ Servers_bad2 = [#p_srvr{name=z}, #p_srvr{name=b}, #p_srvr{name=c}],
+ true = try_it(a, Servers, [a,b], [], [c], []),
+ false = try_it(a, not_list, [a,b], [], [c], []),
+ false = try_it(a, [a,b,c], not_list, [], [c], []),
+ false = try_it(a, [a,b,c], [a,b], not_list, [c], []),
+ false = try_it(a, [a,b,c], [a,b], [], not_list, []),
+ false = try_it(a, [a,b,c], [a,b], [], [c], not_list),
+ false = try_it(<<"x">>, [a,b,c], [a,b], [], [c], []),
+ false = try_it(a, [a,b,c], [a,b,c], [], [c], []),
+ false = try_it(a, [a,b,c], [a,b], [c], [c], []),
+ false = try_it(a, [a,b,c], [a,b], [], [c,c], []),
+ false = try_it(a, Servers_bad1, [a,b], [], [c], []),
+ false = try_it(a, Servers_bad2, [a,b], [], [c], []),
+ ok.
+compare_test() ->
+ P0 = machi_projection:new(0, a, [a,b,c], [a,b], [], [c], []),
+ P1a = machi_projection:new(1, a, [a,b,c], [a,b], [], [c], []),
+ P1b = machi_projection:new(1, b, [a,b,c], [a,b], [], [c], []),
+ P2 = machi_projection:new(2, a, [a,b,c], [a,b], [], [c], []),
+ 0 = machi_projection:compare(P0, P0),
+ -1 = machi_projection:compare(P0, P1a),
+ -1 = machi_projection:compare(P1a, P1b),
+ -1 = machi_projection:compare(P1b, P1a),
+ 1 = machi_projection:compare(P2, P1a),
+ 1 = machi_projection:compare(P2, P1b),
+ 1 = machi_projection:compare(P2, P0),
+ ok.
+try_it(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
+ try
+ P = machi_projection:new(MyName, All_list, UPI_list, Down_list,
+ Repairing_list, Ps),
+ is_record(P, projection_v1)
+ catch _:_ ->
+ false
+ end.
+-endif. % TEST
diff --git a/test/pulse_util/event_logger.erl b/test/pulse_util/event_logger.erl
new file mode 100644
index 0000000..f6a39d0
--- /dev/null
+++ b/test/pulse_util/event_logger.erl
@@ -0,0 +1,154 @@
+%% -------------------------------------------------------------------
+%% Machi: a small village of replicated files
+%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%%% File : handle_errors.erl
+%%% Author : Ulf Norell
+%%% Description :
+%%% Created : 26 Mar 2012 by Ulf Norell
+%% API
+-export([start_link/0, event/1, event/2, get_events/0, start_logging/0]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-define(SERVER, ?MODULE).
+-record(state, { start_time, events = [] }).
+-record(event, { timestamp, data }).
+%% API
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_logging() ->
+ gen_server:call(?MODULE, {start, timestamp()}).
+event(EventData) ->
+ event(EventData, timestamp()).
+event(EventData, Timestamp) ->
+ gen_server:call(?MODULE,
+ #event{ timestamp = Timestamp, data = EventData }).
+async_event(EventData) ->
+ gen_server:cast(?MODULE,
+ #event{ timestamp = timestamp(), data = EventData }).
+get_events() ->
+ gen_server:call(?MODULE, get_events).
+%% gen_server callbacks
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+init([]) ->
+ {ok, #state{}}.
+%% Function: %% handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+handle_call(Event = #event{}, _From, State) ->
+ {reply, ok, add_event(Event, State)};
+handle_call({start, Now}, _From, S) ->
+ {reply, ok, S#state{ events = [], start_time = Now }};
+handle_call(get_events, _From, S) ->
+ {reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
+ S#state{ events = [] }};
+handle_call(Request, _From, State) ->
+ {reply, {error, {bad_call, Request}}, State}.
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+handle_cast(Event = #event{}, State) ->
+ {noreply, add_event(Event, State)};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+handle_info(_Info, State) ->
+ {noreply, State}.
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+terminate(_Reason, _State) ->
+ ok.
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+%%% Internal functions
+add_event(#event{timestamp = Now, data = Data}, State) ->
+ Event = #event{ timestamp = Now, data = Data },
+ State#state{ events = [Event|State#state.events] }.
+timestamp() ->
+ lamport_clock:get().
diff --git a/test/pulse_util/handle_errors.erl b/test/pulse_util/handle_errors.erl
new file mode 100644
index 0000000..97965b8
--- /dev/null
+++ b/test/pulse_util/handle_errors.erl
@@ -0,0 +1,174 @@
+%% -------------------------------------------------------------------
+%% Machi: a small village of replicated files
+%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+%%% @author Hans Svensson <>
+%%% @copyright (C) 2012, Hans Svensson
+%%% @doc
+%%% @end
+%%% Created : 19 Mar 2012 by Hans Svensson <>
+%% API
+-export([start_link/0, add_handler/0]).
+%% gen_event callbacks
+-export([init/1, handle_event/2, handle_call/2,
+ handle_info/2, terminate/2, code_change/3]).
+-define(SERVER, ?MODULE).
+-record(state, { errors = [] }).
+%%% gen_event callbacks
+%% @doc
+%% Creates an event manager
+%% @spec start_link() -> {ok, Pid} | {error, Error}
+%% @end
+start_link() ->
+ gen_event:start_link({local, ?SERVER}).
+%% @doc
+%% Adds an event handler
+%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
+%% @end
+add_handler() ->
+ gen_event:add_handler(?SERVER, ?MODULE, []).
+%%% gen_event callbacks
+%% @private
+%% @doc
+%% Whenever a new event handler is added to an event manager,
+%% this function is called to initialize the event handler.
+%% @spec init(Args) -> {ok, State}
+%% @end
+init([]) ->
+ {ok, #state{}}.
+%% @private
+%% @doc
+%% Whenever an event manager receives an event sent using
+%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
+%% called for each installed event handler to handle the event.
+%% @spec handle_event(Event, State) ->
+%% {ok, State} |
+%% {swap_handler, Args1, State1, Mod2, Args2} |
+%% remove_handler
+%% @end
+handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
+ {ok, State};
+handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
+ {ok, State};
+handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
+ {ok, State};
+handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
+ {ok, State};
+handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
+ {ok, State};
+handle_event({error, _, Event}, State) ->
+ {ok, State#state{ errors = [Event|State#state.errors] }};
+handle_event(_Event, State) ->
+ {ok, State}.
+%% @private
+%% @doc
+%% Whenever an event manager receives a request sent using
+%% gen_event:call/3,4, this function is called for the specified
+%% event handler to handle the request.
+%% @spec handle_call(Request, State) ->
+%% {ok, Reply, State} |
+%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
+%% {remove_handler, Reply}
+%% @end
+handle_call(get_errors, S) ->
+ {ok, S#state.errors, S#state{ errors = [] }};
+handle_call(_Request, State) ->
+ Reply = ok,
+ {ok, Reply, State}.
+%% @private
+%% @doc
+%% This function is called for each installed event handler when
+%% an event manager receives any other message than an event or a
+%% synchronous request (or a system message).
+%% @spec handle_info(Info, State) ->
+%% {ok, State} |
+%% {swap_handler, Args1, State1, Mod2, Args2} |
+%% remove_handler
+%% @end
+handle_info(_Info, State) ->
+ {ok, State}.
+%% @private
+%% @doc
+%% Whenever an event handler is deleted from an event manager, this
+%% function is called. It should be the opposite of Module:init/1 and
+%% do any necessary cleaning up.
+%% @spec terminate(Reason, State) -> void()
+%% @end
+terminate(_Reason, _State) ->
+ ok.
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+%%% Internal functions
diff --git a/test/pulse_util/lamport_clock.erl b/test/pulse_util/lamport_clock.erl
new file mode 100644
index 0000000..0bb8e3d
--- /dev/null
+++ b/test/pulse_util/lamport_clock.erl
@@ -0,0 +1,73 @@
+%% -------------------------------------------------------------------
+%% Machi: a small village of replicated files
+%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%% -------------------------------------------------------------------
+-export([init/0, reset/0, get/0, update/1, incr/0]).
+-define(KEY, ?MODULE).
+init() ->
+ case get(?KEY) of
+ undefined ->
+ reset();
+ N when is_integer(N) ->
+ ok
+ end.
+reset() ->
+ FakeTOD = 0,
+ put(?KEY, FakeTOD + 1).
+get() ->
+ init(),
+ get(?KEY).
+update(Remote) ->
+ New = erlang:max(get(?KEY), Remote) + 1,
+ put(?KEY, New),
+ New.
+incr() ->
+ New = get(?KEY) + 1,
+ put(?KEY, New),
+ New.
+-else. % TEST
+init() ->
+ ok.
+reset() ->
+ ok.
+get() ->
+ ok.
+update(_) ->
+ ok.
+incr() ->
+ ok.
+-endif. % TEST