diff --git a/.gitignore b/.gitignore
index 2693865..180a370 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,9 +1,11 @@
+prototype/chain-manager/patch.*
.eunit
deps
-*.o
-ebin/*.beam
*.plt
erl_crash.dump
-rel/example_project
.concrete/DEV_MODE
.rebar
+doc/edoc-info
+doc/erlang.png
+doc/*.html
+doc/stylesheet.css
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..310b8a8
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,38 @@
+REBAR_BIN := $(shell which rebar)
+ifeq ($(REBAR_BIN),)
+REBAR_BIN = ./rebar
+endif
+
+.PHONY: rel deps package pkgclean
+
+all: deps compile
+
+compile:
+ $(REBAR_BIN) compile
+
+deps:
+ $(REBAR_BIN) get-deps
+
+clean:
+ $(REBAR_BIN) -r clean
+
+test: deps compile eunit
+
+eunit:
+ $(REBAR_BIN) -v skip_deps=true eunit
+
+pulse: compile
+ env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
+ env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
+
+APPS = kernel stdlib sasl erts ssl compiler eunit crypto
+PLT = $(HOME)/.machi_dialyzer_plt
+
+build_plt: deps compile
+ dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin
+
+dialyzer: deps compile
+ dialyzer -Wno_return --plt $(PLT) ebin
+
+clean_plt:
+ rm $(PLT)
diff --git a/ebin/.gitignore b/ebin/.gitignore
new file mode 100644
index 0000000..120fe3a
--- /dev/null
+++ b/ebin/.gitignore
@@ -0,0 +1,2 @@
+*.beam
+*.app
diff --git a/include/machi.hrl b/include/machi.hrl
new file mode 100644
index 0000000..d717083
--- /dev/null
+++ b/include/machi.hrl
@@ -0,0 +1,26 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes
+-define(MAX_CHUNK_SIZE, ((1 bsl 32) - 1)).
+%% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
+-define(DATA_DIR, "./data").
+-define(MINIMUM_OFFSET, 1024).
+
diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl
new file mode 100644
index 0000000..4b431b6
--- /dev/null
+++ b/include/machi_projection.hrl
@@ -0,0 +1,33 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-record(projection, {
+ %% hard state
+ epoch :: non_neg_integer(),
+ last_epoch :: non_neg_integer(),
+ float_map,
+ last_float_map,
+ %% soft state
+ migrating :: boolean(),
+ tree,
+ last_tree
+ }).
+
+-define(SHA_MAX, (1 bsl (20*8))).
diff --git a/rebar b/rebar
new file mode 100755
index 0000000..03c9be6
Binary files /dev/null and b/rebar differ
diff --git a/rebar.config b/rebar.config
new file mode 100644
index 0000000..5b3cfa2
--- /dev/null
+++ b/rebar.config
@@ -0,0 +1,7 @@
+%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
+{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
+
+{deps, [
+ {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}
+ ]}.
+
diff --git a/rebar.config.script b/rebar.config.script
new file mode 100644
index 0000000..364ce39
--- /dev/null
+++ b/rebar.config.script
@@ -0,0 +1,47 @@
+PulseBuild = case os:getenv("USE_PULSE") of
+ false ->
+ false;
+ _ ->
+ true
+ end,
+case PulseBuild of
+ true ->
+ PulseOpts =
+ [{pulse_no_side_effect,
+ [{erlang,display,1}
+ ]},
+ {pulse_side_effect,
+ [ {does_not_exist_yet, some_func, '_'}
+
+ , {prim_file, '_', '_'}
+ , {file, '_', '_'}
+ , {filelib, '_', '_'}
+ , {os, '_', '_'} ]},
+
+ {pulse_replace_module,
+ [ {gen_server, pulse_gen_server}
+ , {application, pulse_application}
+ , {supervisor, pulse_supervisor} ]}
+ ],
+ PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
+ UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
+ {value, {eunit_compile_opts, Opts}} ->
+ lists:keyreplace(eunit_compile_opts,
+ 1,
+ CONFIG,
+ {eunit_compile_opts, Opts ++ PulseOpts});
+ _ ->
+ [{eunit_compile_opts, PulseOpts} | CONFIG]
+ end,
+ case lists:keysearch(port_env, 1, UpdConfig) of
+ {value, {port_env, PortEnv}} ->
+ lists:keyreplace(port_env,
+ 1,
+ UpdConfig,
+ {port_env, PortEnv ++ PulseCFlags});
+ _ ->
+ [{port_env, PulseCFlags} | UpdConfig]
+ end;
+ false ->
+ CONFIG
+end.
diff --git a/src/machi.app.src b/src/machi.app.src
new file mode 100644
index 0000000..7a2866b
--- /dev/null
+++ b/src/machi.app.src
@@ -0,0 +1,13 @@
+{application, machi, [
+ {description, "A village of write-once files."},
+ {vsn, "0.0.0"},
+ {applications, [kernel, stdlib, sasl, crypto]},
+ {mod,{machi_app,[]}},
+ {registered, []},
+ {env, [
+ {flu_list,
+ [
+ {flu_a, 32900, "./data.flu_a"}
+ ]}
+ ]}
+]}.
diff --git a/src/machi_app.erl b/src/machi_app.erl
new file mode 100644
index 0000000..6dfddf7
--- /dev/null
+++ b/src/machi_app.erl
@@ -0,0 +1,37 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(machi_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+start(_StartType, _StartArgs) ->
+ case machi_sup:start_link() of
+ {ok, Pid} ->
+ {ok, Pid};
+ Error ->
+ Error
+ end.
+
+stop(_State) ->
+ ok.
diff --git a/src/machi_chash.erl b/src/machi_chash.erl
new file mode 100644
index 0000000..f45473a
--- /dev/null
+++ b/src/machi_chash.erl
@@ -0,0 +1,459 @@
+%%%-------------------------------------------------------------------
+%%% Copyright (c) 2007-2011 Gemini Mobile Technologies, Inc. All rights reserved.
+%%% Copyright (c) 2013-2015 Basho Technologies, Inc. All rights reserved.
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+%%%-------------------------------------------------------------------
+
+%% Consistent hashing library. Also known as "random slicing".
+%% Originally from the Hibari DB source code at https://github.com/hibari
+%%
+%% TODO items:
+%%
+%% 1. Refactor to use bigints instead of floating point numbers. The
+%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE macro below doesn't allow as
+%% much wiggle-room for making really small hashing range
+%% definitions.
+
+-module(machi_chash).
+
+-define(SMALLEST_SIGNIFICANT_FLOAT_SIZE, 0.1e-12).
+-define(SHA_MAX, (1 bsl (20*8))).
+
+%% -compile(export_all).
+-export([make_float_map/1, make_float_map/2,
+ sum_map_weights/1,
+ make_tree/1,
+ query_tree/2,
+ hash_binary_via_float_map/2,
+ hash_binary_via_float_tree/2,
+ pretty_with_integers/2,
+ pretty_with_integers/3]).
+-export([make_demo_map1/0, make_demo_map2/0]).
+-export([zzz_usage_details/0]). % merely to give EDoc a hint of our intent
+
+-type owner_name() :: term().
+%% Owner for a range on the unit interval. We are agnostic about its
+%% type.
+-type weight() :: non_neg_integer().
+%% For this library, a weight is an integer which specifies the
+%% capacity of a "owner" relative to other owners. For example, if
+%% owner A with a weight of 10, and if owner B has a weight of 20,
+%% then B will be assigned twice as much of the unit interval as A.
+
+-type float_map() :: [{owner_name(), float()}].
+%% A float map subdivides the unit interval, starting at 0.0, to
+%% partitions that are assigned to various owners. The sum of all
+%% floats must be exactly 1.0 (or close enough for floating point
+%% purposes).
+
+-opaque float_tree() :: gb_trees:tree(float(), owner_name()).
+%% We can't use gb_trees:tree() because 'nil' (the empty tree) is
+%% never valid in our case. But teaching Dialyzer that is difficult.
+
+-type owner_int_range() :: {owner_name(), non_neg_integer(), non_neg_integer()}.
+%% Used when "prettying" a float map.
+-type owner_weight() :: {owner_name(), weight()}.
+
+-type owner_weight_list() :: [owner_weight()].
+%% A owner_weight_list is a definition of brick assignments over the
+%% unit interval [0.0, 1.0]. The sum of all floats must be 1.0. For
+%% example, [{{br1,nd1}, 0.25}, {{br2,nd1}, 0.5}, {{br3,nd1}, 0.25}].
+
+-export_type([float_map/0, float_tree/0]).
+
+%% @doc Create a float map, based on a basic owner weight list.
+
+-spec make_float_map(owner_weight_list()) -> float_map().
+make_float_map(NewOwnerWeights) ->
+ make_float_map([], NewOwnerWeights).
+
+%% @doc Create a float map, based on an older float map and a new weight
+%% list.
+%%
+%% The weights in the new weight list may be different than (or the
+%% same as) whatever weights were used to make the older float map.
+
+-spec make_float_map(float_map(), owner_weight_list()) -> float_map().
+make_float_map([], NewOwnerWeights) ->
+ Sum = add_all_weights(NewOwnerWeights),
+ DiffMap = [{Ch, Wt/Sum} || {Ch, Wt} <- NewOwnerWeights],
+ make_float_map2([{unused, 1.0}], DiffMap, NewOwnerWeights);
+make_float_map(OldFloatMap, NewOwnerWeights) ->
+ NewSum = add_all_weights(NewOwnerWeights),
+ %% Normalize to unit interval
+ %% NewOwnerWeights2 = [{Ch, Wt / NewSum} || {Ch, Wt} <- NewOwnerWeights],
+
+ %% Reconstruct old owner weights (will be normalized to unit interval)
+ SumOldFloatsDict =
+ lists:foldl(fun({Ch, Wt}, OrdDict) ->
+ orddict:update_counter(Ch, Wt, OrdDict)
+ end, orddict:new(), OldFloatMap),
+ OldOwnerWeights = orddict:to_list(SumOldFloatsDict),
+ OldSum = add_all_weights(OldOwnerWeights),
+
+ OldChs = [Ch || {Ch, _} <- OldOwnerWeights],
+ NewChs = [Ch || {Ch, _} <- NewOwnerWeights],
+ OldChsOnly = OldChs -- NewChs,
+
+ %% Mark any space in by a deleted owner as unused.
+ OldFloatMap2 = lists:map(
+ fun({Ch, Wt} = ChWt) ->
+ case lists:member(Ch, OldChsOnly) of
+ true ->
+ {unused, Wt};
+ false ->
+ ChWt
+ end
+ end, OldFloatMap),
+
+ %% Create a diff map of changing owners and added owners
+ DiffMap = lists:map(fun({Ch, NewWt}) ->
+ case orddict:find(Ch, SumOldFloatsDict) of
+ {ok, OldWt} ->
+ {Ch, (NewWt / NewSum) -
+ (OldWt / OldSum)};
+ error ->
+ {Ch, NewWt / NewSum}
+ end
+ end, NewOwnerWeights),
+ make_float_map2(OldFloatMap2, DiffMap, NewOwnerWeights).
+
+make_float_map2(OldFloatMap, DiffMap, _NewOwnerWeights) ->
+ FloatMap = apply_diffmap(DiffMap, OldFloatMap),
+ XX = combine_neighbors(collapse_unused_in_float_map(FloatMap)),
+ XX.
+
+apply_diffmap(DiffMap, FloatMap) ->
+ SubtractDiff = [{Ch, abs(Diff)} || {Ch, Diff} <- DiffMap, Diff < 0],
+ AddDiff = [D || {_Ch, Diff} = D <- DiffMap, Diff > 0],
+ TmpFloatMap = iter_diffmap_subtract(SubtractDiff, FloatMap),
+ iter_diffmap_add(AddDiff, TmpFloatMap).
+
+add_all_weights(OwnerWeights) ->
+ lists:foldl(fun({_Ch, Weight}, Sum) -> Sum + Weight end, 0.0, OwnerWeights).
+
+iter_diffmap_subtract([{Ch, Diff}|T], FloatMap) ->
+ iter_diffmap_subtract(T, apply_diffmap_subtract(Ch, Diff, FloatMap));
+iter_diffmap_subtract([], FloatMap) ->
+ FloatMap.
+
+iter_diffmap_add([{Ch, Diff}|T], FloatMap) ->
+ iter_diffmap_add(T, apply_diffmap_add(Ch, Diff, FloatMap));
+iter_diffmap_add([], FloatMap) ->
+ FloatMap.
+
+apply_diffmap_subtract(Ch, Diff, [{Ch, Wt}|T]) ->
+ if Wt == Diff ->
+ [{unused, Wt}|T];
+ Wt > Diff ->
+ [{Ch, Wt - Diff}, {unused, Diff}|T];
+ Wt < Diff ->
+ [{unused, Wt}|apply_diffmap_subtract(Ch, Diff - Wt, T)]
+ end;
+apply_diffmap_subtract(Ch, Diff, [H|T]) ->
+ [H|apply_diffmap_subtract(Ch, Diff, T)];
+apply_diffmap_subtract(_Ch, _Diff, []) ->
+ [].
+
+apply_diffmap_add(Ch, Diff, [{unused, Wt}|T]) ->
+ if Wt == Diff ->
+ [{Ch, Wt}|T];
+ Wt > Diff ->
+ [{Ch, Diff}, {unused, Wt - Diff}|T];
+ Wt < Diff ->
+ [{Ch, Wt}|apply_diffmap_add(Ch, Diff - Wt, T)]
+ end;
+apply_diffmap_add(Ch, Diff, [H|T]) ->
+ [H|apply_diffmap_add(Ch, Diff, T)];
+apply_diffmap_add(_Ch, _Diff, []) ->
+ [].
+
+combine_neighbors([{Ch, Wt1}, {Ch, Wt2}|T]) ->
+ combine_neighbors([{Ch, Wt1 + Wt2}|T]);
+combine_neighbors([H|T]) ->
+ [H|combine_neighbors(T)];
+combine_neighbors([]) ->
+ [].
+
+collapse_unused_in_float_map([{Ch, Wt1}, {unused, Wt2}|T]) ->
+ collapse_unused_in_float_map([{Ch, Wt1 + Wt2}|T]);
+collapse_unused_in_float_map([{unused, _}] = L) ->
+ L; % Degenerate case only
+collapse_unused_in_float_map([H|T]) ->
+ [H|collapse_unused_in_float_map(T)];
+collapse_unused_in_float_map([]) ->
+ [].
+
+chash_float_map_to_nextfloat_list(FloatMap) when length(FloatMap) > 0 ->
+ %% QuickCheck found a bug ... need to weed out stuff smaller than
+ %% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE here.
+ FM1 = [P || {_X, Y} = P <- FloatMap, Y > ?SMALLEST_SIGNIFICANT_FLOAT_SIZE],
+ {_Sum, NFs0} = lists:foldl(fun({Name, Amount}, {Sum, List}) ->
+ {Sum+Amount, [{Sum+Amount, Name}|List]}
+ end, {0, []}, FM1),
+ lists:reverse(NFs0).
+
+chash_nextfloat_list_to_gb_tree([]) ->
+ gb_trees:balance(gb_trees:from_orddict([]));
+chash_nextfloat_list_to_gb_tree(NextFloatList) ->
+ {_FloatPos, Name} = lists:last(NextFloatList),
+ %% QuickCheck found a bug ... it really helps to add a catch-all item
+ %% at the far "right" of the list ... 42.0 is much greater than 1.0.
+ NFs = NextFloatList ++ [{42.0, Name}],
+ gb_trees:balance(gb_trees:from_orddict(orddict:from_list(NFs))).
+
+-spec chash_gb_next(float(), float_tree()) -> {float(), owner_name()}.
+chash_gb_next(X, {_, GbTree}) ->
+ chash_gb_next1(X, GbTree).
+
+chash_gb_next1(X, {Key, Val, Left, _Right}) when X < Key ->
+ case chash_gb_next1(X, Left) of
+ nil ->
+ {Key, Val};
+ Res ->
+ Res
+ end;
+chash_gb_next1(X, {Key, _Val, _Left, Right}) when X >= Key ->
+ chash_gb_next1(X, Right);
+chash_gb_next1(_X, nil) ->
+ nil.
+
+%% @doc Not used directly, but can give a developer an idea of how well
+%% chash_float_map_to_nextfloat_list will do for a given value of Max.
+%%
+%% For example:
+%%
+%% NewFloatMap = make_float_map([{unused, 1.0}],
+%% [{a,100}, {b, 100}, {c, 10}]),
+%% ChashMap = chash_scale_to_int_interval(NewFloatMap, 100),
+%% io:format("QQQ: int int = ~p\n", [ChashIntInterval]),
+%% -> [{a,1,47},{b,48,94},{c,94,100}]
+%%
+%%
+%% Interpretation: out of the 100 slots:
+%%
+%% - 'a' uses the slots 1-47
+%% - 'b' uses the slots 48-94
+%% - 'c' uses the slots 95-100
+%%
+
+chash_scale_to_int_interval(NewFloatMap, Max) ->
+ chash_scale_to_int_interval(NewFloatMap, 0, Max).
+
+%% @type nextfloat_list() = list({float(), brick()}). A nextfloat_list
+%% differs from a float_map in two respects: 1) nextfloat_list contains
+%% tuples with the brick name in 2nd position, 2) the float() at each
+%% position I_n > I_m, for all n, m such that n > m.
+%% For example, a nextfloat_list of the float_map example above,
+%% [{0.25, {br1, nd1}}, {0.75, {br2, nd1}}, {1.0, {br3, nd1}].
+
+chash_scale_to_int_interval([{Ch, _Wt}], Cur, Max) ->
+ [{Ch, Cur, Max}];
+chash_scale_to_int_interval([{Ch, Wt}|T], Cur, Max) ->
+ Int = trunc(Wt * Max),
+ [{Ch, Cur + 1, Cur + Int}|chash_scale_to_int_interval(T, Cur + Int, Max)].
+
+%%%%%%%%%%%%%
+
+%% @doc Make a pretty/human-friendly version of a float map that describes
+%% integer ranges between 1 and `Scale'.
+
+-spec pretty_with_integers(float_map(), integer()) -> [owner_int_range()].
+pretty_with_integers(Map, Scale) ->
+ chash_scale_to_int_interval(Map, Scale).
+
+%% @doc Make a pretty/human-friendly version of a float map (based
+%% upon a float map created from `OldWeights' and `NewWeights') that
+%% describes integer ranges between 1 and `Scale'.
+
+-spec pretty_with_integers(owner_weight_list(), owner_weight_list(),integer())->
+ [owner_int_range()].
+pretty_with_integers(OldWeights, NewWeights, Scale) ->
+ chash_scale_to_int_interval(
+ make_float_map(make_float_map(OldWeights),
+ NewWeights),
+ Scale).
+
+%% @doc Create a float tree, which is the rapid lookup data structure
+%% for consistent hash queries.
+
+-spec make_tree(float_map()) -> float_tree().
+make_tree(Map) ->
+ chash_nextfloat_list_to_gb_tree(
+ chash_float_map_to_nextfloat_list(Map)).
+
+%% @doc Low-level function for querying a float tree: the (floating
+%% point) point within the unit interval.
+
+-spec query_tree(float(), float_tree()) -> {float(), owner_name()}.
+query_tree(Val, Tree) when is_float(Val), 0.0 =< Val, Val =< 1.0 ->
+ chash_gb_next(Val, Tree).
+
+%% @doc Create a sample float map.
+
+-spec make_demo_map1() -> float_map().
+make_demo_map1() ->
+ {_, Res} = make_demo_map1_i(),
+ Res.
+
+make_demo_map1_i() ->
+ Fail1 = {b, 100},
+ L1 = [{a, 100}, Fail1, {c, 100}],
+ L2 = L1 ++ [{d, 100}, {e, 100}],
+ L3 = L2 -- [Fail1],
+ L4 = L3 ++ [{giant, 300}],
+ {L4, lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
+ make_float_map(L1), [L2, L3, L4])}.
+
+%% @doc Create a sample float map.
+
+-spec make_demo_map2() -> float_map().
+make_demo_map2() ->
+ {L0, _} = make_demo_map1_i(),
+ L1 = L0 ++ [{h, 100}],
+ L2 = L1 ++ [{i, 100}],
+ L3 = L2 ++ [{j, 100}],
+ lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
+ make_demo_map1(), [L1, L2, L3]).
+
+%% @doc Create a human-friendly summary of a float map.
+%%
+%% The two parts of the summary are: a per-owner total of the unit
+%% interval range(s) owned by each owner, and a total sum of all
+%% per-owner ranges (which should be 1.0 but is not enforced).
+
+-spec sum_map_weights(float_map()) ->
+ {{per_owner, float_map()}, {weight_sum, float()}}.
+sum_map_weights(Map) ->
+ L = sum_map_weights(lists:sort(Map), undefined, 0.0) -- [{undefined,0.0}],
+ WeightSum = lists:sum([Weight || {_, Weight} <- L]),
+ {{per_owner, L}, {weight_sum, WeightSum}}.
+
+sum_map_weights([{SZ, Weight}|T], SZ, SZ_total) ->
+ sum_map_weights(T, SZ, SZ_total + Weight);
+sum_map_weights([{SZ, Weight}|T], LastSZ, LastSZ_total) ->
+ [{LastSZ, LastSZ_total}|sum_map_weights(T, SZ, Weight)];
+sum_map_weights([], LastSZ, LastSZ_total) ->
+ [{LastSZ, LastSZ_total}].
+
+%% @doc Query a float map with a binary (inefficient).
+
+-spec hash_binary_via_float_map(binary(), float_map()) ->
+ {float(), owner_name()}.
+hash_binary_via_float_map(Key, Map) ->
+ Tree = make_tree(Map),
+ <<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
+ Float = Int / ?SHA_MAX,
+ query_tree(Float, Tree).
+
+%% @doc Query a float tree with a binary.
+
+-spec hash_binary_via_float_tree(binary(), float_tree()) ->
+ {float(), owner_name()}.
+hash_binary_via_float_tree(Key, Tree) ->
+ <<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
+ Float = Int / ?SHA_MAX,
+ query_tree(Float, Tree).
+
+%%%%% @doc Various usage examples, see source code below this function
+%%%%% for full details.
+
+zzz_usage_details() ->
+
+%% %% Make a map. See the code for make_demo_map1() for the order of
+%% %% additions & deletions. Here's a brief summary of the 4 steps.
+%% %%
+%% %% * 'a' through 'e' are weighted @ 100.
+%% %% * 'giant' is weighted @ 300.
+%% %% * 'b' is removed at step #3.
+
+%% 40> M1 = machi_chash:make_demo_map1().
+%% [{a,0.09285714285714286},
+%% {giant,0.10714285714285715},
+%% {d,0.026190476190476153},
+%% {giant,0.10714285714285715},
+%% {a,0.04999999999999999},
+%% {giant,0.04999999999999999},
+%% {d,0.04999999999999999},
+%% {giant,0.050000000000000044},
+%% {d,0.06666666666666671},
+%% {e,0.009523809523809434},
+%% {giant,0.05714285714285716},
+%% {c,0.14285714285714285},
+%% {giant,0.05714285714285716},
+%% {e,0.13333333333333341}]
+
+
+%% %% Map M1 onto the interval of integers 0-10,1000
+%% %%
+%% %% output = list({SZ_name::term(), Start::integer(), End::integer()})
+
+%% 41> machi_chash:pretty_with_integers(M1, 10*1000).
+%% [{a,1,928},
+%% {giant,929,1999},
+%% {d,2000,2260},
+%% {giant,2261,3331},
+%% {a,3332,3830},
+%% {giant,3831,4329},
+%% {d,4330,4828},
+%% {giant,4829,5328},
+%% {d,5329,5994},
+%% {e,5995,6089},
+%% {giant,6090,6660},
+%% {c,6661,8088},
+%% {giant,8089,8659},
+%% {e,8659,10000}]
+
+%% %% Sum up all of the weights, make sure it's what we expect:
+
+%% 55> machi_chash:sum_map_weights(M1).
+%% {{per_owner,[{a,0.14285714285714285},
+%% {c,0.14285714285714285},
+%% {d,0.14285714285714285},
+%% {e,0.14285714285714285},
+%% {giant,0.42857142857142866}]},
+%% {weight_sum,1.0}}
+
+%% %% Make a tree, then query it
+%% %% (Hash::float(), tree()) -> {NextLargestBoundary::float(), szone()}
+
+%% 58> T1 = machi_chash:make_tree(M1).
+%% 59> machi_chash:query_tree(0.2555, T1).
+%% {0.3333333333333333,giant}
+%% 60> machi_chash:query_tree(0.3555, T1).
+%% {0.3833333333333333,a}
+%% 61> machi_chash:query_tree(0.4555, T1).
+%% {0.4833333333333333,d}
+
+%% %% How about hashing a bunch of strings and see what happens?
+
+%% 74> Key1 = "Hello, world!".
+%% "Hello, world!"
+%% 75> [{K, element(2, machi_chash:hash_binary_via_float_map(K, M1))} || K <- [lists:sublist(Key1, X) || X <- lists:seq(1, length(Key1))]].
+%% [{"H",giant},
+%% {"He",giant},
+%% {"Hel",giant},
+%% {"Hell",e},
+%% {"Hello",e},
+%% {"Hello,",giant},
+%% {"Hello, ",e},
+%% {"Hello, w",e},
+%% {"Hello, wo",giant},
+%% {"Hello, wor",d},
+%% {"Hello, worl",giant},
+%% {"Hello, world",e},
+%% {"Hello, world!",d}]
+
+ ok.
diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl
new file mode 100644
index 0000000..d78de9d
--- /dev/null
+++ b/src/machi_flu1.erl
@@ -0,0 +1,464 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(machi_flu1).
+
+-include_lib("kernel/include/file.hrl").
+
+-include("machi.hrl").
+
+-export([start_link/1, stop/1]).
+
+start_link([{FluName, TcpPort, DataDir}])
+ when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
+ {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir) end)}.
+
+stop(Pid) ->
+ case erlang:is_process_alive(Pid) of
+ true ->
+ Pid ! forever,
+ ok;
+ false ->
+ error
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+main2(RegName, TcpPort, DataDir) ->
+ _Pid1 = start_listen_server(RegName, TcpPort, DataDir),
+ _Pid2 = start_append_server(RegName, DataDir),
+ receive forever -> ok end.
+
+start_listen_server(RegName, TcpPort, DataDir) ->
+ spawn_link(fun() -> run_listen_server(RegName, TcpPort, DataDir) end).
+
+start_append_server(Name, DataDir) ->
+ spawn_link(fun() -> run_append_server(Name, DataDir) end).
+
+run_listen_server(RegName, TcpPort, DataDir) ->
+ SockOpts = [{reuseaddr, true},
+ {mode, binary}, {active, false}, {packet, line}],
+ {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
+ listen_server_loop(RegName, LSock, DataDir).
+
+run_append_server(Name, DataDir) ->
+ register(Name, self()),
+ append_server_loop(DataDir).
+
+listen_server_loop(RegName, LSock, DataDir) ->
+ {ok, Sock} = gen_tcp:accept(LSock),
+ spawn(fun() -> net_server_loop(RegName, Sock, DataDir) end),
+ listen_server_loop(RegName, LSock, DataDir).
+
+append_server_loop(DataDir) ->
+ receive
+ {seq_append, From, Prefix, Chunk, CSum} ->
+ spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
+ DataDir) end),
+ append_server_loop(DataDir)
+ end.
+
+net_server_loop(RegName, Sock, DataDir) ->
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0, 60*1000) of
+ {ok, Line} ->
+ %% machi_util:verb("Got: ~p\n", [Line]),
+ PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1,
+ PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2,
+ FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1,
+ FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2,
+ CSumFileLenLF = byte_size(Line) - 2 - 1,
+ CSumFileLenCRLF = byte_size(Line) - 2 - 2,
+ WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1,
+ DelFileLenLF = byte_size(Line) - 14 - 1,
+ case Line of
+ %% For normal use
+ <<"A ", LenHex:8/binary, " ",
+ Prefix:PrefixLenLF/binary, "\n">> ->
+ do_net_server_append(RegName, Sock, LenHex, Prefix);
+ <<"A ", LenHex:8/binary, " ",
+ Prefix:PrefixLenCRLF/binary, "\r\n">> ->
+ do_net_server_append(RegName, Sock, LenHex, Prefix);
+ <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
+ File:FileLenLF/binary, "\n">> ->
+ do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
+ <<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
+ File:FileLenCRLF/binary, "\r\n">> ->
+ do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
+ <<"L\n">> ->
+ do_net_server_listing(Sock, DataDir);
+ <<"L\r\n">> ->
+ do_net_server_listing(Sock, DataDir);
+ <<"C ", File:CSumFileLenLF/binary, "\n">> ->
+ do_net_server_checksum_listing(Sock, File, DataDir);
+ <<"C ", File:CSumFileLenCRLF/binary, "\n">> ->
+ do_net_server_checksum_listing(Sock, File, DataDir);
+ <<"QUIT\n">> ->
+ catch gen_tcp:close(Sock),
+ exit(normal);
+ <<"QUIT\r\n">> ->
+ catch gen_tcp:close(Sock),
+ exit(normal);
+ %% For "internal" replication only.
+ <<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
+ File:WriteFileLenLF/binary, "\n">> ->
+ do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
+ %% For data migration only.
+ <<"DEL-migration ", File:DelFileLenLF/binary, "\n">> ->
+ do_net_server_delete_migration_only(Sock, File, DataDir);
+ %% For erasure coding hackityhack
+ <<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> ->
+ do_net_server_truncate_hackityhack(Sock, File, DataDir);
+ _ ->
+ machi_util:verb("Else Got: ~p\n", [Line]),
+ gen_tcp:send(Sock, "ERROR SYNTAX\n"),
+ catch gen_tcp:close(Sock),
+ exit(normal)
+ end,
+ net_server_loop(RegName, Sock, DataDir);
+ _ ->
+ catch gen_tcp:close(Sock),
+ exit(normal)
+ end.
+
+append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
+ Pid = write_server_get_pid(Prefix, DataDir),
+ Pid ! {seq_append, From, Prefix, Chunk, CSum},
+ exit(normal).
+
+do_net_server_append(RegName, Sock, LenHex, Prefix) ->
+ %% TODO: robustify against other invalid path characters such as NUL
+ case sanitize_file_string(Prefix) of
+ ok ->
+ do_net_server_append2(RegName, Sock, LenHex, Prefix);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
+ end.
+
+sanitize_file_string(Str) ->
+ case re:run(Str, "/") of
+ nomatch ->
+ ok;
+ _ ->
+ error
+ end.
+
+do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
+ <<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
+ ok = inet:setopts(Sock, [{packet, raw}]),
+ {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
+ CSum = machi_util:checksum(Chunk),
+ try
+ RegName ! {seq_append, self(), Prefix, Chunk, CSum}
+ catch error:badarg ->
+ error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
+ end,
+ receive
+ {assignment, Offset, File} ->
+ OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
+ Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
+ ok = gen_tcp:send(Sock, Out)
+ after 10*1000 ->
+ ok = gen_tcp:send(Sock, "TIMEOUT\n")
+ end.
+
+do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
+ DoItFun = fun(FH, Offset, Len) ->
+ case file:pread(FH, Offset, Len) of
+ {ok, Bytes} when byte_size(Bytes) == Len ->
+ gen_tcp:send(Sock, ["OK\n", Bytes]);
+ {ok, Bytes} ->
+ machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
+ [Len, size(Bytes), FileBin, Offset]),
+ ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n");
+ eof ->
+ perhaps_do_net_server_ec_read(Sock, FH);
+ _Else2 ->
+ machi_util:verb("Else2 ~p ~p ~P\n",
+ [Offset, Len, _Else2, 20]),
+ ok = gen_tcp:send(Sock, "ERROR BAD-READ\n")
+ end
+ end,
+ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ [read, binary, raw], DoItFun).
+
+do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun) ->
+ case sanitize_file_string(FileBin) of
+ ok ->
+ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
+ DataDir, FileOpts, DoItFun);
+ _ ->
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
+ end.
+
+do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun) ->
+ <<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
+ <<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
+ {_, Path} = machi_util:make_data_filename(DataDir, FileBin),
+ OptsHasWrite = lists:member(write, FileOpts),
+ case file:open(Path, FileOpts) of
+ {ok, FH} ->
+ try
+ DoItFun(FH, Offset, Len)
+ after
+ file:close(FH)
+ end;
+ {error, enoent} when OptsHasWrite ->
+ ok = filelib:ensure_dir(Path),
+ do_net_server_readwrite_common(
+ Sock, OffsetHex, LenHex, FileBin, DataDir,
+ FileOpts, DoItFun);
+ _Else ->
+ %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
+ ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
+ end.
+
+
+do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
+ CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
+ case file:open(CSumPath, [append, raw, binary, delayed_write]) of
+ {ok, FHc} ->
+ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
+ {error, enoent} ->
+ ok = filelib:ensure_dir(CSumPath),
+ do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
+ end.
+
%% @doc Write handler: receive `Len' bytes of chunk data from `Sock',
%% pwrite() it at `Offset' in the data file, append an
%% offset/length/checksum line to the checksum file `FHc', then ack.
%%
%% Fix: the original closed `FHc' only on the successful pwrite path,
%% leaking the handle whenever pwrite failed or the common open/sanitize
%% code bailed out with BAD-ARG/BAD-IO.  `FHc' is now always released.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
    DoItFun = fun(FHd, Offset, Len) ->
                      ok = inet:setopts(Sock, [{packet, raw}]),
                      {ok, Chunk} = gen_tcp:recv(Sock, Len),
                      CSum = machi_util:checksum(Chunk),
                      case file:pwrite(FHd, Offset, Chunk) of
                          ok ->
                              CSumHex = machi_util:bin_to_hexstr(CSum),
                              CSum_info = [OffsetHex, 32, LenHex, 32,
                                           CSumHex, 10],
                              ok = file:write(FHc, CSum_info),
                              %% Close before acking so the delayed_write
                              %% buffer is flushed before the client sees OK.
                              ok = file:close(FHc),
                              gen_tcp:send(Sock, <<"OK\n">>);
                          _Else3 ->
                              machi_util:verb("Else3 ~p ~p ~p\n",
                                              [Offset, Len, _Else3]),
                              ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
                      end
              end,
    try
        do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin,
                                       DataDir, [write, read, binary, raw],
                                       DoItFun)
    after
        %% No-op if the success path already closed FHc; ignore result.
        _ = file:close(FHc)
    end.
+
%% @doc Read the reserved header region (?MINIMUM_OFFSET bytes) at the
%% front of `FH' and, when it is complete, try to decode it as an
%% erasure-coding header; otherwise report an error on `Sock'.
perhaps_do_net_server_ec_read(Sock, FH) ->
    case file:pread(FH, 0, ?MINIMUM_OFFSET) of
        {ok, Hdr} when byte_size(Hdr) =:= ?MINIMUM_OFFSET ->
            decode_and_reply_net_server_ec_read(Sock, Hdr);
        {ok, _Short} ->
            ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n");
        _Other ->
            ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n")
    end.
+
%% Dispatch on the first bytes of the reserved header area: "a " marks
%% version-"a" erasure-coding info; a leading NUL byte marks a file that
%% has not been erasure coded.  Any other leading byte crashes with
%% function_clause (presumably intended let-it-crash — confirm).
decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) ->
    decode_and_reply_net_server_ec_read_version_a(Sock, Rest);
decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) ->
    ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>).
+
%% @doc Parse a version-"a" erasure-coding header and echo it, plus the
%% body bytes it describes, back to the client.
%%
%% NOTE(review): the binary patterns here were garbled in transit and
%% have been reconstructed.  Assumed layout: "a " (already consumed by
%% the caller), a 4-hex-digit body length, one space, then the rest of
%% an 80-byte header.  Confirm against the on-disk header writer.
decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
    HdrLen = 80 - 2 - 4 - 1,
    <<BodyLenHex:4/binary, " ", Hdr:HdrLen/binary, Rest2/binary>> = Rest,
    <<BodyLen:16/big>> = machi_util:hexstr_to_bin(BodyLenHex),
    <<Body:BodyLen/binary, _/binary>> = Rest2,
    ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
+
%% @doc Handle "L": send "OK\n", then one "<16-hex-digit size> <name>\n"
%% line per data file (the "config" directory is excluded), terminated
%% by ".\n".  The 64-bit size width matches the client's parser.
do_net_server_listing(Sock, DataDir) ->
    Files = filelib:wildcard("*", DataDir) -- ["config"],
    Out = ["OK\n",
           [begin
                {ok, FI} = file:read_file_info(DataDir ++ "/" ++ File),
                Size = FI#file_info.size,
                SizeBin = <<Size:64/big>>,
                [machi_util:bin_to_hexstr(SizeBin), <<" ">>,
                 list_to_binary(File), <<"\n">>]
            end || File <- Files],
           ".\n"
          ],
    ok = gen_tcp:send(Sock, Out).
+
%% @doc Handle "C" (checksum listing): validate the file name before
%% touching the filesystem.
do_net_server_checksum_listing(Sock, File, DataDir) ->
    case sanitize_file_string(File) of
        ok ->
            do_net_server_checksum_listing2(Sock, File, DataDir);
        _BadName ->
            ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
    end.
+
%% @doc Report the checksum file's byte size as 16 hex digits on the OK
%% line, stream the whole file, then send the "." terminator.
%%
%% NOTE(review): the `<<Len:64/big>>' pattern was reconstructed; the
%% 64-bit width is grounded by the client, which parses exactly 16 hex
%% digits back into a 64-bit length.
do_net_server_checksum_listing2(Sock, File, DataDir) ->
    CSumPath = machi_util:make_checksum_filename(DataDir, File),
    case file:open(CSumPath, [read, raw, binary]) of
        {ok, FH} ->
            {ok, FI} = file:read_file_info(CSumPath),
            Len = FI#file_info.size,
            LenHex = list_to_binary(machi_util:bin_to_hexstr(<<Len:64/big>>)),
            %% Client has option of line-by-line with "." terminator,
            %% or using the offset in the OK message to slurp things
            %% down by exact byte size.
            ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]),
            do_net_copy_bytes(FH, Sock),
            ok = file:close(FH),
            ok = gen_tcp:send(Sock, ".\n");
        {error, enoent} ->
            ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
        _ ->
            ok = gen_tcp:send(Sock, "ERROR\n")
    end.
+
%% @doc Stream the remainder of `FH' to `Sock' in reads of up to 1 MByte.
do_net_copy_bytes(FH, Sock) ->
    case file:read(FH, 1024*1024) of
        eof ->
            ok;
        {ok, Chunk} ->
            ok = gen_tcp:send(Sock, Chunk),
            do_net_copy_bytes(FH, Sock)
    end.
+
%% @doc Handle "DEL-migration": validate the file name, then delete.
do_net_server_delete_migration_only(Sock, File, DataDir) ->
    case sanitize_file_string(File) of
        ok ->
            do_net_server_delete_migration_only2(Sock, File, DataDir);
        _BadName ->
            ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
    end.
+
%% @doc Delete the data file and map the result onto a protocol reply.
do_net_server_delete_migration_only2(Sock, File, DataDir) ->
    {_, Path} = machi_util:make_data_filename(DataDir, File),
    Reply = case file:delete(Path) of
                ok              -> "OK\n";
                {error, enoent} -> "ERROR NO-SUCH-FILE\n";
                _               -> "ERROR\n"
            end,
    ok = gen_tcp:send(Sock, Reply).
+
%% @doc Handle "TRUNC-hack---": validate the file name, then truncate.
do_net_server_truncate_hackityhack(Sock, File, DataDir) ->
    case sanitize_file_string(File) of
        ok ->
            do_net_server_truncate_hackityhack2(Sock, File, DataDir);
        _BadName ->
            ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
    end.
+
%% @doc Truncate the data file down to the reserved header region
%% (?MINIMUM_OFFSET bytes), acking or reporting an error on `Sock'.
do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
    {_, Path} = machi_util:make_data_filename(DataDir, File),
    case file:open(Path, [read, write, binary, raw]) of
        {ok, Handle} ->
            try
                {ok, ?MINIMUM_OFFSET} = file:position(Handle,
                                                      ?MINIMUM_OFFSET),
                ok = file:truncate(Handle),
                ok = gen_tcp:send(Sock, "OK\n")
            after
                file:close(Handle)
            end;
        {error, enoent} ->
            ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
        _ ->
            ok = gen_tcp:send(Sock, "ERROR\n")
    end.
+
%% @doc Find (or lazily start) the registered append server for
%% `Prefix' and return its pid.
%%
%% NOTE(review): start-then-sleep(1)-then-retry is racy — two
%% concurrent callers may both spawn a server, and the loser of the
%% register/2 race will crash (it is spawn_link'ed, so the caller dies
%% too).  Presumably that crash is the intended resolution; confirm
%% before relying on this under concurrent load.
write_server_get_pid(Prefix, DataDir) ->
    RegName = machi_util:make_regname(Prefix),
    case whereis(RegName) of
        undefined ->
            start_seq_append_server(Prefix, DataDir),
            timer:sleep(1),
            write_server_get_pid(Prefix, DataDir);
        Pid ->
            Pid
    end.
+
%% @doc Spawn (linked) the append server process for `Prefix'.
start_seq_append_server(Prefix, DataDir) ->
    Body = fun() -> run_seq_append_server(Prefix, DataDir) end,
    spawn_link(Body).

%% Process body: claim the registered name, make sure the data and
%% config directories exist, then start serving appends.
run_seq_append_server(Prefix, DataDir) ->
    true = register(machi_util:make_regname(Prefix), self()),
    ok = filelib:ensure_dir(DataDir ++ "/unused"),
    ok = filelib:ensure_dir(DataDir ++ "/config/unused"),
    run_seq_append_server2(Prefix, DataDir).
+
%% @doc Claim the next file number for `Prefix' and enter the append
%% loop for it.
%%
%% Fix: the original called machi_util:increment_max_filenum/2 a second
%% time inside the `ok' branch, burning an extra file number on every
%% server (re)start.
run_seq_append_server2(Prefix, DataDir) ->
    FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
    case machi_util:increment_max_filenum(DataDir, Prefix) of
        ok ->
            machi_util:info_msg("start: ~p server at file ~w\n",
                                [Prefix, FileNum]),
            seq_append_server_loop(DataDir, Prefix, FileNum);
        Else ->
            error_logger:error_msg("start: ~p server at file ~w: ~p\n",
                                   [Prefix, FileNum, Else]),
            exit(Else)
    end.
+
%% @doc Open the next data file and checksum file for `Prefix', then
%% enter the append loop starting at ?MINIMUM_OFFSET.
seq_append_server_loop(DataDir, Prefix, FileNum) ->
    %% NOTE(review): now/0 is deprecated in modern OTP; it is used here
    %% (base-36, together with the OS pid) only to build a
    %% probably-unique sequencer name — confirm before an OTP upgrade.
    SequencerNameHack = lists:flatten(io_lib:format(
                                        "~.36B~.36B",
                                        [element(3,now()),
                                         list_to_integer(os:getpid())])),
    {File, FullPath} = machi_util:make_data_filename(
                         DataDir, Prefix, SequencerNameHack, FileNum),
    {ok, FHd} = file:open(FullPath,
                          [write, binary, raw]),
    %% [write, binary, raw, delayed_write]),
    CSumPath = machi_util:make_checksum_filename(
                 DataDir, Prefix, SequencerNameHack, FileNum),
    {ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
    seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
                           ?MINIMUM_OFFSET).
+
%% @doc Append loop: roll over to a fresh file once the offset passes
%% ?MAX_FILE_SIZE; otherwise serve {seq_append, ...} requests, writing
%% the chunk and logging offset/length/checksum as hex to the checksum
%% file.  Idle-exit (closing both files) after 30 seconds.
%%
%% NOTE(review): the hex-encoding binary patterns were reconstructed
%% (offset = 64 bits, length = 32 bits) to match the widths the client
%% parses; confirm against machi_util:int_to_hexbin/2 call sites.
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
  when Offset > ?MAX_FILE_SIZE ->
    ok = file:close(FHd),
    ok = file:close(FHc),
    machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
                        [Prefix, FileNum, Offset]),
    run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
    receive
        {seq_append, From, Prefix, Chunk, CSum} ->
            ok = file:pwrite(FHd, Offset, Chunk),
            From ! {assignment, Offset, File},
            Len = byte_size(Chunk),
            OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
            LenHex = machi_util:bin_to_hexstr(<<Len:32/big>>),
            CSumHex = machi_util:bin_to_hexstr(CSum),
            CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
            ok = file:write(FHc, CSum_info),
            seq_append_server_loop(DataDir, Prefix, File, FH_,
                                   FileNum, Offset + Len)
    after 30*1000 ->
            ok = file:close(FHd),
            ok = file:close(FHc),
            machi_util:info_msg("stop: ~p server at file ~w offset ~w\n",
                                [Prefix, FileNum, Offset]),
            exit(normal)
    end.
+
diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl
new file mode 100644
index 0000000..3cccf00
--- /dev/null
+++ b/src/machi_flu1_client.erl
@@ -0,0 +1,399 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
%% @doc TCP client for the machi FLU server protocol: append, read,
%% checksum listing, and file listing, plus restricted operations used
%% only by internal replication/migration.

-module(machi_flu1_client).

-include("machi.hrl").

%% Public client API.
-export([
         append_chunk/3, append_chunk/4,
         read_chunk/4, read_chunk/5,
         checksum_list/2, checksum_list/3,
         list_files/1, list_files/2,
         quit/1
        ]).
%% For "internal" replication only.
-export([
         write_chunk/4, write_chunk/5,
         delete_migration/2, delete_migration/3,
         trunc_hack/2, trunc_hack/3
        ]).

%% Chunk data supplied by the caller (any iolist).
-type chunk() :: iolist().
%% Chunk data as returned by the server (always a binary).
-type chunk_s() :: binary().
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_prefix() :: binary() | list().
+
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix', using an already-open socket.

-spec append_chunk(port(), file_prefix(), chunk()) ->
      {ok, chunk_pos()} | {error, term()}.
append_chunk(Sock, Prefix, Chunk) ->
    append_chunk2(Sock, Prefix, Chunk).

%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix', over a temporary connection to `Host:TcpPort'.

-spec append_chunk(inet_host(), inet_port(), file_prefix(), chunk()) ->
      {ok, chunk_pos()} | {error, term()}.
append_chunk(Host, TcpPort, Prefix, Chunk) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        append_chunk2(Socket, Prefix, Chunk)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Read a chunk of data of size `Size' from `File' at `Offset',
%% using an already-open socket.

-spec read_chunk(port(), file_name(), file_offset(), chunk_size()) ->
      {ok, chunk_s()} | {error, term()}.
read_chunk(Sock, File, Offset, Size) ->
    read_chunk2(Sock, File, Offset, Size).

%% @doc Read a chunk of data of size `Size' from `File' at `Offset',
%% over a temporary connection to `Host:TcpPort'.

-spec read_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk_size()) ->
      {ok, chunk_s()} | {error, term()}.
read_chunk(Host, TcpPort, File, Offset, Size) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        read_chunk2(Socket, File, Offset, Size)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Fetch the list of chunk checksums for `File', using an
%% already-open socket.

-spec checksum_list(port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
checksum_list(Sock, File) when is_port(Sock) ->
    checksum_list2(Sock, File).

%% @doc Fetch the list of chunk checksums for `File', over a temporary
%% connection to `Host:TcpPort'.

-spec checksum_list(inet_host(), inet_port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        checksum_list2(Socket, File)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Fetch the list of all files on the remote FLU, using an
%% already-open socket.

-spec list_files(port()) ->
      {ok, [file_name()]} | {error, term()}.
list_files(Sock) when is_port(Sock) ->
    list2(Sock).

%% @doc Fetch the list of all files on the remote FLU, over a
%% temporary connection to `Host:TcpPort'.

-spec list_files(inet_host(), inet_port()) ->
      {ok, [file_name()]} | {error, term()}.
list_files(Host, TcpPort) when is_integer(TcpPort) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        list2(Socket)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Quit & close the connection to remote FLU.  Both the polite
%% QUIT and the close are best-effort; this function always returns ok.

-spec quit(port()) ->
      ok.
quit(Sock) when is_port(Sock) ->
    _ = (catch gen_tcp:send(Sock, <<"QUIT\n">>)),
    _ = (catch gen_tcp:close(Sock)),
    ok.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset', using an already-open socket.

-spec write_chunk(port(), file_name(), file_offset(), chunk()) ->
      {ok, chunk_s()} | {error, term()}.
write_chunk(Sock, File, Offset, Chunk) ->
    write_chunk2(Sock, File, Offset, Chunk).

%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset', over a temporary connection to `Host:TcpPort'.

-spec write_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk()) ->
      {ok, chunk_s()} | {error, term()}.
write_chunk(Host, TcpPort, File, Offset, Chunk) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        write_chunk2(Socket, File, Offset, Chunk)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated, using an already-open socket.

-spec delete_migration(port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
delete_migration(Sock, File) when is_port(Sock) ->
    delete_migration2(Sock, File).

%% @doc Restricted API: Delete a file after it has been successfully
%% migrated, over a temporary connection to `Host:TcpPort'.

-spec delete_migration(inet_host(), inet_port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        delete_migration2(Socket, File)
    after
        catch gen_tcp:close(Socket)
    end.
+
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded, using an already-open socket.

-spec trunc_hack(port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
trunc_hack(Sock, File) when is_port(Sock) ->
    trunc_hack2(Sock, File).

%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded, over a temporary connection to `Host:TcpPort'.

-spec trunc_hack(inet_host(), inet_port(), file_name()) ->
      {ok, [file_name()]} | {error, term()}.
trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) ->
    Socket = machi_util:connect(Host, TcpPort),
    try
        trunc_hack2(Socket, File)
    after
        catch gen_tcp:close(Socket)
    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
%% Protocol: send "A <LenHex> <Prefix>\n" followed by the chunk bytes;
%% the server replies "OK <OffsetHex:16> <Path>\n" (offset assigned by
%% the sequencer) or an "ERROR ..." line.
append_chunk2(Sock, Prefix0, Chunk0) ->
    try
        %% TODO: add client-side checksum to the server's protocol
        %% _ = crypto:hash(md5, Chunk),
        Prefix = machi_util:make_binary(Prefix0),
        Chunk = machi_util:make_binary(Chunk0),
        Len = iolist_size(Chunk0),
        true = (Len =< ?MAX_CHUNK_SIZE),
        LenHex = machi_util:int_to_hexbin(Len, 32),
        Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>,
        ok = gen_tcp:send(Sock, [Cmd, Chunk]),
        {ok, Line} = gen_tcp:recv(Sock, 0),
        %% Reply layout: "OK " (3) + 16 hex offset digits + " " (1) +
        %% path + "\n" (1); hence the PathLen arithmetic.
        PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
        case Line of
            <<"OK ", OffsetHex:16/binary, " ",
              Path:PathLen/binary, _:1/binary>> ->
                Offset = machi_util:hexstr_to_int(OffsetHex),
                {ok, {Offset, Len, Path}};
            <<"ERROR BAD-ARG", _/binary>> ->
                {error, bad_arg};
            <<"ERROR ", Rest/binary>> ->
                {error, Rest}
        end
    catch
        throw:Error ->
            Error;
        %% NOTE(review): erlang:get_stacktrace/0 is deprecated (and
        %% removed in OTP 24+); fine for the OTP this file targets —
        %% confirm before upgrading.
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
    end.
+
%% @doc Send "R <OffsetHex> <SizeHex> <File>\n" and parse the reply.
%% A 3-byte "OK\n" is followed by exactly `Size' bytes of chunk data;
%% any other 3-byte prefix is drained as a line and mapped to an error.
%%
%% Fixes: the misleading local name `PrefixHex' (it encodes the offset,
%% not a prefix) is renamed, and the garbled error-reply binary was
%% reconstructed as the concatenation of both reply fragments.
read_chunk2(Sock, File0, Offset, Size) ->
    File = machi_util:make_binary(File0),
    OffsetHex = machi_util:int_to_hexbin(Offset, 64),
    SizeHex = machi_util:int_to_hexbin(Size, 32),
    CmdLF = [$R, 32, OffsetHex, 32, SizeHex, 32, File, 10],
    ok = gen_tcp:send(Sock, CmdLF),
    case gen_tcp:recv(Sock, 3) of
        {ok, <<"OK\n">>} ->
            {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
            Res;
        {ok, Else} ->
            %% Drain the remainder of the error line, restoring the
            %% socket's previous packet mode afterward.
            {ok, OldOpts} = inet:getopts(Sock, [packet]),
            ok = inet:setopts(Sock, [{packet, line}]),
            {ok, Else2} = gen_tcp:recv(Sock, 0),
            ok = inet:setopts(Sock, OldOpts),
            case Else of
                <<"ERA">> ->
                    {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
                <<"ERR">> ->
                    case Else2 of
                        <<"OR BAD-IO\n">> ->
                            {error, no_such_file};
                        <<"OR NOT-ERASURE\n">> ->
                            {error, no_such_file};
                        <<"OR BAD-ARG\n">> ->
                            {error, bad_arg};
                        <<"OR PARTIAL-READ\n">> ->
                            {error, partial_read};
                        _ ->
                            {error, Else2}
                    end;
                _ ->
                    {error, {whaaa, <<Else/binary, Else2/binary>>}}
            end
    end.
+
%% @doc Send "L" and collect the line-oriented listing until the "."
%% terminator, returning {ok, Lines} or a tagged error.
list2(Sock) ->
    try
        ok = gen_tcp:send(Sock, <<"L\n">>),
        ok = inet:setopts(Sock, [{packet, line}]),
        {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
        Lines = list2(gen_tcp:recv(Sock, 0), Sock),
        ok = inet:setopts(Sock, [{packet, raw}]),
        {ok, Lines}
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch}}
    end.

%% Accumulate listing lines; a lone ".\n" ends the list, anything else
%% from recv is a protocol error.
list2({ok, <<".\n">>}, _Sock) ->
    [];
list2({ok, Line}, Sock) ->
    [Line | list2(gen_tcp:recv(Sock, 0), Sock)];
list2(Unexpected, _Sock) ->
    throw({server_protocol_error, Unexpected}).
+
%% @doc Send "C <File>\n" and fetch the checksum listing.  The server's
%% OK line carries the listing's byte size as 16 hex digits, which lets
%% us slurp the body by exact size instead of line-by-line.
%%
%% Fixes: the unused `Line' binding (compiler warning) is gone, and the
%% garbled binary patterns were reconstructed from the server's
%% encoding (16 hex digits = 64-bit length, one trailing newline).
checksum_list2(Sock, File) ->
    try
        ok = gen_tcp:send(Sock, [<<"C ">>, File, <<"\n">>]),
        ok = inet:setopts(Sock, [{packet, line}]),
        case gen_tcp:recv(Sock, 0) of
            {ok, <<"OK ", Rest/binary>>} ->
                put(status, ok), % may be unset later
                RestLen = byte_size(Rest) - 1,
                <<LenHex:RestLen/binary, _:1/binary>> = Rest,
                <<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
                ok = inet:setopts(Sock, [{packet, raw}]),
                checksum_list_fast(Sock, Len);
            {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
                {error, no_such_file};
            {ok, <<"ERROR BAD-ARG", _/binary>>} ->
                {error, bad_arg};
            {ok, Else} ->
                throw({server_protocol_error, Else})
        end
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch}}
    end.
+
%% @doc Slurp `Remaining' bytes of checksum data in reads of up to
%% 1 MByte, then consume the trailing ".\n" terminator.
checksum_list_fast(Sock, 0) ->
    {ok, <<".\n">>} = gen_tcp:recv(Sock, 2),
    [];
checksum_list_fast(Sock, Remaining) ->
    WantBytes = erlang:min(Remaining, 1024*1024),
    {ok, Bytes} = gen_tcp:recv(Sock, WantBytes),
    [Bytes | checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
+
+
%% @doc Send "W-repl <OffsetHex> <LenHex> <File>\n" plus the chunk
%% bytes; the server answers "OK\n" or an "ERROR ..." line.  Writes
%% below ?MINIMUM_OFFSET (the reserved header region) are rejected
%% client-side via the `true = ...' assertion.
%%
%% Fix: the original computed `PathLen' (copied from append_chunk2) but
%% never used it, producing an unused-variable warning.
write_chunk2(Sock, File0, Offset, Chunk0) ->
    try
        %% TODO: add client-side checksum to the server's protocol
        %% _ = crypto:hash(md5, Chunk),
        File = machi_util:make_binary(File0),
        true = (Offset >= ?MINIMUM_OFFSET),
        OffsetHex = machi_util:int_to_hexbin(Offset, 64),
        Chunk = machi_util:make_binary(Chunk0),
        Len = iolist_size(Chunk0),
        true = (Len =< ?MAX_CHUNK_SIZE),
        LenHex = machi_util:int_to_hexbin(Len, 32),
        Cmd = <<"W-repl ", OffsetHex/binary, " ",
                LenHex/binary, " ", File/binary, "\n">>,

        ok = gen_tcp:send(Sock, [Cmd, Chunk]),
        {ok, Line} = gen_tcp:recv(Sock, 0),
        case Line of
            <<"OK\n">> ->
                ok;
            <<"ERROR BAD-ARG", _/binary>> ->
                {error, bad_arg};
            <<"ERROR ", _/binary>>=Else ->
                {error, {server_said, Else}}
        end
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch, erlang:get_stacktrace()}}
    end.
+
%% @doc Send "DEL-migration <File>\n" and map the one-line reply onto
%% ok / {error, _}; an unrecognized reply is a protocol error.
delete_migration2(Sock, File) ->
    try
        ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]),
        ok = inet:setopts(Sock, [{packet, line}]),
        case gen_tcp:recv(Sock, 0) of
            {ok, <<"OK\n">>} -> ok;
            {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {error, no_such_file};
            {ok, <<"ERROR BAD-ARG", _/binary>>} -> {error, bad_arg};
            {ok, Unexpected} -> throw({server_protocol_error, Unexpected})
        end
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch}}
    end.
+
%% @doc Send "TRUNC-hack--- <File>\n" and map the one-line reply onto
%% ok / {error, _}; an unrecognized reply is a protocol error.
trunc_hack2(Sock, File) ->
    try
        ok = gen_tcp:send(Sock, [<<"TRUNC-hack--- ">>, File, <<"\n">>]),
        ok = inet:setopts(Sock, [{packet, line}]),
        case gen_tcp:recv(Sock, 0) of
            {ok, <<"OK\n">>} -> ok;
            {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {error, no_such_file};
            {ok, <<"ERROR BAD-ARG", _/binary>>} -> {error, bad_arg};
            {ok, Unexpected} -> throw({server_protocol_error, Unexpected})
        end
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch}}
    end.
diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl
new file mode 100644
index 0000000..4ad26fc
--- /dev/null
+++ b/src/machi_flu_sup.erl
@@ -0,0 +1,51 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
%% @doc Supervisor for the FLU worker processes: one machi_flu child
%% per {Name, Port, Dir} entry in the `flu_list' application env var.

-module(machi_flu_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%% @doc Build one permanent worker child spec per configured FLU.
%%
%% Fix: the child spec's Modules list was `[]'; per the OTP supervisor
%% documentation it should name the callback module (machi_flu) so
%% release handling / code-change tooling can find the children.
init([]) ->
    RestartStrategy = one_for_one,
    MaxRestarts = 1000,
    MaxSecondsBetweenRestarts = 3600,

    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},

    Restart = permanent,
    Shutdown = 5000,
    Type = worker,

    {ok, FluList} = application:get_env(machi, flu_list),
    FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
                 Restart, Shutdown, Type, [machi_flu]} ||
                   {FluName, _Port, _Dir}=FluArgs <- FluList],
    {ok, {SupFlags, FluSpecs}}.
diff --git a/src/machi_sequencer.erl b/src/machi_sequencer.erl
new file mode 100644
index 0000000..ddd81a5
--- /dev/null
+++ b/src/machi_sequencer.erl
@@ -0,0 +1,191 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
%% @doc Standalone sequencer prototype, plus basho_bench driver hooks
%% (new/1, run/4) for benchmarking it.

-module(machi_sequencer).

%% NOTE(review): export_all is convenient for the benchmark driver but
%% should become an explicit export list once the API settles.
-compile(export_all).

-include_lib("kernel/include/file.hrl").

%% Working directories for the prototype (relative to the CWD).
-define(CONFIG_DIR, "./config").
-define(DATA_DIR, "./data").
+
%% @doc Ask `Server' for an offset assignment of `Size' bytes under
%% `Prefix'.  Returns {File, Offset}, or 'bummer' after a one-second
%% timeout.
seq(Server, Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
    Server ! {seq, self(), Prefix, Size},
    receive
        {assignment, FileName, Off} ->
            {FileName, Off}
    after 1000 ->
            bummer
    end.

%% @doc Like seq/3, but the per-prefix server is located via its
%% registered name.
seq_direct(Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
    seq(make_regname(Prefix), Prefix, Size).
+
%% Start the (singleton) sequencer front-end under the default name.
start_server() ->
    start_server(?MODULE).

%% Start a sequencer front-end registered as `Name' (linked).
start_server(Name) ->
    spawn_link(fun() -> run_server(Name) end).

%% Server body: claim the name, create the shared public ETS table
%% (used by the seq_ets benchmark path), then loop.
run_server(Name) ->
    register(Name, self()),
    ets:new(?MODULE, [named_table, public, {write_concurrency, true}]),
    server_loop().

%% Accept {seq, ...} requests forever, handing each to a one-shot
%% dispatcher process so a slow prefix server cannot block this loop.
server_loop() ->
    receive
        {seq, From, Prefix, Size} ->
            spawn(fun() -> server_dispatch(From, Prefix, Size) end),
            server_loop()
    end.
+
%% One-shot dispatcher: look up (or lazily start) the per-prefix
%% server, forward the request to it, and exit.
%%
%% NOTE(review): start-then-sleep(1)-then-retry is racy under
%% concurrency — two dispatchers can both call start_prefix_server/1;
%% the loser of the register/2 race crashes in its own (unlinked)
%% process, so the request is still served by the winner.  Confirm
%% this is the intended resolution.
server_dispatch(From, Prefix, Size) ->
    RegName = make_regname(Prefix),
    case whereis(RegName) of
        undefined ->
            start_prefix_server(Prefix),
            timer:sleep(1),
            server_dispatch(From, Prefix, Size);
        Pid ->
            Pid ! {seq, From, Prefix, Size}
    end,
    exit(normal).

%% Spawn (unlinked) the assignment server for one prefix.
start_prefix_server(Prefix) ->
    spawn(fun() -> run_prefix_server(Prefix) end).

%% Per-prefix server body: claim the registered name, make sure the
%% config/data directories exist, read + bump the persistent file
%% counter, then serve assignments.
run_prefix_server(Prefix) ->
    true = register(make_regname(Prefix), self()),
    ok = filelib:ensure_dir(?CONFIG_DIR ++ "/unused"),
    ok = filelib:ensure_dir(?DATA_DIR ++ "/unused"),
    FileNum = read_max_filenum(Prefix) + 1,
    ok = increment_max_filenum(Prefix),
    prefix_server_loop(Prefix, FileNum).
+
%% @doc Enter the per-prefix assignment loop at offset 0 of a freshly
%% named data file.
prefix_server_loop(Prefix, FileNum) ->
    prefix_server_loop(Prefix, make_data_filename(Prefix, FileNum),
                       FileNum, 0).

%% Hand out {File, Offset} assignments, advancing the offset by each
%% requested size; exit after 30 idle seconds.
prefix_server_loop(Prefix, File, FileNum, Offset) ->
    receive
        {seq, From, Prefix, Size} ->
            From ! {assignment, File, Offset},
            prefix_server_loop(Prefix, File, FileNum, Offset + Size)
    after 30*1000 ->
            io:format("timeout: ~p server stopping\n", [Prefix]),
            exit(normal)
    end.
+
%% Registered name for a prefix's server process.
%%
%% NOTE(review): binary_to_atom on caller-supplied prefixes can exhaust
%% the atom table if prefixes are unbounded/untrusted — confirm they
%% come from a bounded, trusted set.
make_regname(Prefix) ->
    erlang:binary_to_atom(Prefix, latin1).

%% Path of the per-prefix counter file under ?CONFIG_DIR.
make_config_filename(Prefix) ->
    lists:flatten(io_lib:format("~s/~s", [?CONFIG_DIR, Prefix])).

%% Data file name for a prefix + file number, under ?DATA_DIR.
make_data_filename(Prefix, FileNum) ->
    erlang:iolist_to_binary(io_lib:format("~s/~s.~w",
                                          [?DATA_DIR, Prefix, FileNum])).

%% The persistent per-prefix counter is simply the byte size of the
%% counter file; a missing file means the counter is zero.
read_max_filenum(Prefix) ->
    case file:read_file_info(make_config_filename(Prefix)) of
        {error, enoent} ->
            0;
        {ok, FI} ->
            FI#file_info.size
    end.

%% Bump the counter by appending one byte to the counter file.
increment_max_filenum(Prefix) ->
    {ok, FH} = file:open(make_config_filename(Prefix), [append]),
    ok = file:write(FH, "x"),
    %% ok = file:sync(FH),
    ok = file:close(FH).
+
+%%%%%%%%%%%%%%%%%
+
+%% basho_bench callbacks
+
-define(SEQ, ?MODULE).

%% basho_bench driver callback: worker 1 starts the shared sequencer
%% server (sleeping briefly so registration completes before the other
%% workers run); every other worker just reuses it.
new(1) ->
    start_server(),
    timer:sleep(100),
    {ok, unused};
new(_Id) ->
    {ok, unused}.
+
%% @doc basho_bench driver callback: one clause per benchmark scenario.
%%
%% Fixes: the seq_ets clause's garbled binary expressions were
%% reconstructed (a single 64-bit ETS counter packs a 32-bit file
%% number and a 32-bit offset); the `_ValgueGen' typo is corrected
%% (unused parameter, so safe); and the no-op `Res = Res' self-match
%% was removed.
run(null, _KeyGen, _ValueGen, State) ->
    {ok, State};
run(keygen_then_null, KeyGen, _ValueGen, State) ->
    _Prefix = KeyGen(),
    {ok, State};
run(seq, KeyGen, _ValueGen, State) ->
    Prefix = KeyGen(),
    {_, _} = ?SEQ:seq(?SEQ, Prefix, 1),
    {ok, State};
run(seq_direct, KeyGen, _ValueGen, State) ->
    Prefix = KeyGen(),
    Name = ?SEQ:make_regname(Prefix),
    %% Cache the prefix server's pid in the worker's process dictionary
    %% so later iterations skip the whereis/registered-name lookup.
    case get(Name) of
        undefined ->
            case whereis(Name) of
                undefined ->
                    {_, _} = ?SEQ:seq(?SEQ, Prefix, 1);
                Pid ->
                    put(Name, Pid),
                    {_, _} = ?SEQ:seq(Pid, Prefix, 1)
            end;
        Pid ->
            {_, _} = ?SEQ:seq(Pid, Prefix, 1)
    end,
    {ok, State};
run(seq_ets, KeyGen, _ValueGen, State) ->
    Tab = ?MODULE,
    Prefix = KeyGen(),
    _Res = try
               BigNum = ets:update_counter(Tab, Prefix, 1),
               BigBin = <<BigNum:64/big>>,
               <<FileNum:32/big, Offset:32/big>> = BigBin,
               {fakefake, FileNum, Offset}
           catch error:badarg ->
                   %% Counter row does not exist yet: seed it with
                   %% file number 1 / offset 0 packed into 64 bits.
                   FileNum2 = 1, Offset2 = 0,
                   FileBin = <<FileNum2:32/big>>,
                   OffsetBin = <<Offset2:32/big>>,
                   Glop = <<FileBin/binary, OffsetBin/binary>>,
                   <<Base:64/big>> = Glop,
                   case ets:insert_new(Tab, {Prefix, Base}) of
                       true ->
                           {<<"fakefakefake">>, Base};
                       false ->
                           %% Another worker seeded it first; retry.
                           Result2 = ets:update_counter(Tab, Prefix, 1),
                           {<<"fakefakefake">>, Result2}
                   end
           end,
    {ok, State}.
+
diff --git a/src/machi_sup.erl b/src/machi_sup.erl
new file mode 100644
index 0000000..dcaadbe
--- /dev/null
+++ b/src/machi_sup.erl
@@ -0,0 +1,55 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
%% @doc Top-level machi supervisor: supervises machi_flu_sup.

-module(machi_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

-define(SERVER, ?MODULE).

start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%% @doc Single child: the FLU supervisor.
%%
%% Fixes per the OTP supervisor documentation: a supervisor child must
%% use Shutdown = infinity (not a timeout), and the Modules list should
%% name the child's callback module rather than being `[]'.
init([]) ->
    RestartStrategy = one_for_one,
    MaxRestarts = 1000,
    MaxSecondsBetweenRestarts = 3600,

    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},

    Restart = permanent,
    Shutdown = infinity,
    Type = supervisor,

    ServerSup =
        {machi_flu_sup, {machi_flu_sup, start_link, []},
         Restart, Shutdown, Type, [machi_flu_sup]},

    {ok, {SupFlags, [ServerSup]}}.
diff --git a/src/machi_util.erl b/src/machi_util.erl
new file mode 100644
index 0000000..c859574
--- /dev/null
+++ b/src/machi_util.erl
@@ -0,0 +1,1102 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(machi_util).
+
+-export([
+ checksum/1,
+ hexstr_to_bin/1, bin_to_hexstr/1,
+ hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
+ make_binary/1,
+ make_regname/1,
+ make_checksum_filename/2, make_data_filename/2,
+ read_max_filenum/2, increment_max_filenum/2,
+ info_msg/2, verb/1, verb/2,
+ %% TCP protocol helpers
+ connect/2
+ ]).
+-compile(export_all).
+
+-include("machi.hrl").
+-include("machi_projection.hrl").
+-include_lib("kernel/include/file.hrl").
+
+append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) ->
+ CSum = checksum(Chunk),
+ Server ! {seq_append, self(), Prefix, Chunk, CSum},
+ receive
+ {assignment, Offset, File} ->
+ {Offset, File}
+ after 10*1000 ->
+ bummer
+ end.
+
+make_regname(Prefix) when is_binary(Prefix) ->
+ erlang:binary_to_atom(Prefix, latin1);
+make_regname(Prefix) when is_list(Prefix) ->
+ erlang:list_to_atom(Prefix).
+
+make_config_filename(DataDir, Prefix) ->
+ lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])).
+
+make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
+ lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum",
+ [DataDir, Prefix, SequencerName, FileNum])).
+
+make_checksum_filename(DataDir, FileName) ->
+ lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])).
+
+make_data_filename(DataDir, File) ->
+ FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])),
+ {File, FullPath}.
+
+make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
+ File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
+ [Prefix, SequencerName, FileNum])),
+ FullPath = lists:flatten(io_lib:format("~s/~s", [DataDir, File])),
+ {File, FullPath}.
+
+read_max_filenum(DataDir, Prefix) ->
+ case file:read_file_info(make_config_filename(DataDir, Prefix)) of
+ {error, enoent} ->
+ 0;
+ {ok, FI} ->
+ FI#file_info.size
+ end.
+
+increment_max_filenum(DataDir, Prefix) ->
+ try
+ {ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
+ ok = file:write(FH, "x"),
+ %% ok = file:sync(FH),
+ ok = file:close(FH)
+ catch
+ error:{badmatch,_}=Error ->
+ {error, Error, erlang:get_stacktrace()}
+ end.
+
+hexstr_to_bin(S) when is_list(S) ->
+ hexstr_to_bin(S, []);
+hexstr_to_bin(B) when is_binary(B) ->
+ hexstr_to_bin(binary_to_list(B), []).
+
+hexstr_to_bin([], Acc) ->
+ list_to_binary(lists:reverse(Acc));
+hexstr_to_bin([X,Y|T], Acc) ->
+ {ok, [V], []} = io_lib:fread("~16u", [X,Y]),
+ hexstr_to_bin(T, [V | Acc]).
+
+bin_to_hexstr(<<>>) ->
+ [];
+bin_to_hexstr(<<X:4, Y:4, Rest/binary>>) ->
+ [hex_digit(X), hex_digit(Y)|bin_to_hexstr(Rest)].
+
+hex_digit(X) when X < 10 ->
+ X + $0;
+hex_digit(X) ->
+ X - 10 + $a.
+
+make_binary(X) when is_binary(X) ->
+ X;
+make_binary(X) when is_list(X) ->
+ iolist_to_binary(X).
+
+hexstr_to_int(X) ->
+ B = hexstr_to_bin(X),
+ B_size = byte_size(B) * 8,
+ <<I:B_size>> = B,
+ I.
+
+int_to_hexstr(I, I_size) ->
+ bin_to_hexstr(<<I:I_size>>).
+
+int_to_hexbin(I, I_size) ->
+ list_to_binary(int_to_hexstr(I, I_size)).
+
+%%%%%%%%%%%%%%%%%
+
+%%% escript stuff
+
+main2(["1file-write-redundant-client"]) ->
+ io:format("Use: Write a local file to a series of servers.\n"),
+ io:format("Args: BlockSize Prefix LocalFilePath [silent] [Host Port [Host Port ...]]\n"),
+ erlang:halt(1);
+main2(["1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile|HPs0]) ->
+ BlockSize = list_to_integer(BlockSizeStr),
+ Prefix = list_to_binary(PrefixStr),
+ {Out, HPs} = case HPs0 of
+ ["silent"|Rest] -> {silent, Rest};
+ _ -> {not_silent, HPs0}
+ end,
+ Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile),
+ if Out /= silent ->
+ print_upload_details(user, Res);
+ true ->
+ ok
+ end,
+ Res;
+
+main2(["chunk-read-client"]) ->
+ io:format("Use: Read a series of chunks for a single server.\n"),
+ io:format("Args: Host Port LocalChunkDescriptionPath [OutputPath|'console']\n"),
+ erlang:halt(1);
+main2(["chunk-read-client", Host, PortStr, ChunkFileList]) ->
+ main2(["chunk-read-client", Host, PortStr, ChunkFileList, "console"]);
+main2(["chunk-read-client", Host, PortStr, ChunkFileList, OutputPath]) ->
+ FH = open_output_file(OutputPath),
+ OutFun = make_outfun(FH),
+ try
+ main2(["chunk-read-client2", Host, PortStr, ChunkFileList, OutFun])
+ after
+ (catch file:close(FH))
+ end;
+main2(["chunk-read-client2", Host, PortStr, ChunkFileList, ProcFun]) ->
+ Sock = escript_connect(Host, PortStr),
+ escript_download_chunks(Sock, ChunkFileList, ProcFun);
+
+main2(["delete-client"]) ->
+ io:format("Use: Delete a file (NOT FOR GENERAL USE)\n"),
+ io:format("Args: Host Port File\n"),
+ erlang:halt(1);
+main2(["delete-client", Host, PortStr, File]) ->
+ Sock = escript_connect(Host, PortStr),
+ escript_delete(Sock, File);
+
+%%%% cc flavors %%%%
+
+main2(["cc-1file-write-redundant-client"]) ->
+ io:format("Use: Write a local file to a chain via projection.\n"),
+ io:format("Args: BlockSize Prefix LocalFilePath ProjectionPath\n"),
+ erlang:halt(1);
+main2(["cc-1file-write-redundant-client", BlockSizeStr, PrefixStr, LocalFile, ProjectionPath]) ->
+ BlockSize = list_to_integer(BlockSizeStr),
+ Prefix = list_to_binary(PrefixStr),
+ {_Chain, RawHPs} = calc_chain(write, ProjectionPath, PrefixStr),
+ HPs = convert_raw_hps(RawHPs),
+ Res = escript_upload_redundant(HPs, BlockSize, Prefix, LocalFile),
+ print_upload_details(user, Res),
+ Res;
+
+main2(["cc-chunk-read-client"]) ->
+ io:format("Use: Read a series of chunks from a chain via projection.\n"),
+ io:format("Args: ProjectionPath ChunkFileList [OutputPath|'console' \\\n\t[ErrorCorrection_ProjectionPath]]\n"),
+ erlang:halt(1);
+main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList]) ->
+ main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList,"console",
+ undefined]);
+main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath]) ->
+ main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
+ undefined]);
+main2(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
+ EC_ProjectionPath]) ->
+ main3(["cc-chunk-read-client", ProjectionPathOrDir, ChunkFileList, OutputPath,
+ EC_ProjectionPath]).
+
+main3(["cc-chunk-read-client",
+ ProjectionPathOrDir, ChunkFileList, OutputPath, EC_ProjectionPath]) ->
+ P = read_projection_file(ProjectionPathOrDir),
+ ChainMap = read_chain_map_file(ProjectionPathOrDir),
+ FH = open_output_file(OutputPath),
+ ProcFun = make_outfun(FH),
+ Res = try
+ escript_cc_download_chunks(ChunkFileList, P, ChainMap, ProcFun,
+ EC_ProjectionPath)
+ after
+ (catch file:close(FH))
+ end,
+ Res.
+
+-spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) ->
+ port().
+connect(Host, Port) ->
+ escript_connect(Host, Port).
+
+escript_connect(Host, PortStr) when is_list(PortStr) ->
+ Port = list_to_integer(PortStr),
+ escript_connect(Host, Port);
+escript_connect(Host, Port) when is_integer(Port) ->
+ {ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
+ {packet, raw}]),
+ Sock.
+
+escript_upload_file(Sock, BlockSize, Prefix, File) ->
+ {ok, FH} = file:open(File, [read, raw, binary]),
+ try
+ escript_upload_file2(file:read(FH, BlockSize), FH,
+ BlockSize, Prefix, Sock, [])
+ after
+ file:close(FH)
+ end.
+
+escript_upload_file2({ok, Chunk}, FH, BlockSize, Prefix, Sock, Acc) ->
+ {OffsetHex, LenHex, File} = upload_chunk_append(Sock, Prefix, Chunk),
+ verb("~s ~s ~s\n", [OffsetHex, LenHex, File]),
+ <<Offset:64/big>> = hexstr_to_bin(OffsetHex),
+ <<Size:32/big>> = hexstr_to_bin(LenHex),
+ OSF = {Offset, Size, File},
+ escript_upload_file2(file:read(FH, BlockSize), FH, BlockSize, Prefix, Sock,
+ [OSF|Acc]);
+escript_upload_file2(eof, _FH, _BlockSize, _Prefix, _Sock, Acc) ->
+ lists:reverse(Acc).
+
+upload_chunk_append(Sock, Prefix, Chunk) ->
+ %% _ = crypto:hash(md5, Chunk),
+ Len = byte_size(Chunk),
+ LenHex = list_to_binary(bin_to_hexstr(<<Len:32/big>>)),
+ Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>,
+ ok = gen_tcp:send(Sock, [Cmd, Chunk]),
+ {ok, Line} = gen_tcp:recv(Sock, 0),
+ PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
+ <<"OK ", OffsetHex:16/binary, " ", Path:PathLen/binary, _:1/binary>> = Line,
+ {OffsetHex, LenHex, Path}.
+
+upload_chunk_write(Sock, Offset, File, Chunk) when is_integer(Offset) ->
+ OffsetHex = list_to_binary(bin_to_hexstr(<<Offset:64/big>>)),
+ upload_chunk_write(Sock, OffsetHex, File, Chunk);
+upload_chunk_write(Sock, OffsetHex, File, Chunk) when is_binary(OffsetHex) ->
+ %% _ = crypto:hash(md5, Chunk),
+ Len = byte_size(Chunk),
+ LenHex = list_to_binary(bin_to_hexstr(<<Len:32/big>>)),
+ Cmd = <<"W-repl ", OffsetHex/binary, " ",
+ LenHex/binary, " ", File/binary, "\n">>,
+ ok = gen_tcp:send(Sock, [Cmd, Chunk]),
+ {ok, Line} = gen_tcp:recv(Sock, 0),
+ <<"OK\n">> = Line,
+ {OffsetHex, LenHex, File}.
+
+escript_upload_redundant([Host, PortStr|HPs], BlockSize, Prefix, LocalFile) ->
+ Sock = escript_connect(Host, PortStr),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ OSFs = try
+ escript_upload_file(Sock, BlockSize, Prefix, LocalFile)
+ after
+ gen_tcp:close(Sock)
+ end,
+ escript_upload_redundant2(HPs, OSFs, LocalFile, OSFs).
+
+escript_upload_redundant2([], _OSFs, _LocalFile, OSFs) ->
+ OSFs;
+escript_upload_redundant2([Host, PortStr|HPs], OSFs, LocalFile, OSFs) ->
+ Sock = escript_connect(Host, PortStr),
+ {ok, FH} = file:open(LocalFile, [read, binary, raw]),
+ try
+ [begin
+ {ok, Chunk} = file:read(FH, Size),
+ _OSF2 = upload_chunk_write(Sock, Offset, File, Chunk)
+ %% verb("~p: ~p\n", [{Host, PortStr}, OSF2])
+ end || {Offset, Size, File} <- OSFs]
+ after
+ gen_tcp:close(Sock),
+ file:close(FH)
+ end,
+ escript_upload_redundant2(HPs, OSFs, LocalFile, OSFs).
+
+escript_download_chunks(Sock, {{{ChunkLine}}}, ProcFun) ->
+ escript_download_chunk({ok, ChunkLine}, invalid_fd, Sock, ProcFun);
+escript_download_chunks(Sock, ChunkFileList, ProcFun) ->
+ {ok, FH} = file:open(ChunkFileList, [read, raw, binary]),
+ escript_download_chunk(file:read_line(FH), FH, Sock, ProcFun).
+
+escript_download_chunk({ok, Line}, FH, Sock, ProcFun) ->
+ ChunkOrError = escript_cc_download_chunk2(Sock, Line),
+ ProcFun(ChunkOrError),
+ [ChunkOrError|
+ escript_download_chunk((catch file:read_line(FH)), FH, Sock, ProcFun)];
+escript_download_chunk(eof, _FH, _Sock, ProcFun) ->
+ ProcFun(eof),
+ [];
+escript_download_chunk(_Else, _FH, _Sock, ProcFun) ->
+ ProcFun(eof),
+ [].
+
+escript_cc_download_chunks({{{ChunkLine}}}, P, ChainMap, ProcFun,
+ EC_ProjectionPath) ->
+ escript_cc_download_chunk({ok,ChunkLine}, invalid_fd, P, ChainMap, ProcFun,
+ EC_ProjectionPath);
+escript_cc_download_chunks(ChunkFileList, P, ChainMap, ProcFun,
+ EC_ProjectionPath) ->
+ {ok, FH} = file:open(ChunkFileList, [read, raw, binary]),
+ escript_cc_download_chunk(file:read_line(FH), FH, P, ChainMap, ProcFun,
+ EC_ProjectionPath).
+
+escript_cc_download_chunk({ok, Line}, FH, P, ChainMap, ProcFun,
+ EC_ProjectionPath) ->
+ RestLen = byte_size(Line) - 16 - 1 - 8 - 1 - 1,
+ <<_Offset:16/binary, " ", _Len:8/binary, " ", Rest:RestLen/binary, "\n">>
+ = Line,
+ Prefix = re:replace(Rest, "\\..*", "", [{return, binary}]),
+ {_Chains, RawHPs} = calc_chain(read, P, ChainMap, Prefix),
+ Chunk = lists:foldl(
+ fun(_RawHP, Bin) when is_binary(Bin) -> Bin;
+ (RawHP, _) ->
+ [Host, PortStr] = convert_raw_hps([RawHP]),
+ Sock = get_cached_sock(Host, PortStr),
+ case escript_cc_download_chunk2(Sock, Line) of
+ Bin when is_binary(Bin) ->
+ Bin;
+ {error, _} = Error ->
+ Error;
+ {erasure_encoded, _} = EC_info ->
+ escript_cc_download_ec_chunk(EC_info,
+ EC_ProjectionPath)
+ end
+ end, undefined, RawHPs),
+ ProcFun(Chunk),
+ [Chunk|escript_cc_download_chunk((catch file:read_line(FH)),
+ FH, P, ChainMap, ProcFun,
+ EC_ProjectionPath)];
+escript_cc_download_chunk(eof, _FH, _P, _ChainMap, ProcFun,
+ _EC_ProjectionPath) ->
+ ProcFun(eof),
+ [];
+escript_cc_download_chunk(Else, _FH, _P, _ChainMap, ProcFun,
+ _EC_ProjectionPath) ->
+ ProcFun(Else),
+ [].
+
+escript_cc_download_chunk2(Sock, Line) ->
+ %% Line includes an LF, so we can be lazy.
+ CmdLF = [<<"R ">>, Line],
+ ok = gen_tcp:send(Sock, CmdLF),
+ case gen_tcp:recv(Sock, 3) of
+ {ok, <<"OK\n">>} ->
+ {_Offset, Size, _File} = read_hex_size(Line),
+ {ok, Chunk} = gen_tcp:recv(Sock, Size),
+ Chunk;
+ {ok, Else} ->
+ {ok, OldOpts} = inet:getopts(Sock, [packet]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ {ok, Else2} = gen_tcp:recv(Sock, 0),
+ ok = inet:setopts(Sock, OldOpts),
+ case Else of
+ <<"ERA">> ->
+ escript_cc_parse_ec_info(Sock, Line, Else2);
+ _ ->
+ {error, {Line, <<Else/binary, Else2/binary>>}}
+ end
+ end.
+
+escript_cc_parse_ec_info(Sock, Line, Else2) ->
+ ChompLine = chomp(Line),
+ {Offset, Size, File} = read_hex_size(ChompLine),
+ <<"SURE ", BodyLenHex:4/binary, " ", StripeWidthHex:16/binary, " ",
+ OrigFileLenHex:16/binary, " rs_10_4_v1", _/binary>> = Else2,
+ <<BodyLen:16/big>> = hexstr_to_bin(BodyLenHex),
+ {ok, SummaryBody} = gen_tcp:recv(Sock, BodyLen),
+
+ <<StripeWidth:64/big>> = hexstr_to_bin(StripeWidthHex),
+ <<OrigFileLen:64/big>> = hexstr_to_bin(OrigFileLenHex),
+ NewFileNum = (Offset div StripeWidth) + 1,
+ NewOffset = Offset rem StripeWidth,
+ if Offset + Size > OrigFileLen ->
+ %% Client's request is larger than original file size, derp
+ {error, bad_offset_and_size};
+ NewOffset + Size > StripeWidth ->
+ %% Client's request straddles a stripe boundary, TODO fix me
+ {error, todo_TODO_implement_this_with_two_reads_and_then_glue_together};
+ true ->
+ NewOffsetHex = bin_to_hexstr(<<NewOffset:64/big>>),
+ LenHex = bin_to_hexstr(<<Size:32/big>>),
+ NewSuffix = file_suffix_rs_10_4_v1(NewFileNum),
+ NewFile = iolist_to_binary([File, NewSuffix]),
+ NewLine = iolist_to_binary([NewOffsetHex, " ", LenHex, " ",
+ NewFile, "\n"]),
+ {erasure_encoded, {Offset, Size, File, NewOffset, NewFile,
+ NewFileNum, NewLine, SummaryBody}}
+ end.
+
+%% TODO: The EC method/version/type stuff here is loosey-goosey
+escript_cc_download_ec_chunk(EC_info, undefined) ->
+ EC_info;
+escript_cc_download_ec_chunk({erasure_encoded,
+ {_Offset, _Size, _File, _NewOffset, NewFile,
+ NewFileNum, NewLine, SummaryBody}},
+ EC_ProjectionPath) ->
+ {P, ChainMap} = get_cached_projection(EC_ProjectionPath),
+ %% Remember: we use the whole file name for hashing, not the prefix
+ {_Chains, RawHPs} = calc_chain(read, P, ChainMap, NewFile),
+ RawHP = lists:nth(NewFileNum, RawHPs),
+ [Host, PortStr] = convert_raw_hps([RawHP]),
+ Sock = get_cached_sock(Host, PortStr),
+ case escript_cc_download_chunk2(Sock, NewLine) of
+ Chunk when is_binary(Chunk) ->
+ Chunk;
+ {error, _} = Else ->
+ io:format("TODO: EC chunk get failed:\n\t~s\n", [NewLine]),
+ io:format("Use this info to reconstruct:\n\t~p\n\n", [SummaryBody]),
+ Else
+ end.
+
+get_cached_projection(EC_ProjectionPath) ->
+ case get(cached_projection) of
+ undefined ->
+ P = read_projection_file(EC_ProjectionPath),
+ ChainMap = read_chain_map_file(EC_ProjectionPath),
+ put(cached_projection, {P, ChainMap}),
+ get_cached_projection(EC_ProjectionPath);
+ Stuff ->
+ Stuff
+ end.
+
+file_suffix_rs_10_4_v1(1) -> <<"_k01">>;
+file_suffix_rs_10_4_v1(2) -> <<"_k02">>;
+file_suffix_rs_10_4_v1(3) -> <<"_k03">>;
+file_suffix_rs_10_4_v1(4) -> <<"_k04">>;
+file_suffix_rs_10_4_v1(5) -> <<"_k05">>;
+file_suffix_rs_10_4_v1(6) -> <<"_k06">>;
+file_suffix_rs_10_4_v1(7) -> <<"_k07">>;
+file_suffix_rs_10_4_v1(8) -> <<"_k08">>;
+file_suffix_rs_10_4_v1(9) -> <<"_k09">>;
+file_suffix_rs_10_4_v1(10) -> <<"_k10">>.
+
+escript_delete(Sock, File) ->
+ ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]),
+ ok = inet:setopts(Sock, [{packet, line}]),
+ case gen_tcp:recv(Sock, 0) of
+ {ok, <<"OK\n">>} ->
+ ok;
+ {ok, <<"ERROR", _/binary>>} ->
+ error
+ end.
+
+escript_compare_servers(Sock1, Sock2, H1, H2, Args) ->
+ FileFilterFun = fun(_) -> true end,
+ escript_compare_servers(Sock1, Sock2, H1, H2, FileFilterFun, Args).
+
+escript_compare_servers(Sock1, Sock2, H1, H2, FileFilterFun, Args) ->
+ All = [H1, H2],
+ put(mydict, dict:new()),
+ Fetch1 = make_fetcher(H1),
+ Fetch2 = make_fetcher(H2),
+
+ Fmt = case Args of
+ [] ->
+ fun(eof) -> ok; (Str) -> io:format(user, Str, []) end;
+ [null] ->
+ fun(_) -> ok end;
+ [OutFile] ->
+ {ok, FH} = file:open(OutFile, [write]),
+ fun(eof) -> file:close(FH);
+ (Str) -> file:write(FH, Str)
+ end
+ end,
+
+ %% TODO: Broken! Fetch1 and Fetch2 aren't created when comments are below
+ Sock1=Sock1,Sock2=Sock2,Fetch1=Fetch1,Fetch2=Fetch2, % shut up compiler
+ %% _X1 = escript_list2(Sock1, Fetch1),
+ %% _X2 = escript_list2(Sock2, Fetch2),
+ FoldRes = lists:sort(dict:to_list(get(mydict))),
+ Fmt("{legend, {file, list_of_servers_without_file}}.\n"),
+ Fmt(io_lib:format("{all, ~p}.\n", [All])),
+ Res = [begin
+ {GotIt, Sizes} = lists:unzip(GotSizes),
+ Size = lists:max(Sizes),
+ Missing = {File, {Size, All -- GotIt}},
+ verb("~p.\n", [Missing]),
+ Missing
+ end || {File, GotSizes} <- FoldRes, FileFilterFun(File)],
+ (catch Fmt(eof)),
+ Res.
+
+make_fetcher(Host) ->
+ fun(eof) ->
+ ok;
+ (<<SizeHex:16/binary, " ", Rest/binary>>) ->
+ <<Size:64/big>> = hexstr_to_bin(SizeHex),
+ FileLen = byte_size(Rest) - 1,
+ <<File:FileLen/binary, _/binary>> = Rest,
+ NewDict = dict:append(File, {Host, Size}, get(mydict)),
+ put(mydict, NewDict)
+ end.
+
+checksum(Bin) when is_binary(Bin) ->
+ crypto:hash(md5, Bin).
+
+verb(Fmt) ->
+ verb(Fmt, []).
+
+verb(Fmt, Args) ->
+ case application:get_env(kernel, verbose) of
+ {ok, true} -> io:format(Fmt, Args);
+ _ -> ok
+ end.
+
+info_msg(Fmt, Args) ->
+ case application:get_env(kernel, verbose) of {ok, false} -> ok;
+ _ -> error_logger:info_msg(Fmt, Args)
+ end.
+
+repair(File, Size, [], Mode, V, SrcS, SrcS2, DstS, DstS2, _Src) ->
+ verb("~s: present on both: ", [File]),
+ repair_both_present(File, Size, Mode, V, SrcS, SrcS2, DstS, DstS2);
+repair(File, Size, MissingList, Mode, V, SrcS, SrcS2, DstS, _DstS2, Src) ->
+ case lists:member(Src, MissingList) of
+ true ->
+ verb("~s -> ~p, skipping: not on source server\n", [File, MissingList]);
+ false when Mode == check ->
+ verb("~s -> ~p, copy ~s MB (skipped)\n", [File, MissingList, mbytes(Size)]);
+ false ->
+ verb("~s -> ~p, copy ~s MB ", [File, MissingList, mbytes(Size)]),
+ ok = copy_file(File, SrcS, SrcS2, DstS, V),
+ verb("done\n", [])
+ end.
+
+copy_file(File, SrcS, SrcS2, DstS, Verbose) ->
+ %% Use the *second* source socket to copy each chunk.
+ ProcChecksum = copy_file_proc_checksum_fun(File, SrcS2, DstS, Verbose),
+ %% Use the *first source socket to enumerate the chunks & checksums.
+ exit(todo_broken),
+ machi_flu1_client:checksum_list(SrcS, File, line_by_line, ProcChecksum).
+
+copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->
+ fun(<<OffsetHex:16/binary, " ", LenHex:8/binary, " ", CSumHex:32/binary, "\n">>) ->
+ <<Len:32/big>> = hexstr_to_bin(LenHex),
+ DownloadChunkBin = <<OffsetHex/binary, " ", LenHex/binary, " ", File/binary, "\n">>,
+ [Chunk] = escript_download_chunks(SrcS, {{{DownloadChunkBin}}},
+ fun(_) -> ok end),
+ CSum = hexstr_to_bin(CSumHex),
+ CSum2 = checksum(Chunk),
+ if Len == byte_size(Chunk), CSum == CSum2 ->
+ {_,_,_} = upload_chunk_write(DstS, OffsetHex, File, Chunk),
+ ok;
+ true ->
+ io:format("ERROR: ~s ~s ~s csum/size error\n",
+ [File, OffsetHex, LenHex]),
+ error
+ end;
+ (_Else) ->
+ ok
+ end.
+
+repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) ->
+ Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))),
+ Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))),
+ J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))),
+ J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))),
+ J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))),
+ S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))),
+ {ok, FH1} = file:open(Tmp1, [write, raw, binary]),
+ {ok, FH2} = file:open(Tmp2, [write, raw, binary]),
+ try
+ K = md5_ctx,
+ MD5_it = fun(Bin) ->
+ {FH, MD5ctx1} = get(K),
+ file:write(FH, Bin),
+ MD5ctx2 = crypto:hash_update(MD5ctx1, Bin),
+ put(K, {FH, MD5ctx2})
+ end,
+ put(K, {FH1, crypto:hash_init(md5)}),
+ exit(todo_broken),
+ ok = machi_flu1_client:checksum_list(SrcS, File, fast, MD5_it),
+ {_, MD5_1} = get(K),
+ SrcMD5 = crypto:hash_final(MD5_1),
+ put(K, {FH2, crypto:hash_init(md5)}),
+ exit(todo_broken),
+ ok = machi_flu1_client:checksum_list(DstS, File, fast, MD5_it),
+ {_, MD5_2} = get(K),
+ DstMD5 = crypto:hash_final(MD5_2),
+ if SrcMD5 == DstMD5 ->
+ verb("identical\n", []);
+ true ->
+ ok = file:close(FH1),
+ ok = file:close(FH2),
+ _Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical),
+ case file:read_file_info(S_Identical) of
+ {ok, _} ->
+ verb("identical (secondary sort)\n", []);
+ {error, enoent} ->
+ io:format("differences found:"),
+ repair_both(File, Size, V, Mode,
+ J_Both, J_SrcOnly, J_DstOnly,
+ SrcS, DstS)
+ end
+ end
+ after
+ catch file:close(FH1),
+ catch file:close(FH2),
+ [(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly,
+ S_Identical]]
+ end.
+
+repair_both(File, _Size, V, Mode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) ->
+ AccFun = if Mode == check ->
+ fun(_X, List) -> List end;
+ Mode == repair ->
+ fun( X, List) -> [X|List] end
+ end,
+ BothFun = fun(<<_OffsetSrcHex:16/binary, " ",
+ LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ",
+ LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line,
+ {SameB, SameC, DiffB, DiffC, Ds}) ->
+ <<Len:32/big>> = hexstr_to_bin(LenSrcHex),
+ if LenSrcHex == LenDstHex,
+ CSumSrcHex == CSumDstHex ->
+ {SameB + Len, SameC + 1, DiffB, DiffC, Ds};
+ true ->
+ %% D = {OffsetSrcHex, LenSrcHex, ........
+ {SameB, SameC, DiffB + Len, DiffC + 1,
+ AccFun(Line, Ds)}
+ end;
+ (_Else, Acc) ->
+ Acc
+ end,
+ OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ",
+ _CSumHex:32/binary, "\n">> = Line,
+ {DiffB, DiffC, Ds}) ->
+ <<Len:32/big>> = hexstr_to_bin(LenSrcHex),
+ {DiffB + Len, DiffC + 1, AccFun(Line, Ds)};
+ (_Else, Acc) ->
+ Acc
+ end,
+ {SameBx, SameCx, DiffBy, DiffCy, BothDiffs} =
+ file_folder(BothFun, {0,0,0,0,[]}, J_Both),
+ {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly),
+ {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly),
+ if Mode == check orelse V == true ->
+ io:format("\n\t"),
+ io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]),
+ io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]),
+ io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]),
+ io:format("\n");
+ true -> ok
+ end,
+ if Mode == repair ->
+ ok = repair_both_both(File, V, BothDiffs, SrcS, DstS),
+ ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src,
+ SrcS, DstS),
+ ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst,
+ DstS, SrcS);
+ true ->
+ ok
+ end.
+
+repair_both_both(_File, _V, [_|_], _SrcS, _DstS) ->
+ %% TODO: fetch both, check checksums, hopefully only exactly one
+ %% is correct, then use that one to repair the other. And if the
+ %% sizes are different, hrm, there may be an extra corner case(s)
+ %% hiding there.
+ io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"),
+ timer:sleep(3*1000),
+ ok;
+repair_both_both(_File, _V, [], _SrcS, _DstS) ->
+ ok.
+
+repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) ->
+ ok;
+repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) ->
+ verb("\n", []),
+ verb("Starting copy of ~p chunks/~s MBytes to \n ~s: ",
+ [DiffCount, mbytes(DiffBytes), File]),
+ InnerCopyFun = copy_file_proc_checksum_fun(File, SrcS, DstS, V),
+ FoldFun = fun(Line, ok) ->
+ ok = InnerCopyFun(Line) % Strong sanity check
+ end,
+ ok = lists:foldl(FoldFun, ok, ToBeCopied),
+ verb(" done\n", []),
+ ok.
+
+file_folder(Fun, Acc, Path) ->
+ {ok, FH} = file:open(Path, [read, raw, binary]),
+ try
+ file_folder2(Fun, Acc, FH)
+ after
+ file:close(FH)
+ end.
+
+file_folder2(Fun, Acc, FH) ->
+ file_folder2(file:read_line(FH), Fun, Acc, FH).
+
+file_folder2({ok, Line}, Fun, Acc, FH) ->
+ Acc2 = Fun(Line, Acc),
+ file_folder2(Fun, Acc2, FH);
+file_folder2(eof, _Fun, Acc, _FH) ->
+ Acc.
+
+make_repair_props(["check"|T]) ->
+ [{mode, check}|make_repair_props(T)];
+make_repair_props(["repair"|T]) ->
+ [{mode, repair}|make_repair_props(T)];
+make_repair_props(["verbose"|T]) ->
+ application:set_env(kernel, verbose, true),
+ [{verbose, true}|make_repair_props(T)];
+make_repair_props(["noverbose"|T]) ->
+ [{verbose, false}|make_repair_props(T)];
+make_repair_props(["progress"|T]) ->
+ [{progress, true}|make_repair_props(T)];
+make_repair_props(["delete-source"|T]) ->
+ [{delete_source, true}|make_repair_props(T)];
+make_repair_props(["nodelete-source"|T]) ->
+ [{delete_source, false}|make_repair_props(T)];
+make_repair_props(["nodelete-tmp"|T]) ->
+ [{delete_tmp, false}|make_repair_props(T)];
+make_repair_props([X|T]) ->
+ io:format("Error: skipping unknown option ~p\n", [X]),
+ make_repair_props(T);
+make_repair_props([]) ->
+ %% Proplist defaults
+ [{mode, check}, {delete_source, false}].
+
+mbytes(0) ->
+ "0.0";
+mbytes(Size) ->
+ lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])).
+
+chomp(Line) when is_binary(Line) ->
+ LineLen = byte_size(Line) - 1,
+ <<ChompLine:LineLen/binary, _:1/binary>> = Line,
+ ChompLine.
+
+make_outfun(FH) ->
+ fun({error, _} = Error) ->
+ file:write(FH, io_lib:format("Error: ~p\n", [Error]));
+ (eof) ->
+ ok;
+ ({erasure_encoded, Info} = _Erasure) ->
+ file:write(FH, "TODO/WIP: erasure_coded:\n"),
+ file:write(FH, io_lib:format("\t~p\n", [Info]));
+ (Bytes) when is_binary(Bytes) orelse is_list(Bytes) ->
+ file:write(FH, Bytes)
+ end.
+
+open_output_file("console") ->
+ user;
+open_output_file(Path) ->
+ {ok, FH} = file:open(Path, [write]),
+ FH.
+
+print_upload_details(_, {error, _} = Res) ->
+ io:format("Error: ~p\n", [Res]),
+ erlang:halt(1);
+print_upload_details(FH, Res) ->
+ [io:format(FH, "~s ~s ~s\n", [bin_to_hexstr(<<Offset:64/big>>),
+ bin_to_hexstr(<<Len:32/big>>),
+ File]) ||
+ {Offset, Len, File} <- Res].
+
+%%%%%%%%%%%%%%%%%
+
+read_projection_file("new") ->
+ #projection{epoch=0, last_epoch=0,
+ float_map=undefined, last_float_map=undefined};
+read_projection_file(Path) ->
+ case filelib:is_dir(Path) of
+ true ->
+ read_projection_file_loop(Path ++ "/current.proj");
+ false ->
+ case filelib:is_file(Path) of
+ true ->
+ read_projection_file2(Path);
+ false ->
+ error({bummer, Path})
+ end
+ end.
+
+read_projection_file2(Path) ->
+ {ok, [P]} = file:consult(Path),
+ true = is_record(P, projection),
+ FloatMap = P#projection.float_map,
+ LastFloatMap = if P#projection.last_float_map == undefined ->
+ FloatMap;
+ true ->
+ P#projection.last_float_map
+ end,
+ P#projection{migrating=(FloatMap /= LastFloatMap),
+ tree=machi_chash:make_tree(FloatMap),
+ last_tree=machi_chash:make_tree(LastFloatMap)}.
+
+read_projection_file_loop(Path) ->
+ read_projection_file_loop(Path, 100).
+
+read_projection_file_loop(Path, 0) ->
+ error({bummer, Path});
+read_projection_file_loop(Path, N) ->
+ try
+ read_projection_file2(Path)
+ catch
+ error:{badmatch,{error,enoent}} ->
+ timer:sleep(100),
+ read_projection_file_loop(Path, N-1)
+ end.
+
+write_projection(P, Path) when is_record(P, projection) ->
+ {error, enoent} = file:read_file_info(Path),
+ {ok, FH} = file:open(Path, [write]),
+ WritingP = P#projection{tree=undefined, last_tree=undefined},
+ io:format(FH, "~p.\n", [WritingP]),
+ ok = file:close(FH).
+
+read_weight_map_file(Path) ->
+ {ok, [Map]} = file:consult(Path),
+ true = is_list(Map),
+ true = lists:all(fun({Chain, Weight})
+ when is_binary(Chain),
+ is_integer(Weight), Weight >= 0 ->
+ true;
+ (_) ->
+ false
+ end, Map),
+ Map.
+
+%% Assume the file "chains.map" in whatever dir that stores projections.
+read_chain_map_file(DirPath) ->
+ L = case filelib:is_dir(DirPath) of
+ true ->
+ {ok, Map} = file:consult(DirPath ++ "/chains.map"),
+ Map;
+ false ->
+ Dir = filename:dirname(DirPath),
+ {ok, Map} = file:consult(Dir ++ "/chains.map"),
+ Map
+ end,
+ orddict:from_list(L).
+
+get_float_map(P) when is_record(P, projection) ->
+ P#projection.float_map.
+
+get_last_float_map(P) when is_record(P, projection) ->
+ P#projection.last_float_map.
+
+hash_and_query(Key, P) when is_record(P, projection) ->
+ <<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
+ Float = Int / ?SHA_MAX,
+ {_, Current} = machi_chash:query_tree(Float, P#projection.tree),
+ if P#projection.migrating ->
+ {_, Last} = machi_chash:query_tree(Float, P#projection.last_tree),
+ if Last == Current ->
+ [Current];
+ true ->
+ [Current, Last, Current]
+ end;
+ true ->
+ [Current]
+ end.
+
+calc_chain(write=Op, ProjectionPathOrDir, PrefixStr) ->
+ P = read_projection_file(ProjectionPathOrDir),
+ ChainMap = read_chain_map_file(ProjectionPathOrDir),
+ calc_chain(Op, P, ChainMap, PrefixStr);
+calc_chain(read=Op, ProjectionPathOrDir, PrefixStr) ->
+ P = read_projection_file(ProjectionPathOrDir),
+ ChainMap = read_chain_map_file(ProjectionPathOrDir),
+ calc_chain(Op, P, ChainMap, PrefixStr).
+
+calc_chain(write=_Op, P, ChainMap, PrefixStr) ->
+ %% Writes are easy: always use the new location.
+ [Chain|_] = hash_and_query(PrefixStr, P),
+ {Chain, orddict:fetch(Chain, ChainMap)};
+calc_chain(read=_Op, P, ChainMap, PrefixStr) ->
+ %% Reads are slightly trickier: reverse each chain so tail is tried first.
+ Chains = hash_and_query(PrefixStr, P),
+ {Chains, lists:flatten([lists:reverse(orddict:fetch(Chain, ChainMap)) ||
+ Chain <- Chains])}.
+
+convert_raw_hps([{HostBin, Port}|T]) ->
+ [binary_to_list(HostBin), integer_to_list(Port)|convert_raw_hps(T)];
+convert_raw_hps([]) ->
+ [].
+
+get_cached_sock(Host, PortStr) ->
+ K = {socket_cache, Host, PortStr},
+ case erlang:get(K) of
+ undefined ->
+ Sock = escript_connect(Host, PortStr),
+ Krev = {socket_cache_rev, Sock},
+ erlang:put(K, Sock),
+ erlang:put(Krev, {Host, PortStr}),
+ Sock;
+ Sock ->
+ Sock
+ end.
+
+invalidate_cached_sock(Sock) ->
+ (catch gen_tcp:close(Sock)),
+ Krev = {socket_cache_rev, Sock},
+ case erlang:get(Krev) of
+ undefined ->
+ ok;
+ {Host, PortStr} ->
+ K = {socket_cache, Host, PortStr},
+ erlang:erase(Krev),
+ erlang:erase(K),
+ ok
+ end.
+
+%%%%%%%%%%%%%%%%%
+
+%%% basho_bench callbacks
+
+-define(SEQ, ?MODULE).
+-define(DEFAULT_HOSTIP_LIST, [{{127,0,0,1}, 7071}]).
+
+-record(bb, {
+ host,
+ port_str,
+ %% sock,
+ proj_check_ticker_started=false,
+ proj_path,
+ proj,
+ chain_map
+ }).
+
+new(1 = Id) ->
+ %% broken: start_append_server(),
+ case basho_bench_config:get(file0_start_listener, no) of
+ no ->
+ ok;
+ {_Port, _DataDir} ->
+ exit(todo_broken)
+ end,
+ timer:sleep(100),
+ new_common(Id);
+new(Id) ->
+ new_common(Id).
+
+new_common(Id) ->
+ random:seed(now()),
+ ProjectionPathOrDir =
+ basho_bench_config:get(file0_projection_path, undefined),
+
+ Servers = basho_bench_config:get(file0_ip_list, ?DEFAULT_HOSTIP_LIST),
+ NumServers = length(Servers),
+ {Host, Port} = lists:nth((Id rem NumServers) + 1, Servers),
+ State0 = #bb{host=Host, port_str=integer_to_list(Port),
+ proj_path=ProjectionPathOrDir},
+ {ok, read_projection_info(State0)}.
+
+run(null, _KeyGen, _ValueGen, State) ->
+ {ok, State};
+run(keygen_valuegen_then_null, KeyGen, ValueGen, State) ->
+ _Prefix = KeyGen(),
+ _Value = ValueGen(),
+ {ok, State};
+run(append_local_server, KeyGen, ValueGen, State) ->
+ Prefix = KeyGen(),
+ Value = ValueGen(),
+ {_, _} = ?SEQ:append(?SEQ, Prefix, Value),
+ {ok, State};
+run(append_remote_server, KeyGen, ValueGen, State) ->
+ Prefix = KeyGen(),
+ Value = ValueGen(),
+ bb_do_write_chunk(Prefix, Value, State#bb.host, State#bb.port_str, State);
+run(cc_append_remote_server, KeyGen, ValueGen, State0) ->
+ State = check_projection_check(State0),
+ Prefix = KeyGen(),
+ Value = ValueGen(),
+ {_Chain, ModHPs} = calc_chain(write, State#bb.proj, State#bb.chain_map,
+ Prefix),
+ FoldFun = fun({Host, PortStr}, Acc) ->
+ case bb_do_write_chunk(Prefix, Value, Host, PortStr,
+ State) of
+ {ok, _} ->
+ Acc + 1;
+ _ ->
+ Acc
+ end
+ end,
+ case lists:foldl(FoldFun, 0, ModHPs) of
+ N when is_integer(N), N > 0 ->
+ {ok, State};
+ 0 ->
+ {error, oh_some_problem_yo, State}
+ end;
+run(read_raw_line_local, KeyGen, _ValueGen, State) ->
+ {RawLine, Size, _File} = setup_read_raw_line(KeyGen),
+ bb_do_read_chunk(RawLine, Size, State#bb.host, State#bb.port_str, State);
+run(cc_read_raw_line_local, KeyGen, _ValueGen, State0) ->
+ State = check_projection_check(State0),
+ {RawLine, Size, File} = setup_read_raw_line(KeyGen),
+ Prefix = re:replace(File, "\\..*", "", [{return, binary}]),
+ {_Chain, ModHPs} = calc_chain(read, State#bb.proj, State#bb.chain_map,
+ Prefix),
+ FoldFun = fun(_, {ok, _}=Acc) ->
+ Acc;
+ ({Host, PortStr}, _Acc) ->
+ bb_do_read_chunk(RawLine, Size, Host, PortStr, State)
+ end,
+ lists:foldl(FoldFun, undefined, ModHPs).
+
+bb_do_read_chunk(RawLine, Size, Host, PortStr, State) ->
+ try
+ Sock = get_cached_sock(Host, PortStr),
+ try
+ ok = gen_tcp:send(Sock, [RawLine, <<"\n">>]),
+ read_chunk(Sock, Size, State)
+ catch X2:Y2 ->
+ invalidate_cached_sock(Sock),
+ {error, {X2,Y2}, State}
+ end
+ catch X:Y ->
+ {error, {X,Y}, State}
+ end.
+
+bb_do_write_chunk(Prefix, Value, Host, PortStr, State) ->
+ try
+ Sock = get_cached_sock(Host, PortStr),
+ try
+ {_, _, _} = upload_chunk_append(Sock, Prefix, Value),
+ {ok, State}
+ catch X2:Y2 ->
+ invalidate_cached_sock(Sock),
+ {error, {X2,Y2}, State}
+ end
+ catch X:Y ->
+ {error, {X,Y}, State}
+ end.
+
+read_chunk(Sock, Size, State) ->
+ {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 3),
+ {ok, _Chunk} = gen_tcp:recv(Sock, Size),
+ {ok, State}.
+
+setup_read_raw_line(KeyGen) ->
+ RawLine = KeyGen(),
+ <<"R ", Rest/binary>> = RawLine,
+ {_Offset, Size, File} = read_hex_size(Rest),
+ {RawLine, Size, File}.
+
+read_hex_size(Line) ->
+ <<OffsetHex:16/binary, " ", SizeHex:8/binary, " ", File/binary>> = Line,
+ <<Offset:64/big>> = hexstr_to_bin(OffsetHex),
+ <<Size:32/big>> = hexstr_to_bin(SizeHex),
+ {Offset, Size, File}.
+
+read_projection_info(#bb{proj_path=undefined}=State) ->
+ State;
+read_projection_info(#bb{proj_path=ProjectionPathOrDir}=State) ->
+ Proj = read_projection_file(ProjectionPathOrDir),
+ ChainMap = read_chain_map_file(ProjectionPathOrDir),
+ ModChainMap =
+ [{Chain, [{binary_to_list(Host), integer_to_list(Port)} ||
+ {Host, Port} <- Members]} ||
+ {Chain, Members} <- ChainMap],
+ State#bb{proj=Proj, chain_map=ModChainMap}.
+
+check_projection_check(#bb{proj_check_ticker_started=false} = State) ->
+ timer:send_interval(5*1000 - random:uniform(500), projection_check),
+ check_projection_check(State#bb{proj_check_ticker_started=true});
+check_projection_check(#bb{proj_check_ticker_started=true} = State) ->
+ receive
+ projection_check ->
+ read_projection_info(State)
+ after 0 ->
+ State
+ end.
diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl
new file mode 100644
index 0000000..d51143c
--- /dev/null
+++ b/test/machi_flu1_test.erl
@@ -0,0 +1,98 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(machi_flu1_test).
+-compile(export_all).
+
+-ifdef(TEST).
+-include("machi.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-define(FLU, machi_flu1).
+-define(FLU_C, machi_flu1_client).
+
+%% End-to-end smoke test for a single FLU: start one server process,
+%% exercise each client API on both the happy path and the bad-argument
+%% path (BadPrefix/BadFile contain "/" and are rejected as bad_arg),
+%% then stop the server in the 'after' clause regardless of outcome.
+flu_smoke_test() ->
+ Host = "localhost",
+ TcpPort = 32957,
+ DataDir = "./data",
+ Prefix = <<"prefix!">>,
+ BadPrefix = BadFile = "no/good",
+ %% Start from a clean slate so earlier runs can't affect results.
+ clean_up_data_dir(DataDir),
+
+ {ok, FLU1} = ?FLU:start_link([{smoke_flu, TcpPort, DataDir}]),
+ try
+ %% checksum_list: unknown file vs. malformed file name.
+ {error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
+ "does-not-exist"),
+ {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, BadFile),
+
+ %% A fresh FLU has no files.
+ {ok, []} = ?FLU_C:list_files(Host, TcpPort),
+
+ %% Append a chunk, read it back from the reported location.
+ Chunk1 = <<"yo!">>,
+ {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
+ Prefix, Chunk1),
+ {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1),
+ {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
+ BadPrefix, Chunk1),
+
+ %% Explicit-offset write, then read back and probe error cases:
+ %% offset past EOF, length past EOF, unknown file, bad file name.
+ Chunk2 = <<"yo yo">>,
+ Len2 = byte_size(Chunk2),
+ Off2 = ?MINIMUM_OFFSET + 77,
+ File2 = "smoke-file",
+ ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2),
+ {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort,
+ BadFile, Off2, Chunk2),
+ {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, File2, Off2, Len2),
+ {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
+ File2, Off2*983, Len2),
+ {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
+ File2, Off2, Len2*984),
+ {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
+ "no!!", Off2, Len2),
+ {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
+ BadFile, Off2, Len2),
+
+ %% We know that File1 still exists. Pretend that we've done a
+ %% migration and exercise the delete_migration() API.
+ ok = ?FLU_C:delete_migration(Host, TcpPort, File1),
+ {error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort, File1),
+ {error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort, BadFile),
+
+ %% We know that File2 still exists. Pretend that we've done a
+ %% migration and exercise the trunc_hack() API.
+ ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
+ ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
+ {error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, BadFile),
+
+ %% Cleanly close a direct connection via the quit command.
+ ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
+ after
+ ok = ?FLU:stop(FLU1)
+ end.
+
+%% Best-effort removal of a FLU's on-disk state: first the files under
+%% DataDir/config and that directory, then the files directly under
+%% DataDir and DataDir itself.  Deletion results are deliberately
+%% ignored — missing files/dirs are fine on a fresh run.
+clean_up_data_dir(DataDir) ->
+    delete_dir_contents(filename:join(DataDir, "config")),
+    delete_dir_contents(DataDir),
+    ok.
+
+%% Delete every file matching Dir/*, then try to remove Dir itself.
+delete_dir_contents(Dir) ->
+    _ = [file:delete(F) || F <- filelib:wildcard(filename:join(Dir, "*"))],
+    _ = file:del_dir(Dir),
+    ok.
+
+-endif. % TEST