Single server client & server code (squashed)

This commit is contained in:
Scott Lystig Fritchie 2015-03-31 16:46:03 +09:00
parent 78f2ff4bbf
commit d243ffca23
18 changed files with 3027 additions and 3 deletions

8
.gitignore vendored
View file

@ -1,9 +1,11 @@
prototype/chain-manager/patch.*
.eunit
deps
*.o
ebin/*.beam
*.plt
erl_crash.dump
rel/example_project
.concrete/DEV_MODE
.rebar
doc/edoc-info
doc/erlang.png
doc/*.html
doc/stylesheet.css

38
Makefile Normal file
View file

@ -0,0 +1,38 @@
REBAR_BIN := $(shell which rebar)
ifeq ($(REBAR_BIN),)
REBAR_BIN = ./rebar
endif
.PHONY: rel deps package pkgclean
all: deps compile
compile:
$(REBAR_BIN) compile
deps:
$(REBAR_BIN) get-deps
clean:
$(REBAR_BIN) -r clean
test: deps compile eunit
eunit:
$(REBAR_BIN) -v skip_deps=true eunit
pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
APPS = kernel stdlib sasl erts ssl compiler eunit crypto
PLT = $(HOME)/.machi_dialyzer_plt
build_plt: deps compile
dialyzer --build_plt --output_plt $(PLT) --apps $(APPS) deps/*/ebin
dialyzer: deps compile
dialyzer -Wno_return --plt $(PLT) ebin
clean_plt:
rm $(PLT)

2
ebin/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.beam
*.app

26
include/machi.hrl Normal file
View file

@ -0,0 +1,26 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(MAX_FILE_SIZE, 256*1024*1024). % 256 MBytes
-define(MAX_CHUNK_SIZE, ((1 bsl 32) - 1)).
%% -define(DATA_DIR, "/Volumes/SAM1/seq-tests/data").
-define(DATA_DIR, "./data").
-define(MINIMUM_OFFSET, 1024).

View file

@ -0,0 +1,33 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-record(projection, {
%% hard state
epoch :: non_neg_integer(),
last_epoch :: non_neg_integer(),
float_map,
last_float_map,
%% soft state
migrating :: boolean(),
tree,
last_tree
}).
-define(SHA_MAX, (1 bsl (20*8))).

BIN
rebar vendored Executable file

Binary file not shown.

7
rebar.config Normal file
View file

@ -0,0 +1,7 @@
%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}
]}.

47
rebar.config.script Normal file
View file

@ -0,0 +1,47 @@
PulseBuild = case os:getenv("USE_PULSE") of
false ->
false;
_ ->
true
end,
case PulseBuild of
true ->
PulseOpts =
[{pulse_no_side_effect,
[{erlang,display,1}
]},
{pulse_side_effect,
[ {does_not_exist_yet, some_func, '_'}
, {prim_file, '_', '_'}
, {file, '_', '_'}
, {filelib, '_', '_'}
, {os, '_', '_'} ]},
{pulse_replace_module,
[ {gen_server, pulse_gen_server}
, {application, pulse_application}
, {supervisor, pulse_supervisor} ]}
],
PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
{value, {eunit_compile_opts, Opts}} ->
lists:keyreplace(eunit_compile_opts,
1,
CONFIG,
{eunit_compile_opts, Opts ++ PulseOpts});
_ ->
[{eunit_compile_opts, PulseOpts} | CONFIG]
end,
case lists:keysearch(port_env, 1, UpdConfig) of
{value, {port_env, PortEnv}} ->
lists:keyreplace(port_env,
1,
UpdConfig,
{port_env, PortEnv ++ PulseCFlags});
_ ->
[{port_env, PulseCFlags} | UpdConfig]
end;
false ->
CONFIG
end.

13
src/machi.app.src Normal file
View file

@ -0,0 +1,13 @@
{application, machi, [
{description, "A village of write-once files."},
{vsn, "0.0.0"},
{applications, [kernel, stdlib, sasl, crypto]},
{mod,{machi_app,[]}},
{registered, []},
{env, [
{flu_list,
[
{flu_a, 32900, "./data.flu_a"}
]}
]}
]}.

37
src/machi_app.erl Normal file
View file

@ -0,0 +1,37 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
case machi_sup:start_link() of
{ok, Pid} ->
{ok, Pid};
Error ->
Error
end.
stop(_State) ->
ok.

459
src/machi_chash.erl Normal file
View file

@ -0,0 +1,459 @@
%%%-------------------------------------------------------------------
%%% Copyright (c) 2007-2011 Gemini Mobile Technologies, Inc. All rights reserved.
%%% Copyright (c) 2013-2015 Basho Technologies, Inc. All rights reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
%%%-------------------------------------------------------------------
%% Consistent hashing library. Also known as "random slicing".
%% Originally from the Hibari DB source code at https://github.com/hibari
%%
%% TODO items:
%%
%% 1. Refactor to use bigints instead of floating point numbers. The
%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE macro below doesn't allow as
%% much wiggle-room for making really small hashing range
%% definitions.
-module(machi_chash).
-define(SMALLEST_SIGNIFICANT_FLOAT_SIZE, 0.1e-12).
-define(SHA_MAX, (1 bsl (20*8))).
%% -compile(export_all).
-export([make_float_map/1, make_float_map/2,
sum_map_weights/1,
make_tree/1,
query_tree/2,
hash_binary_via_float_map/2,
hash_binary_via_float_tree/2,
pretty_with_integers/2,
pretty_with_integers/3]).
-export([make_demo_map1/0, make_demo_map2/0]).
-export([zzz_usage_details/0]). % merely to give EDoc a hint of our intent
-type owner_name() :: term().
%% Owner for a range on the unit interval. We are agnostic about its
%% type.
-type weight() :: non_neg_integer().
%% For this library, a weight is an integer which specifies the
%% capacity of a "owner" relative to other owners. For example, if
%% owner A with a weight of 10, and if owner B has a weight of 20,
%% then B will be assigned twice as much of the unit interval as A.
-type float_map() :: [{owner_name(), float()}].
%% A float map subdivides the unit interval, starting at 0.0, to
%% partitions that are assigned to various owners. The sum of all
%% floats must be exactly 1.0 (or close enough for floating point
%% purposes).
-opaque float_tree() :: gb_trees:tree(float(), owner_name()).
%% We can't use gb_trees:tree() because 'nil' (the empty tree) is
%% never valid in our case. But teaching Dialyzer that is difficult.
-type owner_int_range() :: {owner_name(), non_neg_integer(), non_neg_integer()}.
%% Used when "prettying" a float map.
-type owner_weight() :: {owner_name(), weight()}.
-type owner_weight_list() :: [owner_weight()].
%% A owner_weight_list is a definition of brick assignments over the
%% unit interval [0.0, 1.0]. The sum of all floats must be 1.0. For
%% example, [{{br1,nd1}, 0.25}, {{br2,nd1}, 0.5}, {{br3,nd1}, 0.25}].
-export_type([float_map/0, float_tree/0]).
%% @doc Create a float map, based on a basic owner weight list.
-spec make_float_map(owner_weight_list()) -> float_map().
make_float_map(NewOwnerWeights) ->
make_float_map([], NewOwnerWeights).
%% @doc Create a float map, based on an older float map and a new weight
%% list.
%%
%% The weights in the new weight list may be different than (or the
%% same as) whatever weights were used to make the older float map.
-spec make_float_map(float_map(), owner_weight_list()) -> float_map().
make_float_map([], NewOwnerWeights) ->
Sum = add_all_weights(NewOwnerWeights),
DiffMap = [{Ch, Wt/Sum} || {Ch, Wt} <- NewOwnerWeights],
make_float_map2([{unused, 1.0}], DiffMap, NewOwnerWeights);
make_float_map(OldFloatMap, NewOwnerWeights) ->
NewSum = add_all_weights(NewOwnerWeights),
%% Normalize to unit interval
%% NewOwnerWeights2 = [{Ch, Wt / NewSum} || {Ch, Wt} <- NewOwnerWeights],
%% Reconstruct old owner weights (will be normalized to unit interval)
SumOldFloatsDict =
lists:foldl(fun({Ch, Wt}, OrdDict) ->
orddict:update_counter(Ch, Wt, OrdDict)
end, orddict:new(), OldFloatMap),
OldOwnerWeights = orddict:to_list(SumOldFloatsDict),
OldSum = add_all_weights(OldOwnerWeights),
OldChs = [Ch || {Ch, _} <- OldOwnerWeights],
NewChs = [Ch || {Ch, _} <- NewOwnerWeights],
OldChsOnly = OldChs -- NewChs,
%% Mark any space in by a deleted owner as unused.
OldFloatMap2 = lists:map(
fun({Ch, Wt} = ChWt) ->
case lists:member(Ch, OldChsOnly) of
true ->
{unused, Wt};
false ->
ChWt
end
end, OldFloatMap),
%% Create a diff map of changing owners and added owners
DiffMap = lists:map(fun({Ch, NewWt}) ->
case orddict:find(Ch, SumOldFloatsDict) of
{ok, OldWt} ->
{Ch, (NewWt / NewSum) -
(OldWt / OldSum)};
error ->
{Ch, NewWt / NewSum}
end
end, NewOwnerWeights),
make_float_map2(OldFloatMap2, DiffMap, NewOwnerWeights).
make_float_map2(OldFloatMap, DiffMap, _NewOwnerWeights) ->
FloatMap = apply_diffmap(DiffMap, OldFloatMap),
XX = combine_neighbors(collapse_unused_in_float_map(FloatMap)),
XX.
apply_diffmap(DiffMap, FloatMap) ->
SubtractDiff = [{Ch, abs(Diff)} || {Ch, Diff} <- DiffMap, Diff < 0],
AddDiff = [D || {_Ch, Diff} = D <- DiffMap, Diff > 0],
TmpFloatMap = iter_diffmap_subtract(SubtractDiff, FloatMap),
iter_diffmap_add(AddDiff, TmpFloatMap).
add_all_weights(OwnerWeights) ->
lists:foldl(fun({_Ch, Weight}, Sum) -> Sum + Weight end, 0.0, OwnerWeights).
iter_diffmap_subtract([{Ch, Diff}|T], FloatMap) ->
iter_diffmap_subtract(T, apply_diffmap_subtract(Ch, Diff, FloatMap));
iter_diffmap_subtract([], FloatMap) ->
FloatMap.
iter_diffmap_add([{Ch, Diff}|T], FloatMap) ->
iter_diffmap_add(T, apply_diffmap_add(Ch, Diff, FloatMap));
iter_diffmap_add([], FloatMap) ->
FloatMap.
apply_diffmap_subtract(Ch, Diff, [{Ch, Wt}|T]) ->
if Wt == Diff ->
[{unused, Wt}|T];
Wt > Diff ->
[{Ch, Wt - Diff}, {unused, Diff}|T];
Wt < Diff ->
[{unused, Wt}|apply_diffmap_subtract(Ch, Diff - Wt, T)]
end;
apply_diffmap_subtract(Ch, Diff, [H|T]) ->
[H|apply_diffmap_subtract(Ch, Diff, T)];
apply_diffmap_subtract(_Ch, _Diff, []) ->
[].
apply_diffmap_add(Ch, Diff, [{unused, Wt}|T]) ->
if Wt == Diff ->
[{Ch, Wt}|T];
Wt > Diff ->
[{Ch, Diff}, {unused, Wt - Diff}|T];
Wt < Diff ->
[{Ch, Wt}|apply_diffmap_add(Ch, Diff - Wt, T)]
end;
apply_diffmap_add(Ch, Diff, [H|T]) ->
[H|apply_diffmap_add(Ch, Diff, T)];
apply_diffmap_add(_Ch, _Diff, []) ->
[].
combine_neighbors([{Ch, Wt1}, {Ch, Wt2}|T]) ->
combine_neighbors([{Ch, Wt1 + Wt2}|T]);
combine_neighbors([H|T]) ->
[H|combine_neighbors(T)];
combine_neighbors([]) ->
[].
collapse_unused_in_float_map([{Ch, Wt1}, {unused, Wt2}|T]) ->
collapse_unused_in_float_map([{Ch, Wt1 + Wt2}|T]);
collapse_unused_in_float_map([{unused, _}] = L) ->
L; % Degenerate case only
collapse_unused_in_float_map([H|T]) ->
[H|collapse_unused_in_float_map(T)];
collapse_unused_in_float_map([]) ->
[].
chash_float_map_to_nextfloat_list(FloatMap) when length(FloatMap) > 0 ->
%% QuickCheck found a bug ... need to weed out stuff smaller than
%% ?SMALLEST_SIGNIFICANT_FLOAT_SIZE here.
FM1 = [P || {_X, Y} = P <- FloatMap, Y > ?SMALLEST_SIGNIFICANT_FLOAT_SIZE],
{_Sum, NFs0} = lists:foldl(fun({Name, Amount}, {Sum, List}) ->
{Sum+Amount, [{Sum+Amount, Name}|List]}
end, {0, []}, FM1),
lists:reverse(NFs0).
chash_nextfloat_list_to_gb_tree([]) ->
gb_trees:balance(gb_trees:from_orddict([]));
chash_nextfloat_list_to_gb_tree(NextFloatList) ->
{_FloatPos, Name} = lists:last(NextFloatList),
%% QuickCheck found a bug ... it really helps to add a catch-all item
%% at the far "right" of the list ... 42.0 is much greater than 1.0.
NFs = NextFloatList ++ [{42.0, Name}],
gb_trees:balance(gb_trees:from_orddict(orddict:from_list(NFs))).
-spec chash_gb_next(float(), float_tree()) -> {float(), owner_name()}.
chash_gb_next(X, {_, GbTree}) ->
chash_gb_next1(X, GbTree).
chash_gb_next1(X, {Key, Val, Left, _Right}) when X < Key ->
case chash_gb_next1(X, Left) of
nil ->
{Key, Val};
Res ->
Res
end;
chash_gb_next1(X, {Key, _Val, _Left, Right}) when X >= Key ->
chash_gb_next1(X, Right);
chash_gb_next1(_X, nil) ->
nil.
%% @doc Not used directly, but can give a developer an idea of how well
%% chash_float_map_to_nextfloat_list will do for a given value of Max.
%%
%% For example:
%% <verbatim>
%% NewFloatMap = make_float_map([{unused, 1.0}],
%% [{a,100}, {b, 100}, {c, 10}]),
%% ChashMap = chash_scale_to_int_interval(NewFloatMap, 100),
%% io:format("QQQ: int int = ~p\n", [ChashIntInterval]),
%% -> [{a,1,47},{b,48,94},{c,94,100}]
%% </verbatim>
%%
%% Interpretation: out of the 100 slots:
%% <ul>
%% <li> 'a' uses the slots 1-47 </li>
%% <li> 'b' uses the slots 48-94 </li>
%% <li> 'c' uses the slots 95-100 </li>
%% </ul>
chash_scale_to_int_interval(NewFloatMap, Max) ->
chash_scale_to_int_interval(NewFloatMap, 0, Max).
%% @type nextfloat_list() = list({float(), brick()}). A nextfloat_list
%% differs from a float_map in two respects: 1) nextfloat_list contains
%% tuples with the brick name in 2nd position, 2) the float() at each
%% position I_n > I_m, for all n, m such that n > m.
%% For example, a nextfloat_list of the float_map example above,
%% [{0.25, {br1, nd1}}, {0.75, {br2, nd1}}, {1.0, {br3, nd1}].
chash_scale_to_int_interval([{Ch, _Wt}], Cur, Max) ->
[{Ch, Cur, Max}];
chash_scale_to_int_interval([{Ch, Wt}|T], Cur, Max) ->
Int = trunc(Wt * Max),
[{Ch, Cur + 1, Cur + Int}|chash_scale_to_int_interval(T, Cur + Int, Max)].
%%%%%%%%%%%%%
%% @doc Make a pretty/human-friendly version of a float map that describes
%% integer ranges between 1 and `Scale'.
-spec pretty_with_integers(float_map(), integer()) -> [owner_int_range()].
pretty_with_integers(Map, Scale) ->
chash_scale_to_int_interval(Map, Scale).
%% @doc Make a pretty/human-friendly version of a float map (based
%% upon a float map created from `OldWeights' and `NewWeights') that
%% describes integer ranges between 1 and `Scale'.
-spec pretty_with_integers(owner_weight_list(), owner_weight_list(),integer())->
[owner_int_range()].
pretty_with_integers(OldWeights, NewWeights, Scale) ->
chash_scale_to_int_interval(
make_float_map(make_float_map(OldWeights),
NewWeights),
Scale).
%% @doc Create a float tree, which is the rapid lookup data structure
%% for consistent hash queries.
-spec make_tree(float_map()) -> float_tree().
make_tree(Map) ->
chash_nextfloat_list_to_gb_tree(
chash_float_map_to_nextfloat_list(Map)).
%% @doc Low-level function for querying a float tree: the (floating
%% point) point within the unit interval.
-spec query_tree(float(), float_tree()) -> {float(), owner_name()}.
query_tree(Val, Tree) when is_float(Val), 0.0 =< Val, Val =< 1.0 ->
chash_gb_next(Val, Tree).
%% @doc Create a sample float map.
-spec make_demo_map1() -> float_map().
make_demo_map1() ->
{_, Res} = make_demo_map1_i(),
Res.
make_demo_map1_i() ->
Fail1 = {b, 100},
L1 = [{a, 100}, Fail1, {c, 100}],
L2 = L1 ++ [{d, 100}, {e, 100}],
L3 = L2 -- [Fail1],
L4 = L3 ++ [{giant, 300}],
{L4, lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
make_float_map(L1), [L2, L3, L4])}.
%% @doc Create a sample float map.
-spec make_demo_map2() -> float_map().
make_demo_map2() ->
{L0, _} = make_demo_map1_i(),
L1 = L0 ++ [{h, 100}],
L2 = L1 ++ [{i, 100}],
L3 = L2 ++ [{j, 100}],
lists:foldl(fun(New, Old) -> make_float_map(Old, New) end,
make_demo_map1(), [L1, L2, L3]).
%% @doc Create a human-friendly summary of a float map.
%%
%% The two parts of the summary are: a per-owner total of the unit
%% interval range(s) owned by each owner, and a total sum of all
%% per-owner ranges (which should be 1.0 but is not enforced).
-spec sum_map_weights(float_map()) ->
{{per_owner, float_map()}, {weight_sum, float()}}.
sum_map_weights(Map) ->
L = sum_map_weights(lists:sort(Map), undefined, 0.0) -- [{undefined,0.0}],
WeightSum = lists:sum([Weight || {_, Weight} <- L]),
{{per_owner, L}, {weight_sum, WeightSum}}.
sum_map_weights([{SZ, Weight}|T], SZ, SZ_total) ->
sum_map_weights(T, SZ, SZ_total + Weight);
sum_map_weights([{SZ, Weight}|T], LastSZ, LastSZ_total) ->
[{LastSZ, LastSZ_total}|sum_map_weights(T, SZ, Weight)];
sum_map_weights([], LastSZ, LastSZ_total) ->
[{LastSZ, LastSZ_total}].
%% @doc Query a float map with a binary (inefficient).
-spec hash_binary_via_float_map(binary(), float_map()) ->
{float(), owner_name()}.
hash_binary_via_float_map(Key, Map) ->
Tree = make_tree(Map),
<<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
Float = Int / ?SHA_MAX,
query_tree(Float, Tree).
%% @doc Query a float tree with a binary.
-spec hash_binary_via_float_tree(binary(), float_tree()) ->
{float(), owner_name()}.
hash_binary_via_float_tree(Key, Tree) ->
<<Int:(20*8)/unsigned>> = crypto:hash(sha, Key),
Float = Int / ?SHA_MAX,
query_tree(Float, Tree).
%%%%% @doc Various usage examples, see source code below this function
%%%%% for full details.
zzz_usage_details() ->
%% %% Make a map. See the code for make_demo_map1() for the order of
%% %% additions & deletions. Here's a brief summary of the 4 steps.
%% %%
%% %% * 'a' through 'e' are weighted @ 100.
%% %% * 'giant' is weighted @ 300.
%% %% * 'b' is removed at step #3.
%% 40> M1 = machi_chash:make_demo_map1().
%% [{a,0.09285714285714286},
%% {giant,0.10714285714285715},
%% {d,0.026190476190476153},
%% {giant,0.10714285714285715},
%% {a,0.04999999999999999},
%% {giant,0.04999999999999999},
%% {d,0.04999999999999999},
%% {giant,0.050000000000000044},
%% {d,0.06666666666666671},
%% {e,0.009523809523809434},
%% {giant,0.05714285714285716},
%% {c,0.14285714285714285},
%% {giant,0.05714285714285716},
%% {e,0.13333333333333341}]
%% %% Map M1 onto the interval of integers 0-10,1000
%% %%
%% %% output = list({SZ_name::term(), Start::integer(), End::integer()})
%% 41> machi_chash:pretty_with_integers(M1, 10*1000).
%% [{a,1,928},
%% {giant,929,1999},
%% {d,2000,2260},
%% {giant,2261,3331},
%% {a,3332,3830},
%% {giant,3831,4329},
%% {d,4330,4828},
%% {giant,4829,5328},
%% {d,5329,5994},
%% {e,5995,6089},
%% {giant,6090,6660},
%% {c,6661,8088},
%% {giant,8089,8659},
%% {e,8659,10000}]
%% %% Sum up all of the weights, make sure it's what we expect:
%% 55> machi_chash:sum_map_weights(M1).
%% {{per_owner,[{a,0.14285714285714285},
%% {c,0.14285714285714285},
%% {d,0.14285714285714285},
%% {e,0.14285714285714285},
%% {giant,0.42857142857142866}]},
%% {weight_sum,1.0}}
%% %% Make a tree, then query it
%% %% (Hash::float(), tree()) -> {NextLargestBoundary::float(), szone()}
%% 58> T1 = machi_chash:make_tree(M1).
%% 59> machi_chash:query_tree(0.2555, T1).
%% {0.3333333333333333,giant}
%% 60> machi_chash:query_tree(0.3555, T1).
%% {0.3833333333333333,a}
%% 61> machi_chash:query_tree(0.4555, T1).
%% {0.4833333333333333,d}
%% %% How about hashing a bunch of strings and see what happens?
%% 74> Key1 = "Hello, world!".
%% "Hello, world!"
%% 75> [{K, element(2, machi_chash:hash_binary_via_float_map(K, M1))} || K <- [lists:sublist(Key1, X) || X <- lists:seq(1, length(Key1))]].
%% [{"H",giant},
%% {"He",giant},
%% {"Hel",giant},
%% {"Hell",e},
%% {"Hello",e},
%% {"Hello,",giant},
%% {"Hello, ",e},
%% {"Hello, w",e},
%% {"Hello, wo",giant},
%% {"Hello, wor",d},
%% {"Hello, worl",giant},
%% {"Hello, world",e},
%% {"Hello, world!",d}]
ok.

464
src/machi_flu1.erl Normal file
View file

@ -0,0 +1,464 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1).
-include_lib("kernel/include/file.hrl").
-include("machi.hrl").
-export([start_link/1, stop/1]).
start_link([{FluName, TcpPort, DataDir}])
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir) end)}.
stop(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
Pid ! forever,
ok;
false ->
error
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir) ->
_Pid1 = start_listen_server(RegName, TcpPort, DataDir),
_Pid2 = start_append_server(RegName, DataDir),
receive forever -> ok end.
start_listen_server(RegName, TcpPort, DataDir) ->
spawn_link(fun() -> run_listen_server(RegName, TcpPort, DataDir) end).
start_append_server(Name, DataDir) ->
spawn_link(fun() -> run_append_server(Name, DataDir) end).
run_listen_server(RegName, TcpPort, DataDir) ->
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(RegName, LSock, DataDir).
run_append_server(Name, DataDir) ->
register(Name, self()),
append_server_loop(DataDir).
listen_server_loop(RegName, LSock, DataDir) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn(fun() -> net_server_loop(RegName, Sock, DataDir) end),
listen_server_loop(RegName, LSock, DataDir).
append_server_loop(DataDir) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end),
append_server_loop(DataDir)
end.
net_server_loop(RegName, Sock, DataDir) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 60*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1,
PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2,
FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1,
FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2,
CSumFileLenLF = byte_size(Line) - 2 - 1,
CSumFileLenCRLF = byte_size(Line) - 2 - 2,
WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1,
DelFileLenLF = byte_size(Line) - 14 - 1,
case Line of
%% For normal use
<<"A ", LenHex:8/binary, " ",
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix);
<<"A ", LenHex:8/binary, " ",
Prefix:PrefixLenCRLF/binary, "\r\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix);
<<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
File:FileLenLF/binary, "\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
<<"R ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
File:FileLenCRLF/binary, "\r\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
<<"L\n">> ->
do_net_server_listing(Sock, DataDir);
<<"L\r\n">> ->
do_net_server_listing(Sock, DataDir);
<<"C ", File:CSumFileLenLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
<<"C ", File:CSumFileLenCRLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),
exit(normal);
<<"QUIT\r\n">> ->
catch gen_tcp:close(Sock),
exit(normal);
%% For "internal" replication only.
<<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
File:WriteFileLenLF/binary, "\n">> ->
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
%% For data migration only.
<<"DEL-migration ", File:DelFileLenLF/binary, "\n">> ->
do_net_server_delete_migration_only(Sock, File, DataDir);
%% For erasure coding hackityhack
<<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> ->
do_net_server_truncate_hackityhack(Sock, File, DataDir);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
catch gen_tcp:close(Sock),
exit(normal)
end,
net_server_loop(RegName, Sock, DataDir);
_ ->
catch gen_tcp:close(Sock),
exit(normal)
end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
Pid = write_server_get_pid(Prefix, DataDir),
Pid ! {seq_append, From, Prefix, Chunk, CSum},
exit(normal).
do_net_server_append(RegName, Sock, LenHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(RegName, Sock, LenHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
sanitize_file_string(Str) ->
case re:run(Str, "/") of
nomatch ->
ok;
_ ->
error
end.
do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum(Chunk),
try
RegName ! {seq_append, self(), Prefix, Chunk, CSum}
catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
receive
{assignment, Offset, File} ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
ok = gen_tcp:send(Sock, Out)
after 10*1000 ->
ok = gen_tcp:send(Sock, "TIMEOUT\n")
end.
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
{ok, Bytes} when byte_size(Bytes) == Len ->
gen_tcp:send(Sock, ["OK\n", Bytes]);
{ok, Bytes} ->
machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
[Len, size(Bytes), FileBin, Offset]),
ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ\n");
eof ->
perhaps_do_net_server_ec_read(Sock, FH);
_Else2 ->
machi_util:verb("Else2 ~p ~p ~P\n",
[Offset, Len, _Else2, 20]),
ok = gen_tcp:send(Sock, "ERROR BAD-READ\n")
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
case sanitize_file_string(FileBin) of
ok ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
OptsHasWrite = lists:member(write, FileOpts),
case file:open(Path, FileOpts) of
{ok, FH} ->
try
DoItFun(FH, Offset, Len)
after
file:close(FH)
end;
{error, enoent} when OptsHasWrite ->
ok = filelib:ensure_dir(Path),
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
CSum = machi_util:checksum(Chunk),
case file:pwrite(FHd, Offset, Chunk) of
ok ->
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
ok = file:close(FHc),
gen_tcp:send(Sock, <<"OK\n">>);
_Else3 ->
machi_util:verb("Else3 ~p ~p ~p\n",
[Offset, Len, _Else3]),
ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
{ok, Bin} when byte_size(Bin) == ?MINIMUM_OFFSET ->
decode_and_reply_net_server_ec_read(Sock, Bin);
{ok, _AnythingElse} ->
ok = gen_tcp:send(Sock, "ERROR PARTIAL-READ2\n");
_AnythingElse ->
ok = gen_tcp:send(Sock, "ERROR BAD-PREAD\n")
end.
decode_and_reply_net_server_ec_read(Sock, <<"a ", Rest/binary>>) ->
decode_and_reply_net_server_ec_read_version_a(Sock, Rest);
decode_and_reply_net_server_ec_read(Sock, <<0:8, _/binary>>) ->
ok = gen_tcp:send(Sock, <<"ERROR NOT-ERASURE\n">>).
decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
%% <<BodyLenHex:4/binary, " ", StripeWidthHex:16/binary, " ",
%% OrigFileLenHex:16/binary, " ", _/binary>> = Rest,
HdrLen = 80 - 2 - 4 - 1,
<<BodyLenHex:4/binary, " ", Hdr:HdrLen/binary, Rest2/binary>> = Rest,
<<BodyLen:16/big>> = machi_util:hexstr_to_bin(BodyLenHex),
<<Body:BodyLen/binary, _/binary>> = Rest2,
ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
do_net_server_listing(Sock, DataDir) ->
Files = filelib:wildcard("*", DataDir) -- ["config"],
Out = ["OK\n",
[begin
{ok, FI} = file:read_file_info(DataDir ++ "/" ++ File),
Size = FI#file_info.size,
SizeBin = <<Size:64/big>>,
[machi_util:bin_to_hexstr(SizeBin), <<" ">>,
list_to_binary(File), <<"\n">>]
end || File <- Files],
".\n"
],
ok = gen_tcp:send(Sock, Out).
do_net_server_checksum_listing(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_checksum_listing2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_checksum_listing2(Sock, File, DataDir) ->
CSumPath = machi_util:make_checksum_filename(DataDir, File),
case file:open(CSumPath, [read, raw, binary]) of
{ok, FH} ->
{ok, FI} = file:read_file_info(CSumPath),
Len = FI#file_info.size,
LenHex = list_to_binary(machi_util:bin_to_hexstr(<<Len:64/big>>)),
%% Client has option of line-by-line with "." terminator,
%% or using the offset in the OK message to slurp things
%% down by exact byte size.
ok = gen_tcp:send(Sock, [<<"OK ">>, LenHex, <<"\n">>]),
do_net_copy_bytes(FH, Sock),
ok = file:close(FH),
ok = gen_tcp:send(Sock, ".\n");
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
do_net_copy_bytes(FH, Sock) ->
case file:read(FH, 1024*1024) of
{ok, Bin} ->
ok = gen_tcp:send(Sock, Bin),
do_net_copy_bytes(FH, Sock);
eof ->
ok
end.
do_net_server_delete_migration_only(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_delete_migration_only2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_delete_migration_only2(Sock, File, DataDir) ->
{_, Path} = machi_util:make_data_filename(DataDir, File),
case file:delete(Path) of
ok ->
ok = gen_tcp:send(Sock, "OK\n");
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
do_net_server_truncate_hackityhack(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_truncate_hackityhack2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
{_, Path} = machi_util:make_data_filename(DataDir, File),
case file:open(Path, [read, write, binary, raw]) of
{ok, FH} ->
try
{ok, ?MINIMUM_OFFSET} = file:position(FH, ?MINIMUM_OFFSET),
ok = file:truncate(FH),
ok = gen_tcp:send(Sock, "OK\n")
after
file:close(FH)
end;
{error, enoent} ->
ok = gen_tcp:send(Sock, "ERROR NO-SUCH-FILE\n");
_ ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
write_server_get_pid(Prefix, DataDir) ->
RegName = machi_util:make_regname(Prefix),
case whereis(RegName) of
undefined ->
start_seq_append_server(Prefix, DataDir),
timer:sleep(1),
write_server_get_pid(Prefix, DataDir);
Pid ->
Pid
end.
start_seq_append_server(Prefix, DataDir) ->
spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end).
run_seq_append_server(Prefix, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()),
ok = filelib:ensure_dir(DataDir ++ "/unused"),
ok = filelib:ensure_dir(DataDir ++ "/config/unused"),
run_seq_append_server2(Prefix, DataDir).
run_seq_append_server2(Prefix, DataDir) ->
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
case machi_util:increment_max_filenum(DataDir, Prefix) of
ok ->
machi_util:increment_max_filenum(DataDir, Prefix),
machi_util:info_msg("start: ~p server at file ~w\n",
[Prefix, FileNum]),
seq_append_server_loop(DataDir, Prefix, FileNum);
Else ->
error_logger:error_msg("start: ~p server at file ~w: ~p\n",
[Prefix, FileNum, Else]),
exit(Else)
end.
seq_append_server_loop(DataDir, Prefix, FileNum) ->
SequencerNameHack = lists:flatten(io_lib:format(
"~.36B~.36B",
[element(3,now()),
list_to_integer(os:getpid())])),
{File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHd} = file:open(FullPath,
[write, binary, raw]),
%% [write, binary, raw, delayed_write]),
CSumPath = machi_util:make_checksum_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHc} = file:open(CSumPath, [append, raw, binary, delayed_write]),
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
?MINIMUM_OFFSET).
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
when Offset > ?MAX_FILE_SIZE ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
ok = file:pwrite(FHd, Offset, Chunk),
From ! {assignment, Offset, File},
Len = byte_size(Chunk),
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
LenHex = machi_util:bin_to_hexstr(<<Len:32/big>>),
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len)
after 30*1000 ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("stop: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
exit(normal)
end.

399
src/machi_flu1_client.erl Normal file
View file

@ -0,0 +1,399 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1_client).
-include("machi.hrl").
-export([
append_chunk/3, append_chunk/4,
read_chunk/4, read_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2,
quit/1
]).
%% For "internal" replication only.
-export([
write_chunk/4, write_chunk/5,
delete_migration/2, delete_migration/3,
trunc_hack/2, trunc_hack/3
]).
-type chunk() :: iolist().
-type chunk_s() :: binary().
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type file_name() :: binary() | list().
-type file_name_s() :: binary(). % server reply
-type file_offset() :: non_neg_integer().
-type file_prefix() :: binary() | list().
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(port(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
append_chunk(Sock, Prefix, Chunk) ->
append_chunk2(Sock, Prefix, Chunk).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(inet_host(), inet_port(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
append_chunk(Host, TcpPort, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
try
append_chunk2(Sock, Prefix, Chunk)
after
catch gen_tcp:close(Sock)
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
read_chunk(Sock, File, Offset, Size) ->
read_chunk2(Sock, File, Offset, Size).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
read_chunk(Host, TcpPort, File, Offset, Size) ->
Sock = machi_util:connect(Host, TcpPort),
try
read_chunk2(Sock, File, Offset, Size)
after
catch gen_tcp:close(Sock)
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
checksum_list(Sock, File) when is_port(Sock) ->
checksum_list2(Sock, File).
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
checksum_list2(Sock, File)
after
catch gen_tcp:close(Sock)
end.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port()) ->
{ok, [file_name()]} | {error, term()}.
list_files(Sock) when is_port(Sock) ->
list2(Sock).
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(inet_host(), inet_port()) ->
{ok, [file_name()]} | {error, term()}.
list_files(Host, TcpPort) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
list2(Sock)
after
catch gen_tcp:close(Sock)
end.
%% @doc Quit &amp; close the connection to remote FLU.
-spec quit(port()) ->
ok.
quit(Sock) when is_port(Sock) ->
catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)),
catch gen_tcp:close(Sock),
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port(), file_name(), file_offset(), chunk()) ->
{ok, chunk_s()} | {error, term()}.
write_chunk(Sock, File, Offset, Chunk) ->
write_chunk2(Sock, File, Offset, Chunk).
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk()) ->
{ok, chunk_s()} | {error, term()}.
write_chunk(Host, TcpPort, File, Offset, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
try
write_chunk2(Sock, File, Offset, Chunk)
after
catch gen_tcp:close(Sock)
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
delete_migration(Sock, File) when is_port(Sock) ->
delete_migration2(Sock, File).
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(inet_host(), inet_port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
delete_migration2(Sock, File)
after
catch gen_tcp:close(Sock)
end.
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
trunc_hack(Sock, File) when is_port(Sock) ->
trunc_hack2(Sock, File).
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), file_name()) ->
{ok, [file_name()]} | {error, term()}.
trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
trunc_hack2(Sock, File)
after
catch gen_tcp:close(Sock)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, Prefix0, Chunk0) ->
try
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
Prefix = machi_util:make_binary(Prefix0),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>,
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK ", OffsetHex:16/binary, " ",
Path:PathLen/binary, _:1/binary>> ->
Offset = machi_util:hexstr_to_int(OffsetHex),
{ok, {Offset, Len, Path}};
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR ", Rest/binary>> ->
{error, Rest}
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
read_chunk2(Sock, File0, Offset, Size) ->
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, PrefixHex, 32, SizeHex, 32, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
<<"OR BAD-ARG\n">> ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
{error, partial_read};
_ ->
{error, Else2}
end;
_ ->
{error, {whaaa, <<Else/binary, Else2/binary>>}}
end
end.
list2(Sock) ->
try
ok = gen_tcp:send(Sock, <<"L\n">>),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
Res = list2(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Res}
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
list2({ok, <<".\n">>}, _Sock) ->
[];
list2({ok, Line}, Sock) ->
[Line|list2(gen_tcp:recv(Sock, 0), Sock)];
list2(Else, _Sock) ->
throw({server_protocol_error, Else}).
checksum_list2(Sock, File) ->
try
ok = gen_tcp:send(Sock, [<<"C ">>, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
put(status, ok), % may be unset later
RestLen = byte_size(Rest) - 1,
<<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
checksum_list_fast(Sock, Len);
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
checksum_list_fast(Sock, 0) ->
{ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2),
[];
checksum_list_fast(Sock, Remaining) ->
Num = erlang:min(Remaining, 1024*1024),
{ok, Bytes} = gen_tcp:recv(Sock, Num),
[Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
write_chunk2(Sock, File0, Offset, Chunk0) ->
try
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = <<"W-repl ", OffsetHex/binary, " ",
LenHex/binary, " ", File/binary, "\n">>,
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK\n">> ->
ok;
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR ", _/binary>>=Else ->
{error, {server_said, Else}}
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
delete_migration2(Sock, File) ->
try
ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.
trunc_hack2(Sock, File) ->
try
ok = gen_tcp:send(Sock, [<<"TRUNC-hack--- ">>, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, Else} ->
throw({server_protocol_error, Else})
end
catch
throw:Error ->
Error;
error:{badmatch,_}=BadMatch ->
{error, {badmatch, BadMatch}}
end.

51
src/machi_flu_sup.erl Normal file
View file

@ -0,0 +1,51 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 5000,
Type = worker,
{ok, FluList} = application:get_env(machi, flu_list),
FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
Restart, Shutdown, Type, []} ||
{FluName, _Port, _Dir}=FluArgs <- FluList],
{ok, {SupFlags, FluSpecs}}.

191
src/machi_sequencer.erl Normal file
View file

@ -0,0 +1,191 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_sequencer).
-compile(export_all).
-include_lib("kernel/include/file.hrl").
-define(CONFIG_DIR, "./config").
-define(DATA_DIR, "./data").
seq(Server, Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
Server ! {seq, self(), Prefix, Size},
receive
{assignment, File, Offset} ->
{File, Offset}
after 1*1000 ->
bummer
end.
seq_direct(Prefix, Size) when is_binary(Prefix), is_integer(Size), Size > -1 ->
RegName = make_regname(Prefix),
seq(RegName, Prefix, Size).
start_server() ->
start_server(?MODULE).
start_server(Name) ->
spawn_link(fun() -> run_server(Name) end).
run_server(Name) ->
register(Name, self()),
ets:new(?MODULE, [named_table, public, {write_concurrency, true}]),
server_loop().
server_loop() ->
receive
{seq, From, Prefix, Size} ->
spawn(fun() -> server_dispatch(From, Prefix, Size) end),
server_loop()
end.
server_dispatch(From, Prefix, Size) ->
RegName = make_regname(Prefix),
case whereis(RegName) of
undefined ->
start_prefix_server(Prefix),
timer:sleep(1),
server_dispatch(From, Prefix, Size);
Pid ->
Pid ! {seq, From, Prefix, Size}
end,
exit(normal).
start_prefix_server(Prefix) ->
spawn(fun() -> run_prefix_server(Prefix) end).
run_prefix_server(Prefix) ->
true = register(make_regname(Prefix), self()),
ok = filelib:ensure_dir(?CONFIG_DIR ++ "/unused"),
ok = filelib:ensure_dir(?DATA_DIR ++ "/unused"),
FileNum = read_max_filenum(Prefix) + 1,
ok = increment_max_filenum(Prefix),
prefix_server_loop(Prefix, FileNum).
prefix_server_loop(Prefix, FileNum) ->
File = make_data_filename(Prefix, FileNum),
prefix_server_loop(Prefix, File, FileNum, 0).
prefix_server_loop(Prefix, File, FileNum, Offset) ->
receive
{seq, From, Prefix, Size} ->
From ! {assignment, File, Offset},
prefix_server_loop(Prefix, File, FileNum, Offset + Size)
after 30*1000 ->
io:format("timeout: ~p server stopping\n", [Prefix]),
exit(normal)
end.
make_regname(Prefix) ->
erlang:binary_to_atom(Prefix, latin1).
make_config_filename(Prefix) ->
lists:flatten(io_lib:format("~s/~s", [?CONFIG_DIR, Prefix])).
make_data_filename(Prefix, FileNum) ->
erlang:iolist_to_binary(io_lib:format("~s/~s.~w",
[?DATA_DIR, Prefix, FileNum])).
read_max_filenum(Prefix) ->
case file:read_file_info(make_config_filename(Prefix)) of
{error, enoent} ->
0;
{ok, FI} ->
FI#file_info.size
end.
increment_max_filenum(Prefix) ->
{ok, FH} = file:open(make_config_filename(Prefix), [append]),
ok = file:write(FH, "x"),
%% ok = file:sync(FH),
ok = file:close(FH).
%%%%%%%%%%%%%%%%%
%% basho_bench callbacks
-define(SEQ, ?MODULE).
new(1) ->
start_server(),
timer:sleep(100),
{ok, unused};
new(_Id) ->
{ok, unused}.
run(null, _KeyGen, _ValgueGen, State) ->
{ok, State};
run(keygen_then_null, KeyGen, _ValgueGen, State) ->
_Prefix = KeyGen(),
{ok, State};
run(seq, KeyGen, _ValgueGen, State) ->
Prefix = KeyGen(),
{_, _} = ?SEQ:seq(?SEQ, Prefix, 1),
{ok, State};
run(seq_direct, KeyGen, _ValgueGen, State) ->
Prefix = KeyGen(),
Name = ?SEQ:make_regname(Prefix),
case get(Name) of
undefined ->
case whereis(Name) of
undefined ->
{_, _} = ?SEQ:seq(?SEQ, Prefix, 1);
Pid ->
put(Name, Pid),
{_, _} = ?SEQ:seq(Pid, Prefix, 1)
end;
Pid ->
{_, _} = ?SEQ:seq(Pid, Prefix, 1)
end,
{ok, State};
run(seq_ets, KeyGen, _ValgueGen, State) ->
Tab = ?MODULE,
Prefix = KeyGen(),
Res = try
BigNum = ets:update_counter(Tab, Prefix, 1),
BigBin = <<BigNum:80/big>>,
<<FileNum:32/big, Offset:48/big>> = BigBin,
%% if Offset rem 1000 == 0 ->
%% io:format("~p,~p ", [FileNum, Offset]);
%% true ->
%% ok
%% end,
{fakefake, FileNum, Offset}
catch error:badarg ->
FileNum2 = 1, Offset2 = 0,
FileBin = <<FileNum2:32/big>>,
OffsetBin = <<Offset2:48/big>>,
Glop = <<FileBin/binary, OffsetBin/binary>>,
<<Base:80/big>> = Glop,
%% if Prefix == <<"42">> -> io:format("base:~w\n", [Base]); true -> ok end,
%% Base = 0,
case ets:insert_new(Tab, {Prefix, Base}) of
true ->
{<<"fakefakefake">>, Base};
false ->
Result2 = ets:update_counter(Tab, Prefix, 1),
{<<"fakefakefake">>, Result2}
end
end,
Res = Res,
{ok, State}.

55
src/machi_sup.erl Normal file
View file

@ -0,0 +1,55 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 5000,
Type = supervisor,
ServerSup =
{machi_flu_sup, {machi_flu_sup, start_link, []},
Restart, Shutdown, Type, []},
{ok, {SupFlags, [ServerSup]}}.
%% AChild = {'AName', {'AModule', start_link, []},
%% Restart, Shutdown, Type, ['AModule']},
%% {ok, {SupFlags, [AChild]}}.

1102
src/machi_util.erl Normal file

File diff suppressed because it is too large Load diff

98
test/machi_flu1_test.erl Normal file
View file

@ -0,0 +1,98 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu1_test).
-compile(export_all).
-ifdef(TEST).
-include("machi.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client).
flu_smoke_test() ->
Host = "localhost",
TcpPort = 32957,
DataDir = "./data",
Prefix = <<"prefix!">>,
BadPrefix = BadFile = "no/good",
clean_up_data_dir(DataDir),
{ok, FLU1} = ?FLU:start_link([{smoke_flu, TcpPort, DataDir}]),
try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
"does-not-exist"),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, BadFile),
{ok, []} = ?FLU_C:list_files(Host, TcpPort),
Chunk1 = <<"yo!">>,
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1),
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
BadPrefix, Chunk1),
Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77,
File2 = "smoke-file",
ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort,
BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, File2, Off2, Len2),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
File2, Off2*983, Len2),
{error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
File2, Off2, Len2*984),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
"no!!", Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
BadFile, Off2, Len2),
%% We know that File1 still exists. Pretend that we've done a
%% migration and exercise the delete_migration() API.
ok = ?FLU_C:delete_migration(Host, TcpPort, File1),
{error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort, File1),
{error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort, BadFile),
%% We know that File2 still exists. Pretend that we've done a
%% migration and exercise the trunc_hack() API.
ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, BadFile),
ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
after
ok = ?FLU:stop(FLU1)
end.
clean_up_data_dir(DataDir) ->
Dir1 = DataDir ++ "/config",
Fs1 = filelib:wildcard(Dir1 ++ "/*"),
[file:delete(F) || F <- Fs1],
_ = file:del_dir(Dir1),
Fs2 = filelib:wildcard(DataDir ++ "/*"),
[file:delete(F) || F <- Fs2],
_ = file:del_dir(DataDir),
ok.
-endif. % TEST